Python Multiprocessing
A quick note-to-self on how to make the Python multiprocessing module work the way I'd like it to ...
Multiprocessing is a term often applied in Python to the mechanics of splitting work between multiple processes, rather than multiple threads. With the latter being of limited value in Python (due to the GIL), multiprocessing is often the key to performant Python code. If only it were a little less problematic to use ...
Pools
One of the attractive features of the Python multiprocessing module is Pools. Essentially you get to say "I want a pool of (n) processes, and I'd like that pool to process a batch of work such that it's evenly distributed over each process in the pool". Ideally the semantics look like this;
with Pool(processes=4) as pool:
    pool.map(worker, list_of_jobs)
So on the face of it, this is pretty easy: worker can just be a Python function, and list_of_jobs can literally be a list with a bunch of integers in it, for example;
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as pool:
        print(pool.map(f, [1, 2, 3]))
> [1, 4, 9]
Easy. So the issues come when you start to apply this to real-world problems. First off, we're probably going to want to implement our worker as a class. We may well want to initialise that class by opening files or network connections, and when the worker terminates, we may want to explicitly close those files and network connections and flush any caches.
Welcome to the minefield.
Problem
I've had a crack at trying to nail down a solution for one specific use-case. In this instance my real-world application wants to download and store data from the internet, and when a batch is complete, run some relatively trivial processing to confirm and index the download. What I don't want to do is write a one-shot process that opens a network connection and a database connection for each data item, then closes and flushes. I need to open resources once, download and store all the data for my batch, then close and process once.
This sounds like it should be relatively easy, but if you've ever tried it you will have noticed a few issues which aren't all that obvious. For a start, multiprocessing doesn't honour the normal exit mechanisms (atexit, __del__ etc.), so cleanup code is never going to get called. Then, even if you can find something to trigger the cleanup code, the enter/exit block for Pool will call terminate on each worker when the with context exits, which is likely to kill your cleanup code part-way through running.
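For reference, the shape of worker I'm after looks roughly like this; a purely hypothetical sketch using stdlib sqlite3 and urllib, where the class name, table and schema are made up for illustration;

import sqlite3
from urllib.request import urlopen

class Downloader:
    """Open resources once per process, work through a batch, close once."""

    def __enter__(self):
        # open the expensive resources exactly once
        self.db = sqlite3.connect('downloads.db')
        self.db.execute('CREATE TABLE IF NOT EXISTS items (url TEXT, body BLOB)')
        return self

    def __exit__(self, *args):
        # flush and close exactly once, after the whole batch
        self.db.commit()
        self.db.close()

    def run(self, url):
        # one unit of work: fetch a single item and store it
        with urlopen(url) as response:
            self.db.execute('INSERT INTO items VALUES (?, ?)', (url, response.read()))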
Solution
To start off we have a Daemon class that is going to encapsulate the work we want to do. In this instance all we're doing is logging, but to prove the point we just need to make sure all our logging appears, and that it appears in the correct order.
from os import getpid
from loguru import logger as log
from multiprocessing.pool import Pool
from multiprocessing.util import Finalize

class Daemon:

    def __init__(self):
        log.debug(f'initialising daemon (parent, pid={getpid()})')

    def __enter__(self):
        """Open resources [called once per process]"""
        log.debug(f'starting process (parent, pid={getpid()})')
        return self

    def __exit__(self, *args, **kwargs):
        """Flush resources [called once per process]"""
        log.debug(f'stopping process (worker, pid={getpid()})')

    def run(self, arg):
        log.debug(f'worker is running job (worker, pid={getpid()} arg={arg})')
So if we just run this in isolation, to show that this is a testable, self-contained class we can develop 'outside' of a multiprocessing environment;
>>> with Daemon() as d:
...     d.run(1)
...
| DEBUG | demo:__init__:17 - initialising daemon (parent, pid=654815)
| DEBUG | demo:__enter__:22 - starting process (parent, pid=654815)
| DEBUG | demo:run:30 - worker is running job (worker, pid=654815 arg=1)
| DEBUG | demo:__exit__:27 - stopping process (worker, pid=654815)
Next comes the more interesting part: we need to subclass the multiprocessing Pool class to circumvent some of its, erm, features;
class ProcessPool(Pool):

    def __init__(self, *args, **kwargs):
        kwargs['initializer'] = self.worker_init
        super().__init__(*args, **kwargs)

    def worker_init(self):
        global instance
        Finalize(instance, instance.__exit__, exitpriority=1)
        instance.__enter__()

    def Process(self, *args, **kwargs):
        global instance
        instance = Daemon()
        return super().Process(*args, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        self.join()

    @staticmethod
    def worker(request):
        try:
            global instance
            instance.run(request)
        except Exception as e:
            log.exception(e)
So, what are we doing here? First, we're making use of the initializer parameter, which allows us to run some of our own code inside each worker process after the Pool has created it, but before we start sending any data to it.
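In isolation, the documented initializer hook looks something like this (init_worker and square are just throwaway names for the sake of a sketch); each worker process runs init_worker exactly once before it receives any jobs;

from os import getpid
from multiprocessing import Pool

def init_worker(tag):
    # runs once per worker process, before any jobs arrive
    print(f'worker {getpid()} initialised with tag={tag}')

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2, initializer=init_worker, initargs=('demo',)) as pool:
        print(pool.map(square, range(4)))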
worker_init is our code which is to be run inside the worker process; I'll double back to this in a moment.
Process is an override of the Pool's Process method, which is used to launch a worker. Despite what you might expect, this runs in the parent process (note the parent pid on the __init__ lines in the log output below); it creates an instance of our worker class (Daemon) and assigns it to a global variable, which the forked worker process then inherits. (Note this relies on the fork start method, where the worker inherits the parent's globals; with spawn the instance wouldn't carry across.) Coming back to worker_init, which is called inside the worker after Process(), this picks up the instance of Daemon and registers its __exit__ method with Finalize. Now .. this is a bit of a cheat as Finalize is an undocumented feature of the multiprocessing module, however it would appear to be the correct way to schedule cleanup code inside a worker process. (i.e. the Finalize code is called when __del__, atexit etc. are not)
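To see Finalize on its own; bearing in mind it's an internal helper, so treat this as a sketch of observed behaviour rather than a guaranteed API, it registers a callback to be run as the process shuts down;

from multiprocessing.util import Finalize

class Resource:
    def close(self):
        print('resource closed')

if __name__ == '__main__':
    r = Resource()
    # ask multiprocessing to call r.close() when this process shuts down
    Finalize(r, r.close, exitpriority=1)
    print('doing some work')
    # 'resource closed' should be printed as the interpreter exits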
"__exit__" , so we're overriding the Pool exit handler. All the default handler does is to call terminate, which for one-shot processes should be fine as we've only reached exit after all processing is complete. Except that in this instance we're calling cleanup handlers, so terminate is likely to prematurely kill any cleanup code. Instead we're going to call close followed by join, which is what terminate does after killing the process. This will honour the Finalize code and wait for it to complete before finishing the Pool shutdown.
Note: the downside here is that your worker code needs to behave properly; if it hangs in any way, the with context will never exit, because the Pool shutdown code will wait indefinitely for all worker processes to exit. So at the very least, make sure the worker always exits!
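One way to at least put a bound on the wait is to use map_async and a timeout on the result; note that falling back to terminate means the cleanup code gets skipped, so this is very much a last resort, and the 60 second timeout is an arbitrary choice for the sketch;

from multiprocessing import TimeoutError

if __name__ == '__main__':
    with ProcessPool(2) as pool:
        result = pool.map_async(pool.worker, range(4))
        try:
            result.get(timeout=60)      # don't wait forever for the batch
        except TimeoutError:
            log.error('batch timed out, terminating workers')
            pool.terminate()            # skips the Finalize cleanup, but we won't hang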
Last but not least, worker is the function that map will be calling, and this runs inside a worker process. Again it picks up the instance of Daemon as a global variable, then calls the run method with the passed parameter.
So putting it all together, a run would look like this;
if __name__ == '__main__':
    log.warning('started')
    with ProcessPool(2) as pool:
        pool.map(pool.worker, range(4))
    log.warning('finished')
__main__:<module>:63 - started
__main__:__init__:17 - initialising daemon (parent, pid=655890)
__main__:__init__:17 - initialising daemon (parent, pid=655890)
__main__:__enter__:22 - starting process (parent, pid=655891)
__main__:__enter__:22 - starting process (parent, pid=655892)
__main__:run:30 - worker is running job (worker, pid=655891 arg=0)
__main__:run:30 - worker is running job (worker, pid=655891 arg=1)
__main__:run:30 - worker is running job (worker, pid=655891 arg=2)
__main__:run:30 - worker is running job (worker, pid=655892 arg=3)
__main__:__exit__:27 - stopping process (worker, pid=655892)
__main__:__exit__:27 - stopping process (worker, pid=655891)
__main__:<module>:66 - finished
So the run method is called four times, split between the two worker processes (in this run, three jobs landed on one worker and one on the other), and the start and stop routines are called exactly once per worker process.
Hope this helps someone, but either way I'll be back here next time I need to write some multi-processing code that needs to do more than add up numbers .. :)