最后更新于
最后更新于
需要在多线线程之间实现安全的通信和数据的交换.
将数据从一个线程转移到另一个线程最安全的方法就是使用队列. 在Python中如下使用:
创建一个Queue
实例, 会被所有线程共享. Queue
实例已经拥有了所需的锁, 可以安全地在任意多的线程之间共享.
使用队列对象的put()
和get()
方法给队列添加或移除元素.
使用队列进行通信的程序设计往往可以扩展到多进程, 分布式系统上, 而不需要对框架做大的改动.
注意:
线程中使用队列时, 入列的数据不会产生拷贝, 不同线程之间对象的传递都是以引用的形式进行的. 如果需要保证传递的数据在其他进程中不被改变, 需要:
只传递不可变的数据结构: 整数, 字符串, 元组等
对入列的数据进行深拷贝
创建一个有大小限制的队列. 如果生产者在产生数据比消费者处理数据的速度快得多时, 给队列一个可容纳元素的上限值就有意义了.
在队列慢的时候, 调用put()
方法再向队列中插入数据时, 就可以对生产者的程序进行阻塞.
保证每个被放入到队列的元素, 都被消费者使用. 可以使用队列的join()
方法对队列进行阻塞, 只有在task_done()
的调用次数等于put()
的调用次数时, 队列的阻塞才会打开.
使用时需要考虑producer
和consumer
两者的执行速度, 需要使用在producer
执行很快, 而consumer
相对来说很慢的情况下使用.
get()
和put()
方法默认都是阻塞的, 即在队列为空时获取数据和在队列爆满时插入数据, 都会造成对应线程的阻塞, 可以通过block=False
参数设置为非阻塞的形式. 以非阻塞的形式进行调用时, 如果获取/插入不了数据时会报错, 需要对错误进行捕捉并处理.
可以通过timeout=1
参数设置阻塞时间的长短.
这三个方法分别为获取队列的当前元素数量和状态, 但这些方法在多线程环境中都是不可靠的, 因为调用过程中, 另一个线程可能对这个队列进行了操作, 使这个队列的状态发生了改变. 因此开发过程中最好不要使用这三个方法.
队列最经典的应用场景当属生产者消费者队列模型. 生产者将输出放入队列, 消费者从队列中取出元素进行消费.
在这个场景中, 需要考虑的一个问题是如何对生产者和消费者的关闭过程进行同步协调的问题. 解决这个问题的方法是使用一个特殊的值, 当我们想关闭时, 就将这个值放入到队列中, 消费者在接受到这个值时就会关闭退出.
由于队列是先入先出的数据结构, 因此使用这种方法不会导致所有的输入仍有部分在关闭后没有执行的情况, 只需要这个触发关闭的值是在最后放入队列中的即可.
示例如下:
值得注意的是, 在上面代码的第18
行, 当消费者接收到这个特殊的终止值后, 会立刻将其重新放回到队列中, 从而使得监听同一个队列的其他消费者线程也能够收到终止值, 从而一个一个都会关掉.
将发送的数据和一个Event
对象配在一起, 允许生产者线程监视这一过程.
优先级队列最简单的实现方法, 就是使用堆进行存储, 按照优先级的大小对堆进行调整, 使得堆顶的任务优先级是最高的. 借助heapq
包可以实现线程安全的优先级队列.