Pipelines in Python
Generators (PEP 255 “Simple Generators”) and Coroutines (PEP 342 “Coroutines via Enhanced Generators”) are the cleanest way I’ve come across so far to implement the concept of a “pipeline” in Python.
First approximation
A pipeline is made of:
- a Producer, that generates data;
- many Stages, that receive data from the previous stage and send it to the next;
- a Consumer, that receives data from the last stage.
The producer is a coroutine that only sends data, generated internally from some initial state. Stages are coroutines that both receive and send messages. The consumer only receives data.
Chaining is done in function pipeline: each argument but the last is instantiated with an instance of the next stage. The full pipeline is started by issuing a next (or send(None)) to the Producer.
In the following example, a stream of integers is produced and pushed down the pipeline: each stage adds 1 and finally the result is printed in the consumer.
'''Pipeline
(yield) -> receiver.send -> producer'''
import time
N = 0  # Module-level counter; the producer P increments it for every item it emits.
def P(n):
    '''Producer: only .send (and yield as entry point).

    Pauses once at a bare ``yield`` so the caller can prime it, then loops
    forever: bump the module-level counter ``N``, sleep one second and push
    the new counter value into the downstream coroutine ``n``.
    '''
    global N

    yield  # to "start"
    while True:
        N += 1
        time.sleep(1)
        n.send(N)
def S(n):
    '''Stage: both (yield) and .send.

    Receives a value through ``(yield)``, logs it, adds 1 and forwards the
    result to the downstream coroutine ``n``.  Must be primed with ``next()``
    before the first ``send``.
    '''
    def _f(x):
        # A single pre-formatted argument prints "Stage <x>" on Python 2 and
        # Python 3 alike; the print statement is a syntax error on Python 3.
        print('Stage %s' % (x,))
        return x + 1

    while True:
        r = (yield)
        n.send(_f(r))
def C():
    '''Consumer: only (yield).

    Prints every value it is sent.  Must be primed with ``next()`` before the
    first ``send``.
    '''
    def _f(x):
        # Pre-formatted so the output is "Consumed <x>" on Python 2 and 3.
        print('Consumed %s' % (x,))

    while True:
        r = (yield)
        _f(r)
def pipeline(*args):
    '''Chain stages together. Assumes the last is the consumer.

    ``args`` are generator functions: the last one takes no argument, every
    other one takes the downstream coroutine as its only argument.  Each
    coroutine is primed as soon as it is created, and the head of the chain
    (the one built from ``args[0]``) is returned.
    '''
    downstream = args[-1]()
    next(downstream)  # builtin next() works on Python 2.6+ and Python 3
    # Build back to front so that every factory receives its successor.
    for factory in reversed(args[:-1]):
        current = factory(downstream)
        next(current)
        downstream = current
    return downstream
if __name__ == '__main__':
    # Producer -> three stages -> consumer.  pipeline() has already primed
    # every coroutine, so this next() resumes the producer past its initial
    # bare yield and the stream starts flowing until interrupted.
    p = pipeline(P, S, S, S, C)
    next(p)  # to "start"; builtin next() works on Python 2.6+ and Python 3


# Wrapping it up
A pattern emerges, so we’d better wrap it up in a class. Moreover, let’s split the “architecture” of the pipeline from the behavior of each stage.
'''Pipeline
(yield) -> receiver.send -> producer
Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
'''
class StopPipeline(Exception):
    '''Raised by a producer function to signal that the stream is over.'''


class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    The positional arguments are plain callables: the first is the producer
    function, the last is the consumer function and everything in between is
    a stage function.  Each one is wrapped in the matching coroutine
    (P, S or C) and the coroutines are linked back to front.
    '''

    def __init__(self, *args):
        c = Pipeline.C(args[-1])
        next(c)  # prime; builtin next() works on Python 2.6+ and Python 3
        t = c
        for stg in reversed(args[1:-1]):
            s = Pipeline.S(stg, t)
            next(s)
            t = s
        p = Pipeline.P(args[0], t)
        next(p)
        self._pipeline = p

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run until it stops.'''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer returned after its function raised StopPipeline.
            self._pipeline.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        ``f(state)`` must return ``(result, next_state)`` or raise
        StopPipeline; every result is sent to the downstream coroutine ``n``.
        '''
        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send.

        Applies ``f`` to every received value and forwards the result to
        the downstream coroutine ``n``.
        '''
        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield).  Calls ``f`` on every received value.'''
        while True:
            r = (yield)
            f(r)


def produce(state):
    '''Given a state, produce a result and the next state.

    Sleeps one second and returns ``(state, state + 1)``; raises
    StopPipeline once ``state`` equals 3.
    '''
    import time
    if state == 3:
        raise StopPipeline('Enough!')
    time.sleep(1)
    return state, state + 1
def stage(x):
    '''Log ``x`` and return ``x + 1``.'''
    # Pre-formatted so the output is "Stage <x>" on Python 2 and Python 3;
    # the print statement is a syntax error on Python 3.
    print('Stage %s' % (x,))
    return x + 1
def consume(x):
    '''Print the final value ``x`` that came out of the pipeline.'''
    # Pre-formatted so the output is "Consumed <x>" on Python 2 and Python 3.
    print('Consumed %s' % (x,))
if __name__ == '__main__':
    # One producer, three identical stages and one consumer; the producer
    # is seeded with state 0.
    steps = [produce] + [stage] * 3 + [consume]
    Pipeline(*steps).start(0)


# More useful example
As a more interesting application, here is how to use a pipeline to implement a simple crawler, to download links from http://news.ycombinator.com/ and find all the posts where the word “Python” is mentioned.
'''Pipeline

(yield) -> receiver.send -> producer

Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
Simple crawler to check if "python" was mentioned on HN.
'''


class StopPipeline(Exception):
    '''Raised by a producer function to signal that the stream is over.'''


class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    The positional arguments are plain callables: the first is the producer
    function, the last is the consumer function and everything in between is
    a stage function.  Each one is wrapped in the matching coroutine
    (P, S or C) and the coroutines are linked back to front.
    '''

    def __init__(self, *args):
        c = Pipeline.C(args[-1])
        next(c)  # prime; builtin next() works on Python 2.6+ and Python 3
        t = c
        for stg in reversed(args[1:-1]):
            s = Pipeline.S(stg, t)
            next(s)
            t = s
        p = Pipeline.P(args[0], t)
        next(p)
        self._pipeline = p

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run until it stops.'''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer returned after its function raised StopPipeline.
            self._pipeline.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        ``f(state)`` must return ``(result, next_state)`` or raise
        StopPipeline; every result is sent to the downstream coroutine ``n``.
        '''
        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send.

        Applies ``f`` to every received value and forwards the result to
        the downstream coroutine ``n``.
        '''
        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield).  Calls ``f`` on every received value.'''
        while True:
            r = (yield)
            f(r)
def produce(state):
    '''Given a state, produce a result and the next state.

    ``state`` is a ``(urls, visited, domain)`` triple: the set of URLs still
    to fetch, the set of URLs already fetched and the substring a link must
    contain to be followed.  One URL is popped and downloaded; every
    ``href="http..."`` link in the page that contains ``domain`` and has not
    been visited is queued.

    Returns ``((url, doc), (urls, visited, domain))``; both sets are updated
    in place.  Raises StopPipeline when ``urls`` is empty.
    '''
    import re
    from contextlib import closing
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3

    # Tuple parameters were removed in Python 3, so unpack explicitly.
    urls, visited, domain = state
    if not urls:
        raise StopPipeline('No more urls')
    print('Queue %d' % len(urls))

    url = urls.pop()
    # closing() releases the connection even when read() fails.
    with closing(urlopen(url)) as response:
        doc = response.read()
    if not isinstance(doc, str):
        # Python 3 returns bytes; the regex below and the substring test
        # done downstream both need text.  Python 2 data is left untouched.
        doc = doc.decode('utf-8', 'replace')

    # Mark the page as visited before filtering, so that a page linking to
    # itself is not queued (and fetched) a second time.
    visited.add(url)
    links = re.compile('href="(http.+?)"').findall(doc)
    urls.update(
        link for link in links if domain in link and link not in visited
    )

    return (url, doc), (urls, visited, domain)
def stage(page):
    '''Flag whether a downloaded page mentions "python".

    ``page`` is a ``(url, doc)`` pair; returns ``(url, mentioned)`` where
    ``mentioned`` is True when "python" occurs in ``doc``, ignoring case.
    '''
    # Tuple parameters were removed in Python 3, so unpack explicitly.
    url, doc = page
    return (url, 'python' in doc.lower())
def consume(result):
    '''Report a page that mentions Python.

    ``result`` is a ``(url, mentioned)`` pair; prints one line for ``url``
    when ``mentioned`` is true and nothing otherwise.
    '''
    # Tuple parameters were removed in Python 3, so unpack explicitly.  The
    # flag was previously called "haskell" although it tracks "python".
    url, mentioned = result
    if mentioned:
        print('Python mentioned in %s' % url)
if __name__ == '__main__':
    # Seed the crawl with a single start URL, an empty visited set and the
    # domain substring that a link must contain to be followed.
    seed_state = ({'http://news.ycombinator.com'}, set(), 'ycombinator.com')
    crawler = Pipeline(produce, stage, consume)
    crawler.start(seed_state)


# Cleaning things up
Things are still far from clean and bulletproof. One step in the right direction is to follow the suggestions found in David Beazley’s presentation on coroutines.
'''Pipeline

(yield) -> receiver.send -> producer

Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
Simple crawler to check if "python" was mentioned on HN.
Some improvements after: http://www.dabeaz.com/coroutines/Coroutines.pdf
- coroutine decorator
- catch GeneratorExit
'''
from contextlib import contextmanager
def coroutine(f):
    '''Decorator: create the generator and advance it to its first yield.

    The decorated function returns a coroutine that is ready for ``send``.
    '''
    from functools import wraps

    @wraps(f)  # keep the name and docstring of the wrapped function
    def start(*args, **kwargs):
        cr = f(*args, **kwargs)
        next(cr)  # builtin next() works on Python 2.6+ and Python 3
        return cr

    return start


@contextmanager
def close_on_exit(n):
    '''Close the downstream coroutine ``n`` when the body is being closed.

    A GeneratorExit raised inside the ``with`` body is caught here and turned
    into ``n.close()``, so closing one coroutine cascades down the chain.
    '''
    try:
        yield
    except GeneratorExit:
        n.close()


class StopPipeline(Exception):
    '''Raised by a producer function to signal that the stream is over.'''


class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    The positional arguments are plain callables: the first is the producer
    function, the last is the consumer function and everything in between is
    a stage function.  Each one is wrapped in the matching coroutine
    (P, S or C), which the ``coroutine`` decorator primes on creation.
    '''

    def __init__(self, *args):
        # Build back to front so every coroutine receives its successor.
        t = Pipeline.C(args[-1])
        for stg in reversed(args[1:-1]):
            t = Pipeline.S(stg, t)
        self._pipeline = Pipeline.P(args[0], t)

    def start(self, initial_state):
        '''Send ``initial_state`` to the producer and run until it stops.'''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer has already returned (and closed its downstream
            # coroutines); close() on a finished generator does nothing.
            self._pipeline.close()

    @staticmethod
    @coroutine
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        ``f(state)`` must return ``(result, next_state)`` or raise
        StopPipeline; every result is sent to the downstream coroutine ``n``.
        '''
        state = (yield)  # get initial state
        with close_on_exit(n):
            while True:
                try:
                    res, state = f(state)
                except StopPipeline:
                    # Normal end of stream: no GeneratorExit is raised on
                    # this path, so close_on_exit would not fire.  Close the
                    # downstream chain explicitly before finishing.
                    n.close()
                    return
                n.send(res)

    @staticmethod
    @coroutine
    def S(f, n):
        '''Stage: both (yield) and .send.

        Applies ``f`` to every received value and forwards the result to
        the downstream coroutine ``n``.
        '''
        with close_on_exit(n):
            while True:
                r = (yield)
                n.send(f(r))

    @staticmethod
    @coroutine
    def C(f):
        '''Consumer: only (yield).  Calls ``f`` on every received value.'''
        # nothing to "close" here
        while True:
            r = (yield)
            f(r)
def produce(state):
    '''Given a state, produce a result and the next state.

    ``state`` is a ``(urls, visited, domain)`` triple: the set of URLs still
    to fetch, the set of URLs already fetched and the substring a link must
    contain to be followed.  One URL is popped and downloaded; every
    ``href="http..."`` link in the page that contains ``domain`` and has not
    been visited is queued.

    Returns ``((url, doc), (urls, visited, domain))``; both sets are updated
    in place.  Raises StopPipeline when ``urls`` is empty.
    '''
    import re
    from contextlib import closing
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3

    # Tuple parameters were removed in Python 3, so unpack explicitly.
    urls, visited, domain = state
    if not urls:
        raise StopPipeline('No more urls')
    print('Queue %d' % len(urls))

    url = urls.pop()
    # closing() releases the connection even when read() fails.
    with closing(urlopen(url)) as response:
        doc = response.read()
    if not isinstance(doc, str):
        # Python 3 returns bytes; the regex below and the substring test
        # done downstream both need text.  Python 2 data is left untouched.
        doc = doc.decode('utf-8', 'replace')

    # Mark the page as visited before filtering, so that a page linking to
    # itself is not queued (and fetched) a second time.
    visited.add(url)
    links = re.compile('href="(http.+?)"').findall(doc)
    urls.update(
        link for link in links if domain in link and link not in visited
    )

    return (url, doc), (urls, visited, domain)
def stage(page):
    '''Flag whether a downloaded page mentions "python".

    ``page`` is a ``(url, doc)`` pair; returns ``(url, mentioned)`` where
    ``mentioned`` is True when "python" occurs in ``doc``, ignoring case.
    '''
    # Tuple parameters were removed in Python 3, so unpack explicitly.
    url, doc = page
    return (url, 'python' in doc.lower())
def consume(result):
    '''Report a page that mentions Python.

    ``result`` is a ``(url, mentioned)`` pair; prints one line for ``url``
    when ``mentioned`` is true and nothing otherwise.
    '''
    # Tuple parameters were removed in Python 3, so unpack explicitly.  The
    # flag was previously called "haskell" although it tracks "python".
    url, mentioned = result
    if mentioned:
        print('Python mentioned in %s' % url)
if __name__ == '__main__':
    # Seed the crawl with a single start URL, an empty visited set and the
    # domain substring that a link must contain to be followed.
    seed_state = ({'http://news.ycombinator.com'}, set(), 'ycombinator.com')
    crawler = Pipeline(produce, stage, consume)
    crawler.start(seed_state)


# The previous examples are by no means "production ready", but maybe someone
# will find some good idea to apply to real world problems.
Tagged coroutine, generator, pipeline, python
