0x06 同步原语

同步

线程以非确定性的方式独立执行, 线程何时开始执行, 何时被打断, 何时恢复执行, 完全由操作系统来调度管理, 这是用户和程序都无法决定的.

如果一个线程需要判断其他的一些线程中, 是否已经执行到过程中的某个点, 根据这个判断, 来执行后续的工作, 那么就需要线程之间的同步工作来完成. 用来完成线程之间同步的对象称为同步原语.

threading包中提供了三种同步原语:

  • Event事件: 用于一次性事件, 一旦这个事件完成了(完成了设置), 这个Event对象就被丢弃了

  • Semaphore信号量: 每次希望只唤醒一个单独的等待线程

  • Condition条件: 打算一遍又一遍重复通知某个事件, 如定时器

Event

Event对象允许线程等待某个事情发生. 初始状态, 事件对象被置为0, 如果事件没有被设置, 线程就会等待该事件, 线程就会被阻塞, 进入休眠状态, 直到事件被设置为止.

当有线程设置了这个事件时, 会唤醒所有正在等待该事件的线程(如果有的话), 使得线程得以继续执行. 因此是一个一对多的关系.

import time
from threading import Thread, Event

def count_down(n, started_evt):
    print("count down starting...")
    started_evt.set()
    while n > 0:
        print("T-minus", n)
        n -= 1
        time.sleep(5)

started_evt = Event()
print("launching count down...")
t = Thread(target=count_down, args=(10, started_evt))
t.start()
started_evt.wait()
print("count down is running...")

这样保证了count down is running...总会在count down starting...后显示.

使用方法:

  • evt.set(): 设置事件, 消除其他进程中的阻塞

  • evt.wait(): 等待事件被设置, 产生阻塞

注意: Event对象最好只用于一次性事件. 尽管可以对一个事件, 在设置了之后, 用clear()方法来清除设置, 重新等待被设置, 但要安全地清除事件并等待它被再次设置这个过程很难同步协调. 因此一个Event一旦完成了设置, 这个对象就应该被丢弃.

Condition

Condition对象用在线程打算一遍又一遍地重复通知某事件的情况中, 常用在定时器场景中, 每当定时器超时, 其他线程感知到超时时间, 进而做相应的执行.

import time
import threading

class PeriodicTimer:
    def __init__(self, interval):
        self.interval = interval
        self.flag = 0
        self.cv = threading.Condition()

    def start(self):
        t = threading.Thread(target=self.run, daemon=True)
        t.start()

    def run(self):
        while 1:
            time.sleep(self.interval)
            with self.cv:
                self.flag ^= 1
                self.cv.notify_all()

    def wait_for_tick(self):
        with self.cv:
            last_flag = self.flag
            while last_flag == self.flag:
                self.cv.wait()

def count_down(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print("T-minus", nticks)
        nticks -= 1

def count_up(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print("Counting", n)
        n += 1

ptimer = PeriodicTimer(5)
ptimer.start()
threading.Thread(target=count_down, args=(10,)).start()
threading.Thread(target=count_up, args=(5,)).start()

使用方法:

首先使用一个Condition对象的方法时, 需要使用with语句:

with condition:
    pass

阻塞方法自然是.wait(). 发起通知的方法有两个:

  • notify_all(): 通知全部的等待线程, 所有线程继续执行

  • notify(): 通知一个等待线程, 即使多个线程在等待, 也只唤醒一个线程

Semaphore

如果我们只希望唤醒一个单独的等待线程, 除了用Conditionnotify()方法外, 还可以使用信号量Semaphore.

def worker(n, sema):
    sema.acquire()
    print("working", n)

sema = threading.Semaphore(0)
n_workers = 10
for n in range(n_workers):
    t = threading.Thread(target=worker, args=(n, sema))
    t.start()

释放信号量:

>>> sema.release()
working 0
>>> sema.release()
working 1

使用方法:

  • acquire(): 获取信号量, 信号量对象计数-1, 计数为0时会产生阻塞, 等待信号量的释放

  • release(): 释放信号量, 信号量对象计数+1

其他用途:

Semaphore也可以用在和控制并发数量的场景中(具体应用见一章).

最后更新于