线程有时称为轻量级进程,线程不过是一种实现并发的方式,但在操作系统级没有切换任务的开销(当然,计算界对线程的定义并没有完全达成一致,但这里不讨论这些)。
Ruby线程是用户级线程,且独立于操作系统,可用于DOS和UNIX系统中。然而,这肯定对性能有影响,且影响随操作系统而异。
当不同的代码段以彼此独立的方式完成其功能时,线程很有用。在应用程序将大量时间用于等待事件时,也可使用线程。通常在一个线程等待时,另一个线程可执行有用的处理。
另一方面,使用线程也有一些潜在的缺点。必须在线程导致的速度降低和它带来的优点之间进行权衡。另外,在有些情况下,对资源的访问本质上是串行的,因此线程不会有任何帮助。最后,有时候同步访问全局资源导致的开销超过了多线程节省的开销。
鉴于以上原因以及其他原因,有些权威人士认为应避免线程化编程。并行代码确实可能很复杂、容易出错且难以调试,但在什么情况下适合使用这些技术留给读者去判断。
非同步线程存在的问题众所周知。试图同时访问全局数据的线程可能破坏它们,还可能发生静态条件(Race Condition):一个线程假设其他线程完成了某些操作,这可能导致代码是“非确定性的”,代码每次执行的结果都不同。最后,还有死锁的危险,即线程都不能继续执行,因为它等待另一个线程释放其占用的资源,而后者也被阻断。能够避免这些问题的代码被称为线程安全的。
并非Ruby的各个方面都是线程安全的,但可使用同步方法控制对变量和资源的访问、保护代码的临界区部分以及避免死锁。本章将介绍这些技术并提供演示它们的代码段。
13.1 创建和操控线程
最基本的线程操作包括:创建线程、传入和传出信息、终止线程等,还可获得线程列表、检查线程的状态以及查看各种其他信息。
下面概述这些基本操作。
13.1.1 创建线程
创建线程很简单,只需调用new方法,并指定一个将用做线程体的代码块。
thread = Thread.new do
# Statements comprising
# the thread...
end
返回值显然是一个类型为Thread的对象,主线程可使用该返回值来控制它创建的线程。
如果要传递参数给线程呢?可将其传递给Thread.new,后者再将参数传递给代码块。
a = 4
b = 5
c = 6
thread2 = Thread.new(a,b,c) do |a, x, y|
# Manipulate a, x, and y as needed.
end
# Note that if a is changed in the new thread, it will
# change suddenly and without warning in the main
# thread.
与其他代码块的参数一样,与已有变量对应的参数将与这些变量相同。因此,从这种意义上说,上述代码段中的变量a是个“危险”的变量,正如注释所说的。
线程也可访问创建线程的作用域中的变量。显然,如果没有同步,这可能导致问题。主线程和其他线程可能分别修改变量,其结果将是不可预测的。
x = 1
y = 2
thread3 = Thread.new do
# This thread can manipulate x and y from the outer scope,
# but this is not always safe.
sleep(rand(0)) # Sleep a random fraction of a second.
x = 3
end
sleep(rand(0))
puts x
# Running this code repeatedly, x may be 1 or 3 when it
# is printed here!
fork方法是new方法的别名,它来自UNIX系统中著名的fork调用。
13.1.2 访问线程的局部变量
前面指出过,线程使用其作用域外的变量是危险的。另外,线程也可以有自己的局部数据,但如果线程想使其数据为“公有的”,该如何办呢?
有一种特殊的机制可完成这项工作。如果线程对象被视为散列,则可在线程对象的作用域内访问线程的局部数据。这并不意味着局部变量可以这种方式访问,而只是可在线程内部访问其命名数据。
还有一个key?方法,可判断线程是否使用了给定的名称。
在线程内,必须像散列那样引用数据,而使用Thread.current更方便。
thread = Thread.new do
t = Thread.current
t[:var1] = "This is a string"
t[:var2] = 365
end
# Access the thread-local data from outside...
x = thread[:var1] # "This is a string"
y = thread[:var2] # 365
has_var2 = thread.key?("var2") # true
has_var3 = thread.key?("var3") # false
注意,即使拥有这些数据的线程已失效,其他线程仍可访问它们(就像这个例子中一样)。
除像上面那样使用符号外,还可使用字符串来标识线程的局部变量。
thread = Thread.new do
t = Thread.current
t["var3"] = 25
t[:var4] = "foobar"
end
a = thread[:var3] = 25
b = thread["var4"] = "foobar"
不要将这些特殊名称和实际的局部变量混为一谈,下面的代码段更清楚地说明了两者的区别:
thread = Thread.new do
t = Thread.current
t["var3"] = 25
t[:var4] = "foobar"
var3 = 99 # True local variables (not
var4 = "zorch" # accessible from outside)
end
a = thread[:var3] # 25
b = thread["var4"] # "foobar"
最后,在线程内可将对象引用(指向真正的局部变量)用做简略表示法,但必须保留同一个对象引用,而不能创建新对象。
thread = Thread.new do
t = Thread.current
x = "nXxeQPdMdxiBAxh"
t[:my_message] = x
x.reverse!
x.delete! "x"
x.gsub!(/[A-Z]/,"")
# On the other hand, assignment would create a new
# object and make this shorthand useless...
end
a = thread[:my_message] # "hidden"
另外,这种简洁表示法显然不适用于Fixnum等值,因为这些值存储为直接值,而不是对象引用。
13.1.3 查询和修改线程状态
Thread类有几个类方法可用于各种用途。list方法返回一个数组,其中包含所有处于存活状态的线程;main方法返回一个引用,它指向生成其他线程的主线程;current方法让线程能够获悉自己的身份。
t1 = Thread.new { sleep 100 }
t2 = Thread.new do
if Thread.current == Thread.main
puts "This is the main thread." # Does NOT print
end
1.upto(1000) { sleep 0.1 }
end
count = Thread.list.size # 3
if Thread.list.include?(Thread.main)
puts "Main thread is alive." # Always prints!
end
if Thread.current == Thread.main
puts "I'm the main thread." # Prints here...
end
方法exit、pass、start、stop和kill用于(从线程内部或外部)控制线程的执行:
# In the main thread...
Thread.kill(t1) # Kill this thread now
Thread.pass # Pass execution to t2 now
t3 = Thread.new do
sleep 20
Thread.exit # Exit the thread
puts "Can't happen!" # Never reached
end
Thread.kill(t2) # Now kill t2
# Now exit the main thread (killing any others)
Thread.exit
注意,没有实例方法stop,因此线程能够停止自己,但不能停止其他线程。
有各种用于检查线程状态的方法。实例方法alive?判断线程是否处于“存活”状态(没有退出),stop?判断线程是否处于停止状态。
count = 0
t1 = Thread.new { loop { count += 1 } }
t2 = Thread.new { Thread.stop }
sleep 1
flags = [t1.alive?, # true
t1.stop?, # false
t2.alive?, # true
t2.stop?] # true
线程的状态可用status方法来判断。如果线程正在运行,返回值将为run;如果线程已停止、处于休眠中或等待I/O,返回值将为sleep;如果线程正常终止,返回值将为false;如果线程因异常而终止,返回值将为nil。
t1 = Thread.new { loop {} }
t2 = Thread.new { sleep 5 }
t3 = Thread.new { Thread.stop }
t4 = Thread.new { Thread.exit }
t5 = Thread.new { raise "exception" }
s1 = t1.status # "run"
s2 = t2.status # "sleep"
s3 = t3.status # "sleep"
s4 = t4.status # false
s5 = t5.status # nil
在不同的线程中,全局变量$SAFE的设置可能不同。从这种意义上说,它不是真正的全局变量,但这也有优点,这样线程可以以不同的安全级别运行。safe_level方法返回线程运行在何种安全级别上。
t1 = Thread.new { $SAFE = 1; sleep 5 }
t2 = Thread.new { $SAFE = 3; sleep 5 }
sleep 1
lev0 = Thread.main.safe_level # 0
lev1 = t1.safe_level # 1
lev2 = t2.safe_level # 3
使用priority存取器可以查看和修改线程的优先级:
t1 = Thread.new { loop { sleep 1 } }
t2 = Thread.new { loop { sleep 1 } }
t2.priority = 3 # Set t2 at priority 3
p1 = t1.priority # 0
p2 = t2.priority # 3
线程的优先级越高(数字越大),得到调度的频率越高。
线程要将控制权交给调度器时,可使用特殊方法pass。线程只是让出当前时隙,并不会停止或休眠。
t1 = Thread.new do
puts "alpha"
Thread.pass
puts "beta"
end
t2 = Thread.new do
puts "gamma"
puts "delta"
end
t1.join
t2.join
在这个虚构的例子中,按上面所示调用Thread.pass时,将依次输出alpha、gamma、delta和beta;如果不调用Thread.pass,输出将依次为alpha、beta、gamma和delta。当然,这种功能不能用于实现同步,而只能用于精明地分配时隙。
停止的线程可使用方法run或wakeup来唤醒:
t1 = Thread.new do
Thread.stop
puts "There is an emerald here."
end
t2 = Thread.new do
Thread.stop
puts "You're at Y2."
end
sleep 1
t1.wakeup
t2.run
这两个方法的区别很小。调用wakeup方法将修改线程的状态,使其变成可运行的,但不使其运行;而run方法唤醒线程,并使其立即运行。
在这个例子中,结果是t1在t2前被唤醒,但t2先运行,从而生成如下输出:
You're at Y2.
There is an emerald here.
当然,依赖于这种细微的差别来实现真正的同步是不明智的。
实例方法raise在指定为接受者的线程中引发异常(不一定要在该线程中调用该方法)。
factorial1000 = Thread.new do
begin
prod = 1
1.upto(1000) {|n| prod *= n }
puts "1000! = #{prod}"
rescue
# Do nothing...
end
end
sleep 0.01 # Your mileage may vary.
if factorial1000.alive?
factorial1000.raise("Stop!")
puts "Calculation was interrupted!"
else
puts "Calculation was successful."
end
在上面的例子中,生成的线程计算1000的阶乘。如果计算在百分之一秒内没有完成,主线程将终止该线程。因此,在比较慢的机器中,这个代码段将输出消息Calculation was interrupted!。显然,线程内的rescue语句与其他rescue语句一样,可以包含任何代码。
13.1.4 实现会合点并拦截返回值
有时主线程需要等待另一个线程完成,实例方法join可完成这项工作。
t1 = Thread.new { do_something_long() }
do_something_brief()
t1.join # Wait for t1
注意,如果主线程要等待另一个线程,就必须使用join;否则,当主线程退出时,将终止该线程。例如,在下面的代码段中,最后如果不调用join,将不会输出最终结果:
meaning_of_life = Thread.new do
puts "The answer is..."
sleep 10
puts 42
end
sleep 9
meaning_of_life.join
下面是一种很有用的习惯用法,这个方法对除主线程外的所有存活线程调用join方法(包括主线程在内的任何线程对自己调用join方法都将导致错误)。
Thread.list.each { |t| t.join if t != Thread.main }
当然,当两个线程都不是主线程时,一个线程可对另一个线程调用join方法。如果主线程和其他线程互相调用join,将导致死锁;解释器将检测到这种情况并退出程序。
thr = Thread.new { sleep 1; Thread.main.join }
thr.join # Deadlock results!
线程有一个相关联的代码块,而代码块可以有返回值,这意味着线程可返回一个值。方法value隐式地执行join操作并等待线程完成,然后返回线程中最后计算的表达式的值。
max = 10000
thr = Thread.new do
sum = 0
1.upto(max) { |i| sum += i }
sum
end
guess = (max*(max+1))/2
print "Formula is "
if guess == thr.value
puts "right."
else
puts "wrong."
end
13.1.5 处理异常
如果在线程中发生异常结果将如何?这是可以配置的。
有一个abort_on_exception标记,适用于类级和实例级,它在类级和实例级被实现为一个存取器(即可读写)。
总之,如果线程的abort_on_exception为true,则在该线程中发生异常时,将终止其他所有线程。
Thread.abort_on_exception = true
t1 = Thread.new do
puts "Hello"
sleep 2
raise "some exception"
puts "Goodbye"
end
t2 = Thread.new { sleep 100 }
sleep 2
puts "The End"
在上面的代码中,系统级abort_on_exception被设置为true(覆盖默认设置),因此,当t1发生异常时,t1和主线程也将被终止。因此,程序只输出Hello。
下面这个例子的效果相同:
t1 = Thread.new do
puts "Hello"
sleep 2
raise "some exception"
puts "Goodbye"
end
t1.abort_on_exception = true
t2 = Thread.new { sleep 100 }
sleep 2
puts "The End"
在下面的例子中,如果默认值为false,将看到主线程输出的 The End(看不到输出Goodbye,因为发生异常时t1总是终止)。
t1 = Thread.new do
puts "Hello"
sleep 2
raise "some exception"
puts "Goodbye"
end
t2 = Thread.new { sleep 100 }
sleep 2
puts "The End"
# Output:
Hello
The End
13.1.6 使用线程组
线程组是一种对逻辑上彼此相关的线程进行管理的方式,通常所有线程都属于线程组Default(这是一个类常量),但如果创建了新的线程组,新线程可能添加到该线程组中。
线程不能同时属于多个线程组。将线程添加到线程组中时,自动将其从以前所属的线程组中删除。
类方法ThreadGroup.new用于创建新线程组,实例方法add用于将线程添加到组中。
f1 = Thread.new("file1") { |file| waitfor(file) }
f2 = Thread.new("file2") { |file| waitfor(file) }
file_threads = ThreadGroup.new
file_threads.add f1
file_threads.add f2
实例方法list返回一个包含线程组中所有线程的数组。
# Count living threads in this_group
count = 0
this_group.list.each {|x| count += 1 if x.alive? }
if count < this_group.list.size
puts "Some threads in this_group are not living."
else
puts "All threads in this_group are alive."
end
还可以给ThreadGroup添加很多有用的方法,下面的例子添加了这样一些方法:唤醒线程组中所有的线程、等待所有线程完成(通过join)以及终止线程组中所有的线程:
class ThreadGroup
def wakeup
list.each { |t| t.wakeup }
end
def join
list.each { |t| t.join if t != Thread.current }
end
def kill
list.each { |t| t.kill }
end
end







