首页 新闻 论坛 群组 Blog 文档 下载 读书 Tag 网摘 搜索 开源 FAQ 第二书店 博文视点 程序员
频道: 研发 数据库 中间件 信息化 视频 .NET Java 游戏 移动 服务: 人才 外包 培训
    图书品种:235680
       
热门搜索: ASP.NET Ajax Spring Hibernate Java

12.2 使资源是线程安全的

问题

当一个程序使用了多个线程时,如何保证在同一时刻多个线程不会修改同一个资源?总的来说,这个过程被称为资源线程安全,或者是串行化访问资源。

解决方案

使用定义在boost/thread/mutex.hpp中的mutex类,从而使得线程对资源的访问串行化。示例12-2给出了一个简单地使用mutex对象来控制对一个队列queue的并发访问。

示例12-2  使一个类是线程安全的

#include <iostream>

#include <boost/thread/thread.hpp>

#include <string>

// A simple queue class; don't do this, use std::queue

template<typename T>

class Queue {

public:

   Queue() {}

  ~Queue() {}

   void enqueue(const T& x) {

      // Lock the mutex for this queue

      boost::mutex::scoped_lock lock(mutex_);

      list_.push_back(x);

      // A scoped_lock is automatically destroyed (and thus unlocked)

      // when it goes out of scope

   }

   T dequeue() {

     boost::mutex::scoped_lock lock(mutex_);

     if (list_.empty())

        throw "empty!";     // This leaves the current scope, so the

     T tmp = list_.front(); // lock is released

     list_.pop_front();

     return(tmp);

   } // Again: when scope ends, mutex_ is unlocked

private:

   std::list<T> list_;

   boost::mutex mutex_;

};

Queue<std::string> queueOfStrings;

void sendSomething() {

   std::string s;

   for (int i = 0; i < 10; ++i) {

       queueOfStrings.enqueue("Cyrus");

   }

}

void recvSomething() {

   std::string s;

   for (int i = 0; i < 10; ++i) {

      try {s = queueOfStrings.dequeue();}

      catch(...) {}

   }

}

int main() {

   boost::thread thr1(sendSomething);

   boost::thread thr2(recvSomething);

   thr1.join();

   thr2.join();

}

讨论

使类、函数、代码块或者其他的对象线程安全是多线程程序设计的关键。如果你正在设计的软件是多线程的,那么每一个线程就都可能拥有它自己的一序列的资源,如栈和堆对象、操作系统资源等。但是不久你就会发现需要在线程之间共享某些资源。它可能是那些进来的工作需求(如在一个多线程web服务器上)的一个共享的队列或者是简单的一个输出流(如一个日志文件或者一个事件输出流中)。协调这个资源的安全访问的标准方法就是使用一个互斥体mutex,它提供了对某个事物的排他性访问。

这个讨论的其他部分将描述总体上一个mutex是什么,并且为了串行化对一个资源的访问该如何使用boost::mutex。我使用前面讨论的概念/模型方法来介绍它。一个概念就是某个事物描述的抽象(不依赖于语言的),而一个概念的模型就是C++中类形式的一个具体代表。一个概念的精炼就是一个带有某些附加或扩张行为的概念。

尽管并发编程是一个复杂的主题,但是还是有很多的技术能适用于某个简单的应用中。同时又有很多不同的设计模式可以使用,并且在不同的应用中又有不同的策略可以使用。如果你需要设计一个较大的多线程程序,或者你的应用程序对性能的要求很高,你就应该多看看有关多线程设计模式的一些好的书籍。那些使得多线程的调试变得很困难的大部分问题都能通过仔细的单调一点的设计避免掉。

使用mutex

mutex的概念是简单的:一个mutex是一个代表一个资源的事物并且能够一次被一个线程锁定和解锁。它是一个用来协调由多个消费者使用的资源访问的标志。在Boost线程库中,这个mutex概念由boost::mutex类来实现。在示例12-2中,通过使用一个mutex的成员变量来保证对Queue类的写访问:

boost::mutex mutex_;

mutex_必须被任意的成员函数锁定,这些任意的成员函数必须改变这些被维护元素的队列的状态。这个mutex对象本身并不知道它代表的是什么。它仅仅是那些被多个消费者使用的资源的锁定/解锁的标志。

在示例12-2中,当一个Queue的成员函数需要改变这个对象的状态时,它必须首先锁定这个mutex_。在某个时刻,只有一个线程可以锁定它,这就阻止了同一时刻有多个对象可以改变这个Queue对象的状态。因此,一个mutex就是一个简单的信号机制,但它和一个简单的布尔型或整型数不同,因为一个mutex需要串行化访问,这只能由操作系统的内核来保证。如果你使用一个布尔型变量来这样做,它将达不到目的,因为没有什么东西可以阻止多个线程在同一时刻改变这个布尔型变量的状态(不同的操作系统有不同的实现,这也就是不容易实现一个可移植的线程库的原因)。

可以使用不同的锁策略来给mutex对象加锁和解锁,最简单的方法就是使用scoped_

lock。一个scoped_lock是一个类,使用一个mutex参数来构造它,并且一直锁定它直到这个锁被销毁。请看示例12-2中的enqueue成员函数中scoped_lock是如何工作的:

void enqueue(const T& x) {

   boost::mutex::scoped_lock lock(mutex_);

   list_.push_back(x);

} // unlocked!

当这个锁被销毁时,mutex_就解锁。如果这个正在被构造锁的mutex已经被别的线程锁定的话,当前线程就会进入wait等待状态直到这个锁被解开。

这个设计第一眼看上去你会觉得奇怪,为什么不在mutex上设置一个lock和unlock方法呢?事实上使用scoped_lock类的方法在构造时就加锁而在析构时就解锁更为方便并且更不容易出错。当你使用scoped_lock方法创建一个锁时,它在它的整个生命周期都锁定这个对象,这就意味着你没必要在每一个控制路径上显式地解锁。另一方面,如果你需要不得不显式地锁定mutex,你就不得不捕获你的函数抛出的任意异常(或者在调用栈的函数中的任意地方)并且解锁这个mutex。而使用scoped_lock,如果有异常抛出或函数返回的话,这个scoped_lock对象都会被销毁并且这个mutex会被解锁。

使用一个mutex能使工作完成,但是还是有点美中不足。它没有区分读和写,而这是很重要的,因为线程如果只是做只读操作的话,将会强制线程串行化访问,这样效率是很低的,而且这种读操作不需要排他性访问。基于这个原因,Boost线程库提供了read_

write_mutex。示例12-3 说明如何用一个让调用者在不弹出这个queue中的第一个元素时就获取这个元素的副本的成员函数front,并让这个函数与read_write_mutex一起使用实现示例12-2中的功能:

示例12-3  使用读/写mutex

#include <iostream>

#include <boost/thread/thread.hpp>

#include <boost/thread/read_write_mutex.hpp>

#include <string>

template<typename T>

class Queue {

public:

   Queue() :  // Use a read/write mutex and give writers priority

      rwMutex_(boost::read_write_scheduling_policy::writer_priority) {}

  ~Queue() {}

   void enqueue(const T& x) {

      // Use a r/w lock since enqueue updates the state

      boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

      list_.push_back(x);

   }

   T dequeue() {

      // Again, use a write lock

      boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

      if (list_.empty())

         throw "empty!";

      T tmp = list_.front();

      list_.pop_front();

      return(tmp);

   }

   T getFront() {

      // This is a read-only operation, so you only need a read lock

      boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);

      if (list_.empty())

         throw "empty!";

      return(list_.front());

   }

private:

   std::list<T> list_;

   boost::read_write_mutex rwMutex_;

};

Queue<std::string> queueOfStrings;

void sendSomething() {

   std::string s;

   for (int i = 0; i < 10; ++i) {

      queueOfStrings.enqueue("Cyrus");

   }

}

void checkTheFront() {

   std::string s;

   for (int i = 0; i < 10; ++i) {

      try {s = queueOfStrings.getFront();}

      catch(...) {}

   }

}

int main() {

   boost::thread thr1(sendSomething);

   boost::thread_group grp;

   grp.create_thread(checkTheFront);

   grp.create_thread(checkTheFront);

   grp.create_thread(checkTheFront);

   grp.create_thread(checkTheFront);

   thr1.join();

   grp.join_all();

}

这里有很多我应该指出的。请注意我这里使用了read_write_mutex,如下所示:

boost::read_write_mutex rwMutex_;

当你使用读/写mutex时这个锁是不同的。在示例12-3中,当我为了写而把这个Queue锁定时,我创建了一个scoped_write_lock:

boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);

并且当我仅需要读这个Queue时,我使用了一个scoped_read_lock:

boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);

读/写锁都是便利的,但是它们不能阻止你自己绊着自己。当你仅使用一个读锁时,为了保证你没有改变资源,它们没有对rwMutex_代表的资源在编译时进行检查。由于编译器不会检查,所以当你有一个写锁时你需要采取额外的工作来保证一个线程仅修改了这个对象的状态。

精确地说这些锁的调度依赖于当你构造这个mutex时选定的调度策略。Boost线程库中提供了四种策略:

reader_priority

等待读锁的线程将优先于那些等待写锁的线程得到锁。

writer_priority

等待写锁的线程将优先于那些等待读锁的线程得到锁。

alternating_single_read

在读锁和写锁之间交替。当某个时刻轮到了某个读操作的顺序时就允许这个简单读操作一个读锁。这个策略总的来说还是写优先的。例如,如果一个mutex是写锁定,并且同时有好几个挂起来的读锁和一个挂起来的写锁,那么一个读锁将会被允许,然后是一个写锁被允许,然后是所有的读锁被允许。在这段时间内假定没有新的锁请求。

alternating_many_reads

在读锁和写锁之间交替。当某个读锁轮到了它的时刻时,允许所有的读锁。换句话说,这个策略将在两个写锁之间使得所有的在这个queue上挂起来的读锁都被允许。

这些策略都有各自的优点和缺点,并且它们的执行都依赖于你的应用程序。决定使用哪种策略需要仔细考虑,因为简单地使用读锁或写锁都可能会导致某些锁不能成功也即是饿死的出现,下面我将要详细地描述。

危险

当你设计一个多线程程序时,通常有三个基本问题:死锁、饿死和竞态条件。有很多技术可以用来避免它们的出现,这些技术的复杂性是不同的,在本节中就不准备讨论它们的复杂性。我将详细地描述每一个这种问题,这样你就能知道该如何提防它们,但是如果你打算开发多线程程序的话,你还是应首先看看多线程设计模式。

死锁是涉及到最少两个线程和两个资源的某种情况。假设两个线程A和B,两个资源X和Y,线程A锁定了资源X,而线程B锁定资源Y。当A又想锁定资源Y而线程B又想锁定资源X时,死锁就出现了。如果线程没有设计打破一定的死锁的话,那么它们将永远等待下去。

Boost线程库通过对mutex和锁概念的精炼让你能避免死锁。一个试探性的mutex支持对锁的试探性操作,使用一个试探性的锁可能成功也可能失败,但不会被阻塞住并一直等到资源被锁住。try_mutex和scoped_try_lock就是这个概念的模型形式,你的代码在你需要的资源被锁住时不会阻塞住并且可以从这个锁出来去做一些别的操作。还有另一种关于这个试探性锁的概念的精炼就是时间锁。使用一个时间锁,当阻塞的时间到达某个特定的数时线程就会放弃。这里关于时间锁的详细技术就不讨论了,可以参考Boost线程库文档。

例如,示例12-2中的Queue类,你想使用一个试探性的mutex,因此dequeue返回一个布尔型值来表明是否成功地获取了第一个元素。这样dequeue方法的使用就不必非得一直等待直到这个queue被锁定。下面就是实现这个dequeue的例子:

bool dequeue(T& x) {

   boost::try_mutex::scoped_try_lock lock(tryMutex_);

   if (!lock.locked())

      return(false);

   else {

      if (list_.empty())

         throw "empty!";

      x = list_.front();

      list_.pop_front();

      return(true);

   }

}

private:

boost::try_mutex tryMutex_;

// ...

使用的mutex和锁与示例12-2中的都不同。请正确地保证你正在使用的mutex和锁的类的名字,否则,你将不能得到你期望的行为。

当串行化访问某个事物时,你就告诉用户排队并且等待他自己的顺序。如果每个人都站在队中相同的位置,那么每个人都会得到机会使用这个资源。但如果你让别人插队的话,就有可能使得队列后面的人永远也得不到机会使用资源。这就是饿死。

当使用一个mutex时,用户是在一个组而不是在一个队列中等待。这些等待锁的线程先后顺序是没有保证的。对读/写mutex来说,Boost线程库使用了前面介绍的四种调度策略。因此,当使用读/写mutex时,必须意识到这些不同的调度策略是什么意思,并且需要明白线程正在做什么。如果你正在使用writer_priority策略,并且你有很多创建写锁的线程,那么你的读锁的线程就可能饿死;相同的情况reader_priority也可能发生,因为这些调度策略总是优先于某种类型的锁的。通过测试,如果你意识到了某种类型的线程不能达到它应该有的程度的话,你就应该考虑使用alternating_many_reads或alternating_single_read策略。当你构造一个读/写的mutex时你就指定了策略。

最后,一个竞态条件就是这种情况:你的代码对锁的顺序或锁的原子性做出的某种假设是错误的。例如,考虑Queue类的一个消费者在这个queue的头部查询并有条件地调用dequeue:

if (q.getFront() == "Cyrus") {

   str = q.dequeue();

   // ...

这些代码在单线程的环境中工作得很好,因为q在第一行代码和第二行代码之间不会被修改。然而,当使用多线程时,你就不得不考虑这种情况:别的线程可能在任意时刻修改q,事实上当一个线程没有给共享对象上锁时你就应该假设这个共享对象会被修改。在第一行代码后面,另外一个线程可能进来并且从q中的下一个元素处调用dequeue,这也就意味着第二行代码可能会得到一个不可靠的结果或者什么也得不到。为了修改q,getFront和dequeue都会锁定这个简单的mutex,但在它们解锁之前,如果有另外一个线程正等待着这个锁,那么这个线程可能在第二行代码运行前就把锁获取过去了。

对这种竞态条件的一个解决方案就是保证在某个操作的全过程中都持有锁。创建一个名为dequeueIfEquals的成员函数,它仅仅在等于它的输入参数的那个元素上调用dequeues。dequeueIfEquals可以使用锁:

T dequeueIfEquals(const T& t) {

   boost::mutex::scoped_lock lock(mutex_);

   if (list_.front() == t)

      // ...

还有其他类型的竞态条件,但这个例子给你指出了一个该如何提防它的总体意见。随着你使用的线程和共享资源的增加,这些竞态条件将变得更为敏感和难以捕获。因此,你应该采取一些特殊考虑的设计来阻止它的发生。

多线程开发中保证资源的串行化访问是一个最难的课题,因为当你没有正确的处理时,调试是非常困难的。由于多线程程序天生就具有不确定性(因为线程的执行具有不同的顺序并且每次程序的执行都具有不同的时间),因此要精确地查明一个线程在哪里或者是如何修改了它不应该修改的事物是非常痛苦的。将会比单线程编程花费更多的工作,一个好的设计就是使得调试和重复工作最少。

查看所有评论(0)条】

最近评论



正在载入评论列表...
热点评论