同步
线程以非确定性 的方式独立执行, 线程何时开始执行, 何时被打断, 何时恢复执行, 完全由操作系统 来调度管理, 这是用户和程序都无法决定的.
如果一个线程需要判断其他的一些线程中, 是否已经执行到过程中的某个点, 根据这个判断, 来执行后续的工作, 那么就需要线程之间的同步工作来完成. 用来完成线程之间同步的对象称为同步原语 .
threading
包中提供了三种同步原语:
Event 事件: 用于一次性事件, 一旦这个事件完成了(完成了设置), 这个Event
对象就被丢弃了
Semaphore 信号量: 每次希望只唤醒一个单独的等待线程
Condition 条件: 打算一遍又一遍重复通知某个事件, 如定时器
Event
Event
对象允许线程等待某个事情发生. 初始状态, 事件对象被置为0, 如果事件没有被设置, 线程就会等待该事件, 线程就会被阻塞, 进入休眠状态, 直到事件被设置为止.
当有线程设置了这个事件时, 会唤醒所有 正在等待该事件的线程(如果有的话), 使得线程得以继续执行. 因此是一个一对多的关系.
复制 import time
from threading import Thread , Event
def count_down ( n , started_evt ):
print ( "count down starting..." )
started_evt . set ()
while n > 0 :
print ( "T-minus" , n)
n -= 1
time . sleep ( 5 )
started_evt = Event ()
print ( "launching count down..." )
t = Thread (target = count_down, args = ( 10 , started_evt))
t . start ()
started_evt . wait ()
print ( "count down is running..." )
这样保证了count down is running...
总会在count down starting...
后显示.
使用方法 :
evt.set() : 设置事件, 消除其他进程中的阻塞
evt.wait() : 等待事件被设置, 产生阻塞
注意 : Event
对象最好只用于一次性事件 . 尽管可以对一个事件, 在设置了之后, 用clear()
方法来清除设置, 重新等待被设置, 但要安全地清除事件并等待它被再次设置这个过程很难同步协调. 因此一个Event
一旦完成了设置, 这个对象就应该被丢弃.
Condition
Condition
对象用在线程打算一遍又一遍地重复通知某事件的情况中, 常用在定时器 场景中, 每当定时器超时, 其他线程感知到超时时间, 进而做相应的执行.
复制 import time
import threading
class PeriodicTimer :
def __init__ ( self , interval ):
self . interval = interval
self . flag = 0
self . cv = threading . Condition ()
def start ( self ):
t = threading . Thread (target = self.run, daemon = True )
t . start ()
def run ( self ):
while 1 :
time . sleep (self.interval)
with self . cv :
self . flag ^= 1
self . cv . notify_all ()
def wait_for_tick ( self ):
with self . cv :
last_flag = self . flag
while last_flag == self . flag :
self . cv . wait ()
def count_down ( nticks ):
while nticks > 0 :
ptimer . wait_for_tick ()
print ( "T-minus" , nticks)
nticks -= 1
def count_up ( last ):
n = 0
while n < last :
ptimer . wait_for_tick ()
print ( "Counting" , n)
n += 1
ptimer = PeriodicTimer ( 5 )
ptimer . start ()
threading . Thread (target = count_down, args = ( 10 ,)). start ()
threading . Thread (target = count_up, args = ( 5 ,)). start ()
使用方法 :
首先使用一个Condition
对象的方法时, 需要使用with
语句:
阻塞方法自然是.wait() . 发起通知的方法有两个:
notify_all() : 通知全部的等待线程, 所有线程继续执行
notify() : 通知一个 等待线程, 即使多个线程在等待, 也只唤醒一个线程
Semaphore
如果我们只希望唤醒一个单独的等待线程, 除了用Condition
的notify()
方法外, 还可以使用信号量Semaphore
.
复制 def worker ( n , sema ):
sema . acquire ()
print ( "working" , n)
sema = threading . Semaphore ( 0 )
n_workers = 10
for n in range (n_workers):
t = threading . Thread (target = worker, args = (n, sema))
t . start ()
释放信号量:
复制 >>> sema . release ()
working 0
>>> sema . release ()
working 1
使用方法 :
acquire() : 获取信号量, 信号量对象计数-1, 计数为0时会产生阻塞, 等待信号量的释放
release() : 释放信号量, 信号量对象计数+1
其他用途 :
Semaphore
也可以用在锁 和控制并发数量的场景中(具体应用见锁
一章).