器→工具, 编程语言

Python标准库之队列queue

钱魏Way · · 133 次浏览

队列简介

Queue(队列)是一种常见的数据结构,遵循先进先出(FIFO,First In First Out)的原则。它类似于生活中的排队现象,即最先进入队列的元素最先被处理。

队列的基本操作

  • 入队(Enqueue):将元素添加到队列的末尾。
  • 出队(Dequeue):移除并返回队列的第一个元素。
  • 查看队头(Peek/Front):返回队列的第一个元素,但不移除它。
  • 检查是否为空(IsEmpty):判断队列中是否没有元素。
  • 获取队列长度(Size/Length):返回队列中元素的数量。

队列的实现

队列可以通过多种方式实现,常见的有:

  • 数组实现:使用固定大小的数组来实现队列。需要注意的是,当队列满时,需要进行扩容或者通过循环数组的方式来利用未使用的空间。
  • 链表实现:使用链表可以实现动态大小的队列,避免了数组的固定大小限制。

队列的应用

队列在计算机科学和日常应用中有着广泛的用途:

  • 操作系统中的任务调度:操作系统使用队列来管理任务的执行顺序。
  • 打印队列:当多个文档被发送到打印机时,它们会排成队列,按顺序打印。
  • 广度优先搜索(BFS):在图的遍历中,队列用于存储待访问的节点。
  • 消息队列:在分布式系统中,消息队列用于在不同组件之间传递消息。

队列的变种

  • 双端队列(Deque):可以在队列的两端进行插入和删除操作。
  • 优先队列(Priority Queue):元素有优先级,出队时总是选择优先级最高的元素。

队列是一种简单而强大的数据结构,适合用于需要按顺序处理数据的场景。

Python中的队列

queue 模块是 Python 标准库中的一个模块,提供了多种适用于线程安全的队列实现。它适用于需要多线程或多进程环境中安全地共享数据的场景。queue 模块的队列实现支持先进先出(FIFO)、后进先出(LIFO)和优先级队列等不同的队列类型。

queue 模块中的队列是线程安全的,可以在多线程程序中安全地进行数据交换。主要有三种队列类型:

  • Queue:先进先出(FIFO)队列。
  • LifoQueue:后进先出(LIFO)队列,类似于栈。
  • PriorityQueue:优先级队列,其中元素按优先级排序。

queue.Queue(maxsize=0)

Queue 类实现了一个线程安全的先进先出(FIFO)队列。默认情况下,maxsize 为 0,表示队列大小没有限制。你可以通过设置 maxsize 限制队列的大小。如果队列满了,put() 方法将会阻塞,直到队列有空间。

主要方法:

  • qsize() 返回队列的大致大小。注意,qsize() > 0 不保证后续的 get() 不被阻塞,qsize() < maxsize 也不保证 put() 不被阻塞。
  • empty() 如果队列为空,返回 True ,否则返回 False 。如果 empty() 返回 True ,不保证后续调用的 put() 不被阻塞。类似的,如果 empty() 返回 False ,也不保证后续调用的 get() 不被阻塞。
  • full() 如果队列是满的返回 True ,否则返回 False 。如果 full() 返回 True 不保证后续调用的 get() 不被阻塞。类似的,如果 full() 返回 False 也不保证后续调用的 put() 不被阻塞。
  • put(item, block=True, timeout=None) 将 item 加入队列。 如果可选参数 block 为真值并且 timeout 为 None (默认值),则会在必要时阻塞直到有空闲槽位可用。 如为 timeout 为正数,则将阻塞最多 timeout 秒并会在没有可用的空闲槽位时引发 Full 异常。 在其他情况下 (block 为假值),则如果空闲槽位立即可用则将条目加入队列,否则将引发 Full 异常 (timeout 在此情况下将被忽略)。如果队列已被关闭则会引发 ShutDown。
  • put_nowait(item) 相当于 put(item, block=False)。
  • get(block=True, timeout=None) 从队列中移除并返回一个项目。如果可选参数 block 是 true 并且 timeout 是 None (默认值),则在必要时阻塞至项目可得到。如果 timeout 是个正数,将最多阻塞 timeout 秒,如果在这段时间内项目不能得到,将引发 Empty 异常。反之 (block 是 false) , 如果一个项目立即可得到,则返回一个项目,否则引发 Empty 异常 (这种情况下,timeout 将被忽略)。POSIX 系统上在 3.0 之前,以及在 Windows 上的所有版本中,如果 block 为真值并且 timeout 为 None,此操作将进入在底层锁上的不可中断的等待。 这意味着不会发生任何异常,特别是 SIGINT 将不会触发 KeyboardInterrupt。如果队列已被关闭并且为空,或者如果队列已被立即关闭则会引发 ShutDown。
  • get_nowait() 相当于 get(False) 。提供了两个方法,用于支持跟踪 排队的任务 是否 被守护的消费者线程 完整的处理。
  • task_done() 表示前面排队的任务已经被完成。被队列的消费者线程使用。每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列,该任务的处理已经完成。如果 join() 当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个 put() 进队列的条目的 task_done() 都被收到)。
  • shutdown(immediate=True) 将为队列中每个剩余的项调用 task_done()。如果被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常 。
  • join() 阻塞至队列中所有的元素都被接收和处理完毕。当一个条目被添加到队列的时候未完成任务的计数将会增加。 每当一个消费者线程调用 task_done() 来表明该条目已被提取且其上的所有工作已完成时未完成计数将会减少。 当未完成计数降为零时,join() 将解除阻塞。

示例:

import queue
import threading

# 创建一个队列
q = queue.Queue(maxsize=5)

# 插入元素
q.put(1)
q.put(2)

# 获取元素
item = q.get()
print(item)  # 输出: 1

# 等待队列中所有任务完成
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        # 处理任务
        q.task_done()

# 启动线程
threads = []
for _ in range(2):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# 添加任务
for item in range(10):
    q.put(item)

# 等待所有任务完成
q.join()

# 停止线程
for _ in range(2):
    q.put(None)
for t in threads:
    t.join()

queue.LifoQueue(maxsize=0)

LifoQueue 类实现了一个线程安全的后进先出(LIFO)队列,类似于栈。它的 maxsize 参数的作用和 Queue 类相同。

主要方法:

  • put(item, block=True, timeout=None):将 item 放入队列。行为与 Queue 类相同。
  • get(block=True, timeout=None):从队列中获取一个元素。返回的元素是最后放入的元素。
  • task_done() 和 join() 方法与 Queue 类相同。

示例:

import queue

# 创建一个 LifoQueue
lifo_q = queue.LifoQueue()

# 插入元素
lifo_q.put(1)
lifo_q.put(2)

# 获取元素
item = lifo_q.get()
print(item)  # 输出: 2

queue.PriorityQueue(maxsize=0)

PriorityQueue 类实现了一个线程安全的优先级队列,其中元素按照优先级排序。优先级较低的元素会先被处理。maxsize 的作用与前两者相同。

主要方法:

  • put(item, block=True, timeout=None):将带有优先级的 item 放入队列。优先级是通过元素的排序规则来决定的。
  • get(block=True, timeout=None):从队列中获取优先级最高的元素。
  • task_done() 和 join() 方法与 Queue 类相同。

示例:

import queue

# 创建一个 PriorityQueue
priority_q = queue.PriorityQueue()

# 插入带有优先级的元素 (优先级,值)
priority_q.put((2, 'second'))
priority_q.put((1, 'first'))

# 获取元素
item = priority_q.get()
print(item)  # 输出: (1, 'first')

多线程应用示例

使用 queue 模块可以轻松地在多线程环境中共享数据。

示例:创建一个简单的生产者-消费者模型。

import queue
import threading
import time

def producer(q, n):
    for i in range(n):
        item = f'item-{i}'
        q.put(item)
        print(f'Produced {item}')
        time.sleep(1)

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f'Consumed {item}')
        q.task_done()

q = queue.Queue()
num_items = 5

# 创建生产者线程
producer_thread = threading.Thread(target=producer, args=(q, num_items))
producer_thread.start()

# 创建消费者线程
consumer_thread = threading.Thread(target=consumer, args=(q,))
consumer_thread.start()

# 等待生产者完成
producer_thread.join()

# 发送结束信号
q.put(None)

# 等待消费者完成
consumer_thread.join()

注意事项

  • Queue及其变种是线程安全的,适合多线程环境。
  • put()和 get() 支持阻塞和超时,适用于需要等待资源的场景。
  • 在使用PriorityQueue 时,确保优先级值能正确比较,否则可能会导致异常。

参考链接:

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注