Spaces:
Runtime error
Runtime error
'''
WorkerPool and WorkerBase for handling the common problems in managing
a multiprocess pool of workers that aren't done by multiprocessing.Pool,
including setup with per-process state, debugging by putting the worker
on the main thread, and correct handling of unexpected errors and ctrl-C.

To use it,
1. Put the per-process setup and the per-task work in the
   setup() and work() methods of your own WorkerBase subclass.
2. To prepare the process pool, instantiate a WorkerPool, passing your
   subclass type as the first (worker) argument, as well as any setup keyword
   arguments.  The WorkerPool will instantiate one of your workers in each
   worker process (passing in the setup arguments in those processes).
   If debugging, the pool can have process_count=0 to force all the work
   to be done immediately on the main thread; otherwise all the work
   will be passed to other processes.
3. Whenever there is a new piece of work to distribute, call pool.add(*args).
   The arguments will be queued and passed as worker.work(*args) to the
   next available worker.
4. When all the work has been distributed, call pool.join() to wait for all
   the work to complete and to finish and terminate all the worker processes.
   When pool.join() returns, all the work will have been done.

No arrangement is made to collect the results of the work: for example,
the return value of work() is ignored.  If you need to collect the
results, use your own mechanism (filesystem, shared memory object, queue)
which can be distributed using setup arguments.
'''
from multiprocessing import Process, Queue, cpu_count | |
import signal | |
import atexit | |
import sys | |
class WorkerBase(Process):
    '''
    Base class for a single worker in a worker pool.

    Derive from this class and implement work() (and, when some
    initialization is needed, setup() as well) to define the units of
    work that a pool worker carries out.
    '''

    def __init__(self, i, process_count, queue, initargs):
        '''
        Store the worker index, the pool size and the work queue on the
        instance, initialize the Process base class, and finally call
        setup(**initargs).
        '''
        uses_worker_processes = process_count > 0
        if uses_worker_processes:
            # Ignore ctrl-C (SIGINT) whenever the pool is not running
            # everything on the main process.
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.process_id, self.process_count, self.queue = (
            i, process_count, queue)
        super().__init__()
        self.setup(**initargs)

    def run(self):
        '''
        Pull argument tuples off the queue and hand each one to work()
        until the None end marker is dequeued.
        '''
        keep_going = True
        while keep_going:
            try:
                item = self.queue.get()
            except (KeyboardInterrupt, SystemExit):
                print('Exiting...')
                keep_going = False
            else:
                if item is not None:
                    self.work(*item)
                else:
                    # Put the end marker back so another worker sees it too.
                    self.queue.put(None)
                    keep_going = False

    def setup(self, **initargs):
        '''
        Hook for initialization; does nothing unless overridden.
        Keyword args are the ones given to the WorkerPool constructor.
        '''
        pass

    def work(self, *args):
        '''
        Hook for processing one unit of work; subclasses must override it.
        Args are the arguments that were given to WorkerPool.add().
        '''
        raise NotImplementedError('worker subclass needed')
class WorkerPool(object):
    '''
    Instantiate this object (passing a WorkerBase subclass type
    as its first argument) to create a worker pool.  Then call
    pool.add(*args) to queue args to distribute to worker.work(*args),
    and call pool.join() to wait for all the workers to complete.
    '''

    def __init__(self, worker=WorkerBase, process_count=None, **initargs):
        '''
        Create the pool and start its workers.

        worker: class called as worker(i, process_count, queue, initargs)
            to build each worker.
        process_count: number of worker processes.  None means
            cpu_count(); 0 means no processes are started and add()
            calls worker.work() directly, for debugging.
        initargs: keyword arguments handed to every worker as a dict.
        '''
        # Give these attributes a value first, so that early_terminate()
        # and __del__ stay safe even if construction fails part-way.
        self.queue = None
        self.processes = None
        if process_count is None:
            process_count = cpu_count()
        if process_count == 0:
            # zero process_count uses only main process, for debugging.
            self.worker = worker(None, 0, None, initargs)
            return
        # Ctrl-C strategy: worker processes should ignore ctrl-C.  Set
        # this up to be inherited by child processes before forking.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            active_pools[id(self)] = self
            self.queue = Queue(maxsize=(process_count * 3))
            self.processes = [worker(i, process_count, self.queue, initargs)
                              for i in range(process_count)]
            for p in self.processes:
                p.start()
        finally:
            # The main process should handle ctrl-C.  Restore this even
            # when creating or starting a worker raised, so the caller is
            # never left with ctrl-C disabled.
            signal.signal(signal.SIGINT, original_sigint_handler)

    def add(self, *work_batch):
        '''
        Queue work_batch so that a worker calls work(*work_batch).

        With no queue, the work is done immediately by the debugging
        worker if there is one; otherwise a notice is printed to stderr
        and the work is dropped.
        '''
        if self.queue is None:
            if hasattr(self, 'worker'):
                self.worker.work(*work_batch)
            else:
                print('WorkerPool shutting down.', file=sys.stderr)
        else:
            try:
                # The queue can block if the work is so slow it gets full.
                self.queue.put(work_batch)
            except (KeyboardInterrupt, SystemExit):
                # Handle ctrl-C if done while waiting for the queue.
                self.early_terminate()

    def join(self):
        '''
        Put the None end marker on the queue, wait for every worker
        process to finish, and remove this pool from active_pools.
        '''
        # End the queue, and wait for all worker processes to complete nicely.
        if self.queue is not None:
            self.queue.put(None)
            # processes is None when worker construction failed.
            for p in self.processes or ():
                p.join()
            self.queue = None
        # Remove myself from the set of pools that need cleanup on shutdown.
        self._unregister()

    def early_terminate(self):
        '''
        Shut the pool down without waiting: try to post the end marker,
        terminate the worker processes, and remove this pool from
        active_pools.
        '''
        # When shutting down unexpectedly, first end the queue.
        if self.queue is not None:
            try:
                self.queue.put_nowait(None)  # Nonblocking put throws if full.
            except Exception:
                # Best effort only: the workers are terminated below anyway.
                pass
            finally:
                # Always forget the queue.  Keeping it after the workers
                # are gone would make a later join() or __del__ block
                # forever on a full queue that nobody reads.
                self.queue = None
        # But then don't wait: just forcibly terminate workers.
        if self.processes is not None:
            for p in self.processes:
                p.terminate()
            self.processes = None
        self._unregister()

    def _unregister(self):
        '''Remove this pool from active_pools if it is registered there.'''
        try:
            active_pools.pop(id(self), None)
        except (NameError, AttributeError):
            # Best effort: the registry itself may be unavailable, for
            # example when this runs from __del__ during interpreter exit.
            pass

    def __del__(self):
        '''Report a pool that was dropped without join(), then join it.'''
        # getattr: __init__ may have raised before queue was assigned.
        if getattr(self, 'queue', None) is not None:
            print('ERROR: workerpool.join() not called!', file=sys.stderr)
            self.join()
# Error and ctrl-C handling: pools that still need cleanup, keyed by
# id(pool), so they can be shut down if the main process ends.
active_pools = {}


def early_terminate_pools():
    '''Call early_terminate() on every pool still in active_pools.'''
    # Iterate over a snapshot so the registry may change while we loop.
    remaining = list(active_pools.values())
    for pool in remaining:
        pool.early_terminate()


atexit.register(early_terminate_pools)