# 0x04 队列通信

## 问题

需要在多线线程之间实现安全的通信和数据的交换.

## 队列

将数据从一个线程转移到另一个线程最安全的方法就是使用**队列**. 在Python中如下使用:

```python
from queue import Queue
```

创建一个`Queue`实例, 会被**所有线程共享**. `Queue`实例已经拥有了所需的锁, 可以安全地在任意多的线程之间共享.

使用队列对象的`put()`和`get()`方法给队列添加或移除元素.

使用队列进行通信的程序设计往往可以扩展到多进程, 分布式系统上, 而不需要对框架做大的改动.

**注意**:

线程中使用队列时, 入列的数据不会产生拷贝, 不同线程之间对象的传递都是以引用的形式进行的. 如果需要保证传递的数据在其他进程中不被改变, 需要:

* 只传递不可变的数据结构: 整数, 字符串, 元组等
* 对入列的数据进行深拷贝

## 队列方法

### Queue(N)

创建一个有大小限制的队列. 如果生产者在产生数据比消费者处理数据的速度快得多时, 给队列一个可容纳元素的上限值就有意义了.

在队列慢的时候, 调用`put()`方法再向队列中插入数据时, 就可以对生产者的程序进行**阻塞**.

### task\_done()和join()

保证每个被放入到队列的元素, 都被消费者使用. 可以使用队列的`join()`方法对队列进行阻塞, 只有在`task_done()`的调用次数等于`put()`的调用次数时, 队列的阻塞才会打开.

使用时需要考虑`producer`和`consumer`两者的执行速度, 需要使用在`producer`执行很快, 而`consumer`相对来说很慢的情况下使用.

```python
from queue import Queue
from threading import Thread

def producer(out_q):
    while running:
        # here produce some data
        out_q.put(data)

def consummer(in_q):
    while 1:
        data = in_q.get()
        # process the data
        in_q.task_done()

q = Queue()
t1 = Thread(target=consummer, args=(q,))
t2 = Thread(target=consummer, args=(q,))
t1.start()
t2.start()
q.join()
```

### get()和put()方法的非阻塞和阻塞超时

`get()`和`put()`方法默认都是阻塞的, 即在队列为空时获取数据和在队列爆满时插入数据, 都会造成对应线程的阻塞, 可以通过`block=False`参数设置为非阻塞的形式. 以非阻塞的形式进行调用时, 如果获取/插入不了数据时会报错, 需要对错误进行捕捉并处理.

```python
try:
    data = q.get(block=False)
except queue.Empty:
    pass

try:
    data = q.put(data, block=False)
except queue.Full:
    pass
```

可以通过`timeout=1`参数设置阻塞时间的长短.

### qsize(), full(), empty()不可靠

这三个方法分别为获取队列的当前元素数量和状态, 但这些方法在多线程环境中都是**不可靠的**, 因为调用过程中, 另一个线程可能对这个队列进行了操作, 使这个队列的状态发生了改变. 因此开发过程中最好不要使用这三个方法.

## 生产者消费者模型

队列最经典的应用场景当属生产者消费者队列模型. 生产者将输出放入队列, 消费者从队列中取出元素进行消费.

### 同步协调关闭多个消费者线程

在这个场景中, 需要考虑的一个问题是**如何对生产者和消费者的关闭过程进行同步协调**的问题. 解决这个问题的方法是使用一个**特殊的值**, 当我们想关闭时, 就将这个值放入到队列中, 消费者在接受到这个值时就会关闭退出.

由于队列是先入先出的数据结构, 因此使用这种方法不会导致所有的输入仍有部分在关闭后没有执行的情况, 只需要这个触发关闭的值是在最后放入队列中的即可.

示例如下:

```python
from queue import Queue
from threading import Thread

_signal = object()

def producer(out_q):
    while running:
        # here produce some data
        # break out if satisfy some condition
        out_q.put(data)
    out_q.put(_signal)

def consummer(in_q):
    while 1:
        data = in_q.get()
        # check for termination
        if data is _signal:
            in_q.put(_signal)
            break
        # process the data
```

值得注意的是, 在上面代码的第`18`行, 当消费者接收到这个特殊的终止值后, 会立刻将其重新放回到队列中, 从而使得**监听同一个队列的其他消费者线程**也能够收到终止值, 从而一个一个都会关掉.

### 消费者处理了数据生产者需要立即感知

将发送的数据和一个`Event`对象配在一起, 允许生产者线程监视这一过程.

```python
from queue import Queue
from threading import Thread, Event

def producer(out_q):
    while running:
        # here produce some data
        # make a (data, event) pair and hand it to consumer
        evt = Event()
        out_q.put((data, evt))
        # wait for the consumer to process the item
        evt.wait()

def consummer(in_q):
    while 1:
        data, evt = in_q.get()
        # process the data
        # ...
        # indicate completion
        evt.set()
```

## 优先级队列

优先级队列最简单的实现方法, 就是使用**堆**进行存储, 按照优先级的大小对堆进行调整, 使得堆顶的任务优先级是最高的. 借助`heapq`包可以实现**线程安全**的优先级队列.

```python
import heapq
import threading

class PriorityQueue:
    def __init__(self):
        self.queue = []
        self.count = 0
        self.cv = threading.Condition()

    def put(self, item, priority):
        with self.cv:
            heapq.heappush(self.queue, (-priority, self.count, item))
            self.count += 1
            self.cv.notify()

    def get(self):
        with self.cv:
            while len(self.queue) == 0:
                self.cv.wait()
            return heapq.heappop(self.queue)[-1]
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://kerasnoone.gitbook.io/garnet/gong-cheng-zhan/python/bing-fa/duo-xian-cheng/0x04-dui-lie-tong-xin.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
