首页 新闻 论坛 群组 Blog 文档 下载 读书 Tag 网摘 搜索 开源 FAQ 第二书店 博文视点 程序员
频道: 研发 数据库 中间件 信息化 视频 .NET Java 游戏 移动 服务: 人才 外包 培训
    图书品种:235680
       
热门搜索: ASP.NET Ajax Spring Hibernate Java

13.2  同步线程

为什么需要同步?这是因为操作“交替”将导致通过阅读各个线程的代码难以看出访问变量和其他实体的方式。访问同一个变量的多个线程可能以不可预见的方式交互,导致难以调试。

以下面的代码为例:

x = 0

t1 = Thread.new do

  1.upto(1000) do

    x = x + 1

  end

end

t2 = Thread.new do

  1.upto(1000) do

    x = x + 1

  end

end

t1.join

t2.join

puts x

变量x的初始值为0,每个线程将其递增1000次。从逻辑上判断,输出x时其值应为2000。

但实际结果可能不符合这种逻辑。在某个特定的系统中,其输出结果为1044,这是为什么呢?

这里的代码假设整数递增是项原子(即不可分割)运算,但实际并非如此。请看下述代码示例的逻辑流程:将线程t1放在左边,t2放在右边,每个时隙占一行,假设开始该逻辑时,x的值为123。

t1                                 t2

__________________________    __________________________

Retrieve value of x (123)

                                   Retrieve value of x (123)

Add one to value (124)

                                   Add one to value (124)

Store result back in x

                                   Store result back in x

显然,每个线程都从自己的角度执行一次递增运算,但在这个例子中,两个线程都对x执行递增运算后,x的值只有124。

这只是最简单的同步问题。最糟糕的问题非常难于管理,值得计算机科学家和数学家研究。

13.2.1  使用临界区执行简单同步

最简单的同步形式是使用临界区(critical section)。当线程进入代码的临界区时,这种技术确保所有其他线程都不运行,直到该线程离开其临界区。

将存取器Thread.critical设置为true可禁止其他线程被调度。在下面的代码中,修改了前一个例子,使用critical存取器来定义临界区并保护代码的敏感部分。

x = 0

t1 = Thread.new do

  1.upto(1000) do

    Thread.critical = true

    x = x + 1

    Thread.critical = false

  end

end

t2 = Thread.new do

  1.upto(1000) do

    Thread.critical = true

    x = x + 1

    Thread.critical = false

  end

end

t1.join

t2.join

puts x

现在逻辑流程不同了,请参阅下面有关t1和t2如何执行的描述(当然,在对x执行递增运算的代码外,两个线程几乎可以随机地交替操作)。

t1                                 t2

__________________________    __________________________

Retrieve value of x (123)

Add one to value (124)

Store result back in x

                                    Retrieve value of x (124)

                                    Add one to value (125)

                                    Store result back in x

可以组合线程操纵操作,导致在一个线程处于临界区中调用另一个线程。最简单的一种情况是,新创建的线程可立即执行,而不管是否有另一个线程处于临界区。因此,只应在最简单的情况下使用这种技术。

13.2.2  同步对资源的访问(mutex.rb)

下面以一个Web索引应用程序为例。假设要通过网络从多个数据源检索单词,并将它们存储在散列中。单词本身将用做键,值是标识文档及行号的字符串。

这是一个简单的例子,但如果做以下假设将更简单:

·    将远程文档表示为简单字符串;

·    最多只有三个这样的字符串(简单的硬编码数据);

·    使用随机休眠来表示网络访问的易变性。

来看程序清单13.1所示的例子,它没有输出收集的数据,而只是输出找到的单词数(非唯一)。注意,每次检查或修改散列时,都调用hesitate方法休眠一段随机时间。这将导致程序以更不确定和更真实的方式运行。

程序清单13.1  有缺陷的索引示例(使用静态条件)

@list = []

@list[0]="shoes ships\nsealing-wax"

@list[1]="cabbages kings"

@list[2]="quarks\nships\ncabbages"

def hesitate

  sleep rand(0)

end

@hash = {}

def process_list(listnum)

  lnum = 0

  @list[listnum].each do |line|

    words = line.chomp.split

    words.each do |w|

      hesitate

      if @hash[w]

        hesitate

        @hash[w] += ["#{listnum}:#{lnum}"]

      else

        hesitate

        @hash[w] = ["#{listnum}:#{lnum}"]

      end

    end

    lnum += 1

  end

end

t1 = Thread.new(0) {|num| process_list(num) }

t2 = Thread.new(1) {|num| process_list(num) }

t3 = Thread.new(2) {|num| process_list(num) }

t1.join

t2.join

t3.join

count = 0

@hash.values.each {|v| count += v.size }

puts "Total: #{count} words"     # May print 7 or 8!

但存在一个问题。如果读者的系统与作者的测试系统相同,该程序可能输出的数字有两个!在作者的测试中,它输出答案7和8,两者的概率大致相等。当单词和列表更多时,变化可能更大。

下面尝试使用mutex来修复这个问题,mutex控制对共享资源的访问(mutex一词源于mutual exclusion),如程序清单13.2所示。

通过使用Mutex库,可创建和操作mutex。要访问散列前可锁定它,在使用完散列后解除锁定。

程序清单13.2  使用Mutex进行保护的索引示例

require 'thread.rb'

@list = []

@list[0]="shoes ships\nsealing-wax"

@list[1]="cabbages kings"

@list[2]="quarks\nships\ncabbages"

def hesitate

  sleep rand(0)

end

@hash = {}

@mutex = Mutex.new

def process_list(listnum)

  lnum = 0

  @list[listnum].each do |line|

    words = line.chomp.split

    words.each do |w|

      hesitate

      @mutex.lock

        if @hash[w]

          hesitate

          @hash[w] += ["#{listnum}:#{lnum}"]

        else

          hesitate

          @hash[w] = ["#{listnum}:#{lnum}"]

        end

      @mutex.unlock

    end

    lnum += 1

  end

end

t1 = Thread.new(0) {|num| process_list(num) }

t2 = Thread.new(1) {|num| process_list(num) }

t3 = Thread.new(2) {|num| process_list(num) }

t1.join

t2.join

t3.join

count = 0

@hash.values.each {|v| count += v.size }

puts "Total: #{count} words"     # Always prints 8!

需要指出的是,除lock外,Mutex类还有try_lock方法。这个方法与lock类似,只是如果另一个线程已锁定资源,它将立即返回false而不是等待。

require 'thread'

mutex = Mutex.new

t1 = Thread.new { mutex.lock; sleep 30 }

sleep 1

t2 = Thread.new do

  if mutex.try_lock

    puts "Locked it"

  else

    puts "Could not lock"   # Prints immediately

  end

end

sleep 2

如果不想阻断线程,这种功能将很有用。

还有一个synchronize方法,它接受一个代码块作为参数。

mutex = Mutex.new

mutex.synchronize do

  # Whatever code needs to be

  #   protected...

end

还有一个mutex_m库,它定义了一个Mutex_m 模块,该模块可被混合插入到类中(或用于扩展对象)。以这种方式扩展的对象都有mutex方法,因此将对象本身视为mutex。

require 'mutex_m'

class MyClass

  include Mutex_m

  # Now any MyClass object can call

  # lock, unlock, synchronize, ...

  # or external objects can invoke

  # these methods on a MyClass object.

end

13.2.3  使用预定义的同步队列类

线程库thread.rb有两个很有用的类,类Queue是一个支持线程的队列,能够同步对队列末尾的访问,即不同线程可使用同一个队列,而不必担心同步问题。类SizedQueue与类Queue基本相同,但能够限制队列的长度(可包含的元素数)。

类SizedQueue继承了类Queue,因此它们提供的方法几乎相同。SizedQueue还有一个存取器max,用于获取或设置队列的最大长度。

buff = SizedQueue.new(25)

upper1 = buff.max            # 25

# Now raise it...

buff.max = 50

upper2 = buff.max            # 50

程序清单13.3是一个简单的生产者/消费者例子。消费者的平均延迟稍长(通过设置更长的休眠时间),让生产者生成的数据有一定的富余。

程序清单13.3  生产者—消费者问题

require 'thread'

buffer = SizedQueue.new(2)

producer = Thread.new do

  item = 0

  loop do

    sleep rand 0

    puts "Producer makes #{item}"

    buffer.enq item

    item += 1

  end

end

consumer = Thread.new do

  loop do

    sleep (rand 0)+0.9

    item = buffer.deq

    puts "Consumer retrieves #{item}"

    puts "  waiting = #{buffer.num_waiting}"

  end

end

sleep 60   # Run a minute, then die and kill threads

推荐使用enq和deq方法将元素插入队列中以及将其从队列中删除,也可使用push方法将元素插入队列,使用pop或shift方法将元素删除,但显式地使用队列时,这些方法的名称不那么好记。

empty?方法检测队列是否为空,clear方法清空队列,size方法(或其别名length)返回队列中实际的元素数。

# Assume no other threads interfering...

buff = Queue.new

buff.enq "one"

buff.enq "two"

buff.enq "three"

n1 = buff.size            # 3

flag1 = buff.empty?      # false

buff.clear

n2 = buff.size            # 0

flag2 = buff.empty?      # true

num_waiting方法返回等待访问队列的线程数。对于没有长度限制的队列,这是等待删除元素的线程数;对于有长度限制的队列,还包括等待添加元素的线程数。

Queue类的deq方法有一个可选参数 non_block,其默认值为false。如果该参数为true,空队列将导致ThreadError错误,而不是阻断线程。

13.2.4  使用条件变量

条件变量(condition variable)实际上是一个线程队列,它与mutex结合使用为线程同步提供高级控制。

条件变量总是与特定的mutex相关联,用于放弃对mutex的控制,直到满足特定的条件。假设有线程锁定了mutex但不能继续,因为所处的环境不满足条件。该线程可基于条件变量进行休眠,直到条件满足后被唤醒。

当线程基于条件变量进行等待时,mutex将被释放,让其他线程能够访问mutex,理解这一点很重要。当另一个线程执行信号操作(以唤醒等待的线程)时,等待的线程将重新锁定mutex,认识到这一点也很重要。

下面来看一个与经典的“哲学家吃饭”问题类似的例子。假设桌子周围坐了三个小提琴家,他们要轮流表演,但只有两把小提琴和一把弓。显然,仅当同时获得两把小提琴之一和唯一的弓时,小提琴家才能表演。

首先记录小提琴和弓的数量。当表演者需要小提琴或弓时,他必须等待。在下面的代码中,使用mutex进行保护,并分别等待小提琴和弓,它们都与该mutex相关联。如果没有小提琴或弓可用,线程将休眠。它释放mutex,直到其他线程发出资源可用信号将它唤醒。被唤醒后,原来的线程将重新锁定mutex。

程序清单13.4列出了代码。

程序清单13.4  三个小提琴家

require 'thread'

@music  = Mutex.new

@violin = ConditionVariable.new

@bow    = ConditionVariable.new

@violins_free = 2

@bows_free    = 1

def musician(n)

  loop do

    sleep rand(0)

      @music.synchronize do

      @violin.wait(@music) while @violins_free == 0

      @violins_free -= 1

      puts "#{n} has a violin"

      puts "violins #@violins_free, bows #@bows_free"

      @bow.wait(@music) while @bows_free == 0

      @bows_free -= 1

      puts "#{n} has a bow"

      puts "violins #@violins_free, bows #@bows_free"

    end

    sleep rand(0)

    puts "#{n}:  (...playing...)"

    sleep rand(0)

    puts "#{n}: Now I've finished."

    @music.synchronize do

      @violins_free += 1

      @violin.signal if @violins_free == 1

      @bows_free += 1

      @bow.signal if @bows_free == 1

    end

  end

end

threads = []

3.times {|i| threads << Thread.new { musician(i) } }

threads.each {|t| t.join }

作者认为这个解决方案不会发生死锁,虽然这很难证明。但需要指出的是,这种算法不是公平算法,在作者的测试中,第一个表演者的表演次数比其他两个多,而第二个比第三个多。其中的原因和解决方案作为练习留给读者去完成。

13.2.5  使用其他同步技术

另一种同步机制是监控器(monitor),在Ruby中它被实现为monitor.rb库。这种技术比mutex高级,其中最重要的是mutex锁定不能嵌套,而monitor锁定可以。

没有人会这样编写代码:

@mutex = Mutex.new

@mutex.synchronize do

  @mutex.synchronize do

       #...

  end

end

但可能发生这样的情况(或通过递归方法调用),在任何情形下这都将导致死锁。在这种情况下,使用Monitor mixin可避免死锁,这是它的优点之一。

@mutex = Mutex.new

def some_method

  @mutex.synchronize do

    #...

    some_other_method    # Deadlock!

  end

end

def some_other_method

  @mutex.synchronize do

    #...

  end

end

Monitor mixin通常用于扩展对象,然后就可使用new_cond方法来实例化条件变量。

monitor.rb中的类ConditionVariable对线程库中的相应的定义进行了改进,它提供了wait_until和wait_while方法,这两个方法将根据条件阻断线程。该类还支持等待超时,因为wait方法有一个timeout 参数,其单位为秒数(默认值为nil)。

由于线程的例子不多,程序清单13.5使用monitor技术重写了类Queue和SizedQueue。这些代码是Shugo Maeda编写的,征得其同意在这里使用它。

程序清单13.5  使用Monitor实现Queue

# Author:  Shugo Maeda

require 'monitor'

class Queue

  def initialize

    @que = []

    @monitor = Monitor.new

    @empty_cond = @monitor.new_cond

  end

  def enq(obj)

    @monitor.synchronize do

      @que.push(obj)

      @empty_cond.signal

    end

  end

  def deq

    @monitor.synchronize do

      while @que.empty?

        @empty_cond.wait

      end

      return @que.shift

    end

  end

end

class SizedQueue < Queue

  attr :max

  def initialize(max)

    super()

    @max = max

    @full_cond = @monitor.new_cond

  end

  def enq(obj)

    @monitor.synchronize do

      while @que.length >= @max

        @full_cond.wait

      end

      super(obj)

    end

  end

  def deq

    @monitor.synchronize do

      obj = super

      if @que.length < @max

        @full_cond.signal

      end

      return obj

    end

  end

  def max=(max)

    @monitor.synchronize do

      @max = max

      @full_cond.broadcast

    end

  end

end

sync.rb库是执行线程同步的另一种方式(使用带计数器的两阶段锁(two-phase lock with counter))。它定义了一个Sync_m模块,用于include或extend(与Mutex_m极其相似)。该模块向类提供了方法locked?、shared?、exclusive?、lock、unlock和try_lock。

13.2.6  允许操作超时

很多情况都需要指定操作执行的最长时间。这样可避免无限循环以及获得对处理的更大控制权。这样的功能在网络环境和其他情形下很有用,在网络环境中,可能得到远程服务器的响应,也可能得不到。

timeout.rb库是针对这个问题的一种基于线程的解决方案(见程序清单13.6)。timeout方法执行一个调用它时指定的代码块,过了指定的秒数后,它将引发TimeoutError,rescue子句将捕获这种异常。

程序清单13.6  一个超时的例子

require 'timeout.rb'

flag = false

answer = nil

begin

  timeout(5) do

    puts "I want a cookie!"

    answer = gets.chomp

    flag = true

  end

rescue TimeoutError

  flag = false

end

if flag

  if answer == "cookie"

    puts "Thank you! Chomp, chomp, ..."

  else

    puts "That's not a cookie!"

    exit

  end

else

  puts "Hey, too slow!"

  exit

end

puts "Bye now..."

13.2.7  等待事件发生

很多情况可能需要使用一个或多个线程在其他线程执行其工作时监控“外部世界”。虽然这里的所有例子都不符合现实,但它们确实说明了基本原理。

在下面的例子中,三个线程执行应用程序的“工作”,另一个线程每隔5秒钟唤醒一次,它检查全局变量$flag,如果这个标记被设置,它将唤醒另外两个线程。这让三个工作线程不必直接与另外两个线程交互,从而避免多次唤醒它们。

$flag = false

work1 = Thread.new { job1() }

work2 = Thread.new { job2() }

work3 = Thread.new { job3() }

thread4 = Thread.new { Thread.stop; job4() }

thread5 = Thread.new { Thread.stop; job5() }

watcher = Thread.new do

  loop do

    sleep 5

    if $flag

      thread4.wakeup

      thread5.wakeup

      Thread.exit

    end

  end

end

如果在方法job执行期间的任何时刻变量$flag变为true,thread4和thread5将在5秒内开始执行;然后线程watcher终止。

接下来的例子等待文件被创建。每隔30秒检查一次,如果文件被创建,将启动另一个线程,同时,其他线程可执行任何操作。实际上,这里监测的文件有三个。

def waitfor(filename)

  loop do

    if File.exist? filename

      file_processor = Thread.new { process_file(filename) }

      Thread.exit

    else

      sleep 30

    end

  end

end

waiter1 = Thread.new { waitfor("Godot") }

sleep 10

waiter2 = Thread.new { waitfor("Guffman") }

sleep 10

headwaiter = Thread.new { waitfor("head") }

# Main thread goes off to do other things...

在很多情况下,线程可能等待外部事件发生,例如,在网络应用程序中,套接字另一端的服务器可能很慢或不可靠。

13.2.8  在I/O期间继续处理

应用程序经常需要执行一个或多个耗时的I/O操作,尤其是在用户输入时,因为用户的键盘输入比任何磁盘操作都慢。通过使用线程,可将这段时间利用起来。

以必须等待人类下一步棋的象棋程序为例,当然,这里只大概说明这个概念。

假设迭代器predict_move不断生成人类可能走的下一步棋(然后决定程序要走的下一步棋),当人类走棋时,这步棋可能被程序预测到。

scenario = {}    # move-response hash

humans_turn = true

thinking_ahead = Thread.new(board) do

  predict_move do |m|

    scenario[m] = my_response(board,m)

    Thread.exit if humans_turn == false

  end

end

human_move = get_human_move(board)

humans_turn = false   # Stop the thread gracefully

# Now we can access scenario which may contain the

# move the person just made...

当然,真正的象棋程序通常不以这种方式运行。

13.2.9  实现并行迭代器

假设要并行地对多个对象进行迭代,即对于n个对象中每一个,首先需要获得其第一个元素,然后是第二个元素和第三个元素,依次类推。

为使这种概念更具体,请看下面的例子。这里假设compose方法返回所有迭代器的组合,并假设指定的每个对象都有默认迭代器each,迭代时将使用这个迭代器,且每次迭代时每个对象提供一个元素。

arr1 = [1, 2, 3, 4]

arr2 = [5, 10, 15, 20]

compose(arr1, arr2) {|a,b| puts "#{a} and #{b}" }

# Should output:

# 1 and 5

# 2 and 10

# 3 and 15

# 4 and 20

当然,也可使用zip来实现这种功能,但如果希望解决方案更优雅(不实际存储所有的元素),则使用线程是唯一的简单方式。

程序清单13.7是这个解决方案的代码。

程序清单13.7  并行地迭代

def compose(*objects)

  threads = []

  for obj in objects do

    threads << Thread.new(obj) do |myobj|

    me = Thread.current

    me[:queue] = []

    myobj.each {|x| me[:queue].push(x) }

    end

  end

  list = [0]                               # Dummy non-nil value

  while list.nitems > 0 do              # Still some non-nils

    list = []

    for thr in threads

      list << thr[:queue].shift         # Remove one from each

    end

    yield list if list.nitems > 0   # Don't yield all nils

  end

end

x = [1, 2, 3, 4, 5, 6, 7, 8]

y = "  first\n second\n  third\n fourth\n  fifth\n"

z = %w[a b c d e f]

compose(x, y, z) {|a,b,c| p [a, b, c] }

# Output:

#

# [1, "  first\n", "a"]

# [2, " second\n", "b"]

# [3, "  third\n", "c"]

# [4, " fourth\n", "d"]

# [5, "  fifth\n", "e"]

# [6, nil, "f"]

# [7, nil, nil]

# [8, nil, nil]

注意,这里没有假设每个对象要迭代的元素数相同。如果某个迭代器比其他迭代器先“迭代完所有元素”,它将不断生成nil值,直到执行时间最长的迭代器执行完毕。

当然,可编写一个更通用的方法,它每次迭代时都能获取多个值(毕竟,并非所有迭代器都每次只返回一个值)。可使用第一个参数指定每个迭代器将返回多少个值。

也可使用任何迭代器(而不是默认迭代器each)。可以字符串的方式传入迭代器的名称,然后使用send来调用它们。还可采用其他技巧。

然而,作者认为在大多数情况下,这里给出的例子足够用了,其他的变体作为练习留给读者去完成。

13.2.10  并行地递归删除

下面编写一些代码并行地删除整个目录树,以添加乐趣。这里的递归删除方法采用线程化形式,当目录项本身是目录时,将启动一个新线程来遍历该目录并删除其内容。

同时,将创建的线程存储在数组thread中。由于这是个局部变量,因此每个线程都有其拷贝。不能有多个线程同时访问该数组,因此不必同步对数组的访问。

另外,将fullname传入了线程的代码块,这样就不用担心线程访问的变量在变。线程将fn用做该变量的本地拷贝。

遍历完整个目录后,等待创建的线程运行结束后才删除当前目录。

def delete_all(dir)

  threads = []

  Dir.foreach(dir) do |e|

    next if [".",".."].include? e          # Skip . and ..

    fullname = dir + "/" + e

    if FileTest.directory?(fullname)

      threads << Thread.new(fullname) {|fn| delete_all(fn) }

    else

      File.delete(fullname)

    end

  end

  threads.each { |t| t.join }

  Dir.delete(dir)

end

delete_all("/tmp/stuff")

这是否比非线程版本的速度快?答案是不一定。这可能取决于操作系统和要删除的目录的实际结构——即目录的深度和文件大小等。

查看所有评论(0)条】

最近评论



正在载入评论列表...
热点评论