<2014-01-17> by Lorenzo

Long-running process manager

If you want to run multiple processes in Python, the nicest way, in my opinion, is to use concurrent.futures (there is a backport for Python 2.x, too).
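As a quick reminder of the API, here is a minimal sketch (Python 3; `square` and `run_squares` are hypothetical names for illustration). It submits callables to a ProcessPoolExecutor and collects their results through the returned futures.

```python
import concurrent.futures


def square(x):
    # Runs inside one of the pool's worker processes.
    return x * x


def run_squares(n):
    # Leaving the with-block calls pool.shutdown(wait=True).
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(square, i) for i in range(n)]
        # result() blocks until that particular future has finished.
        return [fut.result() for fut in futures]


if __name__ == '__main__':
    print(run_squares(5))  # prints [0, 1, 4, 9, 16]
```

The worker function lives at module level, and the entry point sits behind the `__main__` guard. Both matter on platforms where worker processes are started by re-importing the main module.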

A problem that came up recently was managing multiple long-running processes, intended to run forever, from the same Python process. Using something like supervisord wouldn't be ideal, because each individual application would need its own config file and deployment procedure.

Instead, having just one "master" Python process drive several worker processes seemed the easiest solution to manage. The only caveat is that, because the managed processes are long-running, you want to restart them as soon as they die.

See below for one take on the problem using concurrent.futures. The class keeps a reference to each running future and, as soon as one finishes, checks whether it succeeded. If it failed, the class resubmits it. The on_error and on_success callbacks can be overridden to run custom actions on finished futures.

from __future__ import print_function

import random
import time

import concurrent.futures  # on Python 2, install the "futures" backport


class ProcessManager(object):
    """Keep a set of long-running callables alive in a process pool."""

    def __init__(self, max_workers=None):
        # Each long-running task occupies a worker for good, so max_workers
        # must be at least the number of tasks, or some will never start.
        self.pool = concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
        # Map each running future to the call that produced it.
        self.futures = {}

    def submit(self, f, *args, **kwargs):
        future = self.pool.submit(f, *args, **kwargs)
        self.futures[future] = (f, args, kwargs)

    def start(self):
        while self.futures:
            # Wake up as soon as any task finishes, whether it returned or raised.
            res = concurrent.futures.wait(
                self.futures,
                return_when=concurrent.futures.FIRST_COMPLETED)
            print('Tasks', len(res.done), len(res.not_done))
            for future in res.done:
                f, args, kwargs = self.futures.pop(future)
                exc = future.exception()
                if exc is None:
                    self.on_success(future, exc, f, *args, **kwargs)
                else:
                    self.on_error(future, exc, f, *args, **kwargs)
                    print('Resubmitting')
                    self.submit(f, *args, **kwargs)
        self.pool.shutdown()

    # Override in subclasses
    def on_error(self, future, exc, f, *args, **kwargs):
        print('Got exception from', future, exc)

    def on_success(self, future, exc, f, *args, **kwargs):
        print('Future finished', future, future.result())


# Defined at module level (outside the __main__ guard) so that worker
# processes can find it when unpickling the submitted call.
def f(i):
    print('start')
    while True:
        time.sleep(0.1)
        x = random.random()
        if x < 0.1:
            raise Exception('DEAD')
        elif x < 0.9:
            continue
        else:
            return 'hello world', i


if __name__ == '__main__':
    pm = ProcessManager(max_workers=5)
    for i in range(5):
        pm.submit(f, i)
    pm.start()
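The loop in start() hinges on two primitives: concurrent.futures.wait with FIRST_COMPLETED, and Future.exception(). The small sketch below (Python 3; `task` and `first_outcome` are hypothetical helpers) isolates them. One task fails quickly and one succeeds slowly, so wait returns with only the failed future in the done set. With FIRST_EXCEPTION instead, a future that returned normally would not wake the loop until some other future raised or all of them had finished.

```python
import concurrent.futures
import time


def task(delay, fail):
    # Hypothetical worker: sleep, then either raise or return.
    time.sleep(delay)
    if fail:
        raise ValueError('failed after %.1fs' % delay)
    return delay


def first_outcome():
    # Leaving the with-block waits for the slow task to finish too.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(task, 0.1, True), pool.submit(task, 2.0, False)]
        # FIRST_COMPLETED returns once any future finishes, whether it
        # returned normally or raised.
        done, not_done = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        # exception() returns None on success and the exception object on
        # failure; unlike result(), it does not re-raise.
        errors = [type(fut.exception()).__name__ for fut in done]
        return errors, len(not_done)


if __name__ == '__main__':
    print(first_outcome())  # prints (['ValueError'], 1)
```

Because exception() never re-raises, the manager can branch on success or failure without wrapping result() in a try/except.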