Shutdown Queue¶
This module provides a queue wrapper that supports graceful shutdown signaling.
It allows producers to signal that no more items will be produced, and consumers to detect this signal and stop iteration. It handles both immediate shutdowns (aborting operations) and graceful shutdowns (processing remaining items).
Key components:
ShutdownQueue: A wrapper around standard queues (like multiprocessing.Queue) that adds shutdown semantics.
Examples¶
>>> import multiprocessing as mp
>>> from queue import Empty, ShutDown
>>>
>>> # Create a shutdown-capable queue
>>> sq = ShutdownQueue(maxsize=10, queue_type=mp.Queue)
>>>
>>> # Producer
>>> sq.put(1)
>>> sq.put(2)
>>> sq.shutdown() # Signal end of stream
>>>
>>> # Consumer
>>> try:
... while True:
... print(sq.get())
... except ShutDown:
... print("Queue shut down")
1
2
Queue shut down
- class ShutdownQueue(maxsize=0, queue_type=None, poll_time=1)[source]¶
Bases:
GenericA wrapper class that adds shutdown capabilities to a standard queue.
This class wraps a queue implementation (like
multiprocessing.Queue) and adds mechanisms to signal shutdown. When shut down, consumers reading from the queue will eventually receive aqueue.ShutDownexception.- Parameters:
maxsize (int) – Maximum size of the queue. Defaults to 0 (infinite).
queue_type (Type[Q] | None) – The class of the underlying queue to use. Defaults to
multiprocessing.Queue.poll_time (float) – Interval in seconds to check for shutdown signals while blocking.
- get(block=True, timeout=None)[source]¶
Remove and return an item from the queue.
If the queue is empty and shut down, raises
queue.ShutDown.- Parameters:
block (bool) – If True, block until an item is available.
timeout (float | None) – Time to wait if blocking.
- Returns:
The item retrieved from the queue.
- Raises:
ShutDown – If the queue has been shut down.
Empty – If the queue is empty and block is False or timeout reached (before shutdown).
- Return type:
T
- property is_shutdown: bool¶
Check or set the shutdown signal.
- Getter:
Returns True if the shutdown signal has been set.
- Setter:
Manually set or clear the shutdown state. Clearing shutdown state (False) also drains the queue of existing items to reset it. Setting it to True triggers a shutdown.
- put(obj, block=True, timeout=None)[source]¶
Put an item into the queue.
- Parameters:
obj (T) – The item to put.
block (bool) – If True, block if the queue is full.
timeout (float | None) – Time to wait if blocking.
- Raises:
ShutDown – If the queue has been shut down.
Full – If the queue is full and block is False or timeout reached.
- Return type:
None
- shutdown(immediate=False)[source]¶
Signal the queue to shut down. Can block up to self.poll_time.
- Parameters:
immediate (bool) – If True, consumers raise ShutDown immediately upon next access without draining remaining items. If False, consumers drain the queue until the Sentinel is reached.
- Return type:
None