12.3 从一个线程中给另一个线程发送通知
问题
你正在使用一个模式,在这个模式中一个线程(或者一组线程)处理某些事情并且这个线程需要让别的线程(或别的线程组)知道它。你可以有一个负责分发工作给别的辅线程的主线程,或者你可能使用一组线程来向一个队列中追加数据,并且另一个线程从这个队列中取数据然后做一些有用的事情。
解决方案
使用定义在boost/thread/mutex.hpp和boost/thread/condition.hpp文件中的mutex和condition对象。当你需要线程等待某个事物时你就可以创建一个condition对象,然后通过这个condition对象来通知那些等待的线程。示例12-4说明了如何在一个主/辅线程模型中发送信号。
示例12-4 在线程之间发送信号
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <list>
#include <string>
class Request { /*...*/ };
// A simple job queue class; don't do this, use std::queue
template<typename T>
class JobQueue {
public:
JobQueue() {}
~JobQueue() {}
void submitJob(const T& x) {
boost::mutex::scoped_lock lock(mutex_);
list_.push_back(x);
workToBeDone_.notify_one();
}
T getJob() {
boost::mutex::scoped_lock lock(mutex_);
workToBeDone_.wait(lock); // Wait until this condition is
// satisfied, then lock the mutex
T tmp = list_.front();
list_.pop_front();
return(tmp);
}
private:
std::list<T> list_;
boost::mutex mutex_;
boost::condition workToBeDone_;
};
JobQueue<Request> myJobQueue;
void boss ) {
for (;;) {
// Get the request from somewhere
Request req;
myJobQueue.submitJob(req);
}
}
void worker() {
for (;;) {
Request r(myJobQueue.getJob());
// Do something with the job...
}
}
int main() {
boost::thread thr1(boss);
boost::thread thr2(worker);
boost::thread thr3(worker);
thr1.join();
thr2.join();
thr3.join();
}
讨论
一个condition对象使用mutex对象,并且让你等待这个mutex对象不被锁定这种情况。考虑示例12-4,它是示例12-2中说明的Queue类的一个改版。我把这个Queue类修改成某个特殊形式的队列类,叫做JobQueue类,在这个类中,对象代表由主线程提交的工作并且这个工作由辅线程获取。
这个JobQueue类的最重要的变化就是这个condition类型的成员变量workToBeDone_。这个condition对象表明在这个队列中是否有工作存在。当一个线程想从这个工作队列中获取一个元素时,它就需要调用getJob方法,这个方法需要获取这个mutex互斥对象上的一个锁,然后等待取到这个锁,如下所示:
boost::mutex::scoped_lock lock(mutex_);
workToBeDone_.wait(lock);
第一行通过一种平常的方法来锁定这个mutex互斥对象。第二行代码解开这个mutex上的锁然后进行等待或者休眠,直到它的条件得到了满足。这个mutex互斥对象的解锁让其他的线程能够使用这个mutex对象,它们中的某个需要设置这个等待的条件,否则,其他的线程将不能锁定这个正在被某个线程等待的mutex互斥对象。
在这个submitJob方法中,当这个工作加入到内部的链表中时,我加上了如下这一行代码:
workToBeDone_.notify_one();
这就满足了getJob正在等待的条件。从技术上来说,这就意味着如果有任何在这个condition对象上调用wait的线程的话,这些线程中的某一个就会进入到运行run状态。在getJob中,也就意味着线程将继续从以下这行代码执行:
workToBeDone_.wait(lock);
但不仅仅是这样,wait方法做两件事:它一直等待直到有人在它正等待的condition上调用了notify_one或notify_all,然后它就锁定跟它相关的mutex。当submitJob方法调用notify_all时事实上发生的是正在等待的线程进入运行状态,并且接着做的就是尽量去锁定这个由submitJob方法已经锁定的mutex对象,因此它就仍进入wait等待状态直到submitJob方法完成。因此,condition::wait方法需要当mutex被调用时锁定它,在mutex对象被解锁时,如果碰到一个condition它又会被锁定。
调用notify_all时将通知那些所有正在等待某个条件变成真的线程。它和notify_one的工作方式相同,只不过那些所有正处在等待这个条件的线程都变成运行状态之外。尽管它们都尽量去获取下一个锁,接下来会发生什么依赖于这个mutex的类型和使用的锁的类型。
一个condition能使你达到一个微妙的效果,也就是当你使用mutex并单独锁定它们时,你并不能取得它。考虑一下前面的那个Queue类。正在等待双端队列的线程一直等待直到它获得一个写锁,然后从这个队列中弹出下一个元素。不使用任何类型的信号机制也许这个类能工作得很好,但真的是这样吗?当这个队列是空的时候呢?如果你正在等待某个条件变成真的话也许你有很多种选择来实现这个双端队列:获取一个锁;查看这个队列中是否有元素,如果没有,就返回;使用另一个mutex,当这个队列为空时就锁定这个mutex,而当这个队列有元素时就给这个mutex解锁(这不是一个好的想法);或者当这个队列为空时就返回一个特殊的值。这几种方法要么有问题要么就效率不高。如果这个队列为空时通过抛出一个异常或返回一个特殊的值而简单地返回,那么你的客户将不得不不断地检查是否有东西到达。这在资源上是一个不必要的消耗。
一个condition对象能让消费者线程休眠,因此在还没有碰到一个condition时处理器就可以去处理别的事情。例如一个web服务器使用一个工作线程池来处理进来的请求。当没有需求进来时让这些子线程处于等待状态比让它们循环地查询或睡眠然后偶尔唤醒来检查这个队列要好得多。






