Threaded Generator¶
Easily run iterators in background threads or processes to create parallel producer-consumer pipelines.
Documentation: https://kwon-young.github.io/threaded-generator/
This library provides utilities to wrap an iterable (like a generator, a slow I/O process, or a CPU-bound task) in a background thread or process. It buffers items in a queue so that the consumer and producer can work concurrently, smoothing out bursts and improving throughput.
Key Features:
ThreadedGenerator: Run an iterator in a background thread (good for I/O-bound tasks).
ProcessGenerator: Run an iterator in a background process (good for CPU-bound tasks, avoids the GIL).
ParallelGenerator: Split the work of an iterator across multiple parallel workers.
ShutdownQueue: A queue wrapper that handles graceful shutdown signaling between producers and consumers.
Monitor: Real-time terminal visualization of queue sizes and throughput.
Installation¶
pip install threaded-generator
Usage¶
1. Basic Threaded Generation (I/O Bound)¶
Use ThreadedGenerator when your source is slow due to I/O (network, disk).
import time
from threaded_generator import ThreadedGenerator

def slow_io_task():
    for i in range(5):
        time.sleep(0.5)  # Simulate network/disk wait
        yield i

# Buffers up to 3 items in a background thread
gen = ThreadedGenerator(slow_io_task(), maxsize=3)
for item in gen:
    print(f"Got {item}")
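To illustrate the general idea of combining a generator, a thread, and a bounded queue, here is a minimal standard-library sketch. It is not the library's implementation: the names background_iter and _SENTINEL are made up for this example, and error handling is deliberately left out.

```python
import queue
import threading

_SENTINEL = object()  # hypothetical end-of-stream marker


def background_iter(iterable, maxsize=3):
    """Yield items from `iterable` while a background thread produces them."""
    buffer = queue.Queue(maxsize=maxsize)

    def produce():
        for item in iterable:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(_SENTINEL)  # tell the consumer there is nothing left

    threading.Thread(target=produce, daemon=True).start()

    while True:
        item = buffer.get()  # blocks until the producer delivers something
        if item is _SENTINEL:
            return
        yield item


results = list(background_iter(range(5)))
print(results)
```

The bounded queue is what lets the producer run ahead of the consumer by at most maxsize items, so a slow consumer cannot cause unbounded memory growth.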
2. Process Generation (CPU Bound)¶
Use ProcessGenerator to bypass the GIL for CPU-intensive tasks.
from threaded_generator import ProcessGenerator

def heavy_computation():
    for i in range(5):
        # Simulate heavy CPU work
        yield sum(range(1_000_000 * i))

gen = ProcessGenerator(heavy_computation(), maxsize=2)
for result in gen:
    print(result)
3. Parallel Processing (Multiple Workers)¶
Use ParallelGenerator with num_workers > 1 to distribute work.
Important: You must decorate the generator function with @partial_generator. This allows each worker to create its own fresh instance of the iterator.
from threaded_generator import ParallelGenerator, partial_generator
import time

@partial_generator
def process_data(x):
    # Simulate work
    time.sleep(0.5)
    yield x * x

# Spawns 4 worker processes to process items in parallel
gen = ParallelGenerator(process_data(10), num_workers=4, maxsize=10)
for res in gen:
    print(res)
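Why a decorator is needed can be sketched with the standard library alone. A generator object can only be iterated once, so it cannot be handed to several workers; a factory that rebuilds the generator on demand can. The decorator below, deferred_generator, is a hypothetical stand-in written for this illustration and is not claimed to match how partial_generator is implemented.

```python
import functools


def deferred_generator(func):
    """Hypothetical stand-in: calling the decorated function returns a
    factory (a functools.partial) instead of a generator object."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return functools.partial(func, *args, **kwargs)
    return wrapper


@deferred_generator
def squares(n):
    for i in range(n):
        yield i * i


# A plain generator object is exhausted after a single pass.
plain = (i * i for i in range(3))
first_pass = list(plain)
second_pass = list(plain)

# The factory can build a fresh generator every time it is called,
# which is what each worker would need.
factory = squares(3)
worker_a = list(factory())
worker_b = list(factory())
```

Here second_pass is empty, while worker_a and worker_b each see the full sequence.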
4. Monitoring¶
Visualize the performance of your pipeline in the terminal.
from threaded_generator import ThreadedGenerator, Monitor
import time

monitor = Monitor()

# Pass the monitor to the generator
gen = ThreadedGenerator(range(100), monitor=monitor, name="MyGen")

with monitor:
    for item in gen:
        time.sleep(0.1)  # Simulate slow consumption
Advanced Usage¶
ShutdownQueue¶
Use ShutdownQueue directly if you need a queue that supports graceful shutdown signaling.
import multiprocessing as mp
from queue import ShutDown  # In the standard library since Python 3.13
from threaded_generator import ShutdownQueue

# Create a shutdown-capable queue backed by a multiprocessing Queue
sq = ShutdownQueue(maxsize=10, queue_type=mp.Queue)

# Producer
sq.put(1)
sq.put(2)
sq.shutdown()  # Signal end of stream

# Consumer
try:
    while True:
        print(sq.get())
except ShutDown:
    print("Queue shut down")
Error Handling¶
Exceptions raised within the source iterable are caught in the background worker and re-raised in the main thread (wrapped in a RuntimeError) when join() is called or iteration completes.
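The pattern described above can be sketched with a plain thread and queue. This is an illustrative standard-library example, not the library's code: iterate_in_thread, failing_source, and _DONE are hypothetical names, and the exact point at which the library re-raises may differ.

```python
import queue
import threading

_DONE = object()  # hypothetical end-of-stream marker


def failing_source():
    yield 1
    raise ValueError("source failed")


def iterate_in_thread(iterable):
    """Consume `iterable` in a thread; re-raise its error in the caller."""
    buffer = queue.Queue()
    errors = []

    def produce():
        try:
            for item in iterable:
                buffer.put(item)
        except Exception as exc:
            errors.append(exc)  # remember the failure for the consumer side
        finally:
            buffer.put(_DONE)  # always unblock the consumer

    worker = threading.Thread(target=produce)
    worker.start()
    while (item := buffer.get()) is not _DONE:
        yield item
    worker.join()
    if errors:
        # Wrap the original exception and keep it reachable via __cause__.
        raise RuntimeError("background worker failed") from errors[0]


received = []
caught = None
try:
    for value in iterate_in_thread(failing_source()):
        received.append(value)
except RuntimeError as err:
    caught = err
```

Because the exception object never leaves the process, the original ValueError and its traceback stay reachable through caught.__cause__.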
Note: When using ProcessGenerator or ParallelGenerator (multiprocessing), exception propagation is limited. Because exceptions are pickled to be sent across processes, the original traceback and __cause__ attributes are lost during serialization. As a result, when chaining multiple process-based generators, the main thread will receive a RuntimeError but may not be able to access the full chain of causes or the original stack trace from the worker process. Use ThreadedGenerator if preserving the full exception chain is required.
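The loss described in the note can be reproduced with the standard pickle module alone, without any worker processes. The sketch below only demonstrates Python's pickling behaviour for exceptions; make_chained_error is a hypothetical helper written for this example.

```python
import pickle


def make_chained_error():
    """Build a RuntimeError chained to a ValueError, as a worker might."""
    try:
        try:
            raise ValueError("original failure")
        except ValueError as inner:
            raise RuntimeError("worker failed") from inner
    except RuntimeError as outer:
        return outer


err = make_chained_error()

# Round-trip through pickle, as happens when an exception crosses processes.
restored = pickle.loads(pickle.dumps(err))

print(type(err.__cause__).__name__)  # the chain exists before pickling
print(restored.__cause__)            # the cause is gone after the round trip
print(restored.__traceback__)        # so is the traceback
```

The exception type and message survive the round trip, which is why the consumer still sees a meaningful RuntimeError even though the chain behind it is gone.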
Credits¶
The original idea for ThreadedGenerator (combining a generator, a thread, and a queue) is attributed to everilae and their GitHub Gist.
Documentation¶
API Reference