事件循环(有时称为消息循环)是等待并调度传入事件的线程。线程阻塞等待请求的到达,然后将事件分派给事件处理程序函数。循环通常使用消息队列来保存传入消息。依次对每个消息进行出队,解码,然后执行操作。事件循环是实现进程间通信的一种方法。
所有操作系统都支持多线程应用程序。每个操作系统都有用于创建线程,消息队列和计时器的唯一函数调用。随着C ++ 11线程支持库的出现,现在可以创建可移植的代码并避免特定于OS的函数调用。本文提供了一个简单的示例,说明如何仅依靠C ++标准库来创建线程事件循环,消息队列和计时器服务。任何支持线程库的C ++ 11编译器都应该能够编译附加的源代码。
背景
通常,我需要一个线程来充当事件循环。线程将入站消息出队,并根据唯一的消息标识符将数据调度到适当的函数处理程序。能够调用功能的计时器支持对于低速轮询很方便,如果在预期的时间内没有发生任何事情,则可以生成超时。很多时候,辅助线程是在启动时创建的,直到应用程序终止后才被销毁。
该实现的关键要求是,传入消息必须在同一线程实例上执行。尽管std::async
可以使用池中的临时线程,但是此类确保所有传入消息使用同一线程。例如,可以使用不是线程安全的代码来实现子系统。单个WorkerThread
实例用于安全地将函数调用分派到子系统中。
乍一看,C ++线程支持似乎缺少一些关键功能。是的,std::thread
可以拆分一个线程,但是没有线程安全队列,也没有计时器-大多数OS都提供的服务。我将展示如何使用C ++标准库创建这些“缺失”功能,并提供许多程序员熟悉的事件处理循环。
工作线程
本WorkerThread
类封装了所有必要的事件循环机制。一个简单的类接口允许线程创建,将消息发布到事件循环以及最终的线程终止。界面如下图所示:

class WorkerThread
{
public:
/// Constructor
WorkerThread(const char* threadName);
/// Destructor
~WorkerThread();
/// Called once to create the worker thread
/// @return True if thread is created. False otherwise.
bool CreateThread();
/// Called once a program exit to exit the worker thread
void ExitThread();
/// Get the ID of this thread instance
/// @return The worker thread ID
std::thread::id GetThreadId();
/// Get the ID of the currently executing thread
/// @return The current thread ID
static std::thread::id GetCurrentThreadId();
/// Add a message to the thread queue
/// @param[in] data - thread specific message information
void PostMsg(std::shared_ptr<UserData> msg);
private:
WorkerThread(const WorkerThread&) = delete;
WorkerThread& operator=(const WorkerThread&) = delete;
/// Entry point for the worker thread
void Process();
/// Entry point for timer thread
void TimerThread();
std::unique_ptr<std::thread> m_thread;
std::queue<std::shared_ptr<ThreadMsg>> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
std::atomic<bool> m_timerExit;
const char* THREAD_NAME;
};
首先要注意的是,std::thread
它用于创建主工作线程。主要工作线程功能是Process()
。
bool WorkerThread::CreateThread()
{
if (!m_thread)
m_thread = new thread(&WorkerThread::Process, this);
return true;
}
事件循环
该Process()
事件循环如下所示。该线程依赖于a std::queue<ThreadMsg*>
消息队列。std::queue
不是线程安全的,因此对队列的所有访问都必须由互斥保护。A std::condition_variable
用于暂停线程,直到收到新消息已添加到队列的通知。

void WorkerThread::Process()
{
m_timerExit = false;
std::thread timerThread(&WorkerThread::TimerThread, this);
while (1)
{
std::shared_ptr<ThreadMsg> msg;
{
// Wait for a message to be added to the queue
std::unique_lock<std::mutex> lk(m_mutex);
while (m_queue.empty())
m_cv.wait(lk);
if (m_queue.empty())
continue;
msg = m_queue.front();
m_queue.pop();
}
switch (msg->id)
{
case MSG_POST_USER_DATA:
{
ASSERT_TRUE(msg->msg != NULL);
auto userData = std::static_pointer_cast<UserData>(msg->msg);
cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
break;
}
case MSG_TIMER:
cout << "Timer expired on " << THREAD_NAME << endl;
break;
case MSG_EXIT_THREAD:
{
m_timerExit = true;
timerThread.join();
return;
}
default:
ASSERT();
}
}
}
PostMsg()
ThreadMsg
在堆上创建一个新对象,将该消息添加到队列中,然后使用条件变量通知工作线程。
void WorkerThread::PostMsg(std::shared_ptr<UserData> data)
{
ASSERT_TRUE(m_thread);
// Create a new ThreadMsg
std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_POST_USER_DATA, data));
// Add user data msg to queue and notify worker thread
std::unique_lock<std::mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
循环将继续处理消息,直到MSG_EXIT_THREAD
收到并退出线程为止。
void WorkerThread::ExitThread()
{
if (!m_thread)
return;
// Create a new ThreadMsg
std::shared_ptr<ThreadMsg> threadMsg(new ThreadMsg(MSG_EXIT_THREAD, 0));
// Put exit thread message into the queue
{
lock_guard<mutex> lock(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
m_thread->join();
m_thread = nullptr;
}
事件循环(Win32)
下面的代码片段将std::thread
上面的事件循环与使用Windows API的类似Win32版本进行了对比。注意GetMessage()
API用于代替std::queue
。使用将消息发布到OS消息队列PostThreadMessage()
。最后,timerSetEvent()
用于将WM_USER_TIMER
消息放入队列。所有这些服务均由OS提供。std::thread WorkerThread
此处介绍的实现避免了原始OS调用,但实现功能与Win32版本相同,而仅依赖于C ++标准库。

unsigned long WorkerThread::Process(void* parameter)
{
MSG msg;
BOOL bRet;
// Start periodic timer
MMRESULT timerId = timeSetEvent(250, 10, &WorkerThread::TimerExpired,
reinterpret_cast<DWORD>(this), TIME_PERIODIC);
while ((bRet = GetMessage(&msg, NULL, WM_USER_BEGIN, WM_USER_END)) != 0)
{
switch (msg.message)
{
case WM_DISPATCH_DELEGATE:
{
ASSERT_TRUE(msg.wParam != NULL);
// Convert the ThreadMsg void* data back to a UserData*
const UserData* userData = static_cast<const UserData*>(msg.wParam);
cout << userData->msg.c_str() << " " << userData->year << " on " << THREAD_NAME << endl;
// Delete dynamic data passed through message queue
delete userData;
break;
}
case WM_USER_TIMER:
cout << "Timer expired on " << THREAD_NAME << endl;
break;
case WM_EXIT_THREAD:
timeKillEvent(timerId);
return 0;
default:
ASSERT();
}
}
return 0;
}
计时器
使用辅助专用线程将低分辨率定期计时器消息插入队列。计时器线程在内部创建Process()
。
void WorkerThread::Process()
{
m_timerExit = false;
std::thread timerThread(&WorkerThread::TimerThread, this);
...
计时器线程的唯一责任是MSG_TIMER
每250ms 插入一条消息。在此实现中,无法防止计时器线程将多个计时器消息注入到队列中。如果工作线程落后并且无法足够快地服务于消息队列,则可能发生这种情况。根据工作线程,处理负载以及计时器消息的插入速度,可以采用其他逻辑来防止泛滥队列。
void WorkerThread::TimerThread()
{
while (!m_timerExit)
{
// Sleep for 250mS then put a MSG_TIMER into the message queue
std::this_thread::sleep_for(250ms);
std::shared_ptr<ThreadMsg> threadMsg (new ThreadMsg(MSG_TIMER, 0));
// Add timer msg to queue and notify worker thread
std::unique_lock<std::mutex> lk(m_mutex);
m_queue.push(threadMsg);
m_cv.notify_one();
}
}
用法
main()
下面的函数显示了如何使用WorkerThread
该类。创建两个工作线程,并将消息发布到每个工作线程。短暂延迟后,两个线程均退出。

// Worker thread instances
WorkerThread workerThread1("WorkerThread1");
WorkerThread workerThread2("WorkerThread2");
int main(void)
{
// Create worker threads
workerThread1.CreateThread();
workerThread2.CreateThread();
// Create message to send to worker thread 1
std::shared_ptr<UserData> userData1(new UserData());
userData1->msg = "Hello world";
userData1->year = 2017;
// Post the message to worker thread 1
workerThread1.PostMsg(userData1);
// Create message to send to worker thread 2
std::shared_ptr<UserData> userData2(new UserData());
userData2->msg = "Goodbye world";
userData2->year = 2017;
// Post the message to worker thread 2
workerThread2.PostMsg(userData2);
// Give time for messages processing on worker threads
this_thread::sleep_for(1s);
workerThread1.ExitThread();
workerThread2.ExitThread();
return 0;
}
结论
C ++线程支持库提供了独立于平台的方式来编写多线程应用程序代码,而无需依赖于特定于操作系统的API。WorkerThread
这里介绍的类是事件循环的基本实现,但所有基础知识都已准备就绪,可以进行扩展。