Skip to content

Pipelines in Python

Generators (PEP 255 “Simple Generators”) and Coroutines (PEP 342 “Coroutines via Enhanced Generators”) are the cleanest way I’ve come across so far to implement the concept of a “pipeline” in Python.

First approximation

A pipeline is made of:

  • a Producer, that generates data;
  • many Stages, that receive data from the previous stage and send it to the next;
  • a Consumer, that receives data from the last stage.

The producer is a coroutine that only sends data, generated internally from some initial state. Stages are coroutines that both receive and send messages. The consumer only receives data.

Chaining is done in the `pipeline` function: every argument except the last is instantiated with the (already created) next stage as its target. The full pipeline is started by issuing a next (or send(None)) to the Producer.

In the following example, a stream of integers is produced and pushed down the pipeline: each stage adds 1 and finally the result is printed in the consumer.

'''Pipeline

(yield) -> receiver
.send -> producer
'''

import time

# Module-level counter that the producer below increments; starts at 0.
N = 0


def P(n):
    '''Producer: only .send (and yield as entry point).

    The first next() stops at the bare yield. After that, every resumption
    bumps the global counter N and pushes its new value (1, 2, 3, ...) into
    the target coroutine `n`, pausing one second per value. It never stops
    on its own.
    '''

    def _tick():
        global N
        N = N + 1
        time.sleep(1)
        return N

    yield  # to "start"
    while 1:
        n.send(_tick())


def S(n):
    '''Stage: both (yield) and .send.

    Prints every value it receives, then forwards value + 1 to the target
    coroutine `n`.
    '''

    def _f(x):
        # A single pre-formatted string prints "Stage <x>" identically on
        # Python 2 and 3; the bare print statement is a SyntaxError on 3.
        print('Stage %s' % (x,))
        return x + 1

    while True:
        r = (yield)
        n.send(_f(r))


def C():
    '''Consumer: only (yield).

    Prints every value it receives; it sends nothing further.
    '''

    def _f(x):
        # A single pre-formatted string prints "Consumed <x>" identically on
        # Python 2 and 3; the bare print statement is a SyntaxError on 3.
        print('Consumed %s' % (x,))

    while True:
        r = (yield)
        _f(r)


def pipeline(*args):
    '''Chain stages together. Assumes the last is the consumer.

    Every argument is a coroutine factory. The last one is called with no
    arguments (the consumer). Every other one, from right to left, is called
    with the coroutine built just before it as its downstream target. Each
    coroutine is primed (advanced to its first yield) as soon as it is
    built. Returns the first, most upstream coroutine.
    '''

    t = args[-1]()
    # Builtin next() works on Python 2.6+ and 3; gen.next() is 2-only.
    next(t)
    # Named `factory` (not `S`) so the module-level stage S is not shadowed.
    for factory in reversed(args[:-1]):
        t = factory(t)
        next(t)
    return t


if __name__ == '__main__':
    p = pipeline(P, S, S, S, C)
    # to "start": builtin next() works on Python 2.6+ and 3 (p.next() is
    # 2-only); the producer then runs forever, one value per second.
    next(p)
view raw pipeline_1.py This Gist brought to you by GitHub.

Wrapping it up

A pattern emerges, so we’d better wrap it up in a class. Moreover, let’s split the “architecture” of the pipeline from the behavior of each stage.

'''Pipeline

(yield) -> receiver
.send -> producer

Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
'''


class StopPipeline(Exception):
    '''Raised by a producer function when there is nothing more to produce.'''
    
        
class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    Pipeline(produce, stage_1, ..., stage_k, consume) wraps plain functions
    into coroutines wired produce -> stage_1 -> ... -> stage_k -> consume:

    - produce(state) returns (result, next_state) and raises StopPipeline
      to end the run;
    - each stage(x) returns the value passed downstream;
    - consume(x) receives the final values (its return value is ignored).
    '''

    def __init__(self, *args):
        # Build back to front: each coroutine needs its downstream target
        # when it is created. next() primes a coroutine (runs it to its
        # first yield); the builtin works on Python 2.6+ and 3, unlike the
        # 2-only gen.next().
        c = Pipeline.C(args[-1])
        next(c)
        t = c
        chain = [c]
        for stg in reversed(args[1:-1]):
            s = Pipeline.S(stg, t)
            next(s)
            t = s
            chain.append(s)
        p = Pipeline.P(args[0], t)
        next(p)
        chain.append(p)
        self._pipeline = p
        # Every coroutine, producer first, so start() can shut them all down.
        self._chain = chain[::-1]

    def start(self, initial_state):
        '''Run the pipeline from initial_state until the producer stops.'''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer returned (it caught StopPipeline). Closing only
            # the producer would be a no-op on a finished generator, so
            # close the stages and the consumer too.
            for cr in self._chain:
                cr.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        Waits at its first (yield) for the initial state, then repeatedly
        calls f(state) -> (result, next_state) and sends each result to n.
        Returns (finishing the generator) when f raises StopPipeline.
        '''

        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send.

        Sends f(value) to n for every value it receives.
        '''

        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield).

        Calls f(value) for every value it receives.
        '''

        while True:
            r = (yield)
            f(r)
    
            
def produce(state):
    '''Given a state, produce a result and the next state.

    The result is the state itself and the next state is state + 1, with a
    one-second pause per item. Once the state reaches 3, raises StopPipeline
    instead, which ends the pipeline run.
    '''
    import time
    if state != 3:
        time.sleep(1)
        return state, state + 1
    raise StopPipeline('Enough!')


def stage(x):
    '''Print the incoming value and pass on value + 1.'''
    # One pre-formatted string: same "Stage <x>" output on Python 2 and 3
    # (the bare print statement is a SyntaxError on 3).
    print('Stage %s' % (x,))
    return x + 1


def consume(x):
    '''Print the final value that reached the end of the pipeline.'''
    # One pre-formatted string: same output on Python 2 and 3.
    print('Consumed %s' % (x,))


if __name__ == '__main__':
    # produce -> three stages -> consume, counting up from 0.
    initial_state = 0
    p = Pipeline(produce, stage, stage, stage, consume)
    p.start(initial_state)
view raw pipeline_3.py This Gist brought to you by GitHub.

More useful example

As a more interesting application, here is how to use a pipeline to implement a simple crawler that follows the links on http://news.ycombinator.com/ and finds all the pages where the word “Python” is mentioned.

'''Pipeline
(yield) -> receiver
.send -> producer
Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
Simple crawler to check if "python" was mentioned on HN.
'''
        
        
class StopPipeline(Exception):
    '''Raised by a producer function when there is nothing more to produce.'''
        
            
class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    Pipeline(produce, stage_1, ..., stage_k, consume) wraps plain functions
    into coroutines wired produce -> stage_1 -> ... -> stage_k -> consume:

    - produce(state) returns (result, next_state) and raises StopPipeline
      to end the run;
    - each stage(x) returns the value passed downstream;
    - consume(x) receives the final values (its return value is ignored).
    '''

    def __init__(self, *args):
        # Build back to front: each coroutine needs its downstream target
        # when it is created. next() primes a coroutine (runs it to its
        # first yield); the builtin works on Python 2.6+ and 3, unlike the
        # 2-only gen.next().
        c = Pipeline.C(args[-1])
        next(c)
        t = c
        chain = [c]
        for stg in reversed(args[1:-1]):
            s = Pipeline.S(stg, t)
            next(s)
            t = s
            chain.append(s)
        p = Pipeline.P(args[0], t)
        next(p)
        chain.append(p)
        self._pipeline = p
        # Every coroutine, producer first, so start() can shut them all down.
        self._chain = chain[::-1]

    def start(self, initial_state):
        '''Run the pipeline from initial_state until the producer stops.'''
        try:
            self._pipeline.send(initial_state)
        except StopIteration:
            # The producer returned (it caught StopPipeline). Closing only
            # the producer would be a no-op on a finished generator, so
            # close the stages and the consumer too.
            for cr in self._chain:
                cr.close()

    @staticmethod
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        Waits at its first (yield) for the initial state, then repeatedly
        calls f(state) -> (result, next_state) and sends each result to n.
        Returns (finishing the generator) when f raises StopPipeline.
        '''

        state = (yield)  # get initial state
        while True:
            try:
                res, state = f(state)
            except StopPipeline:
                return
            n.send(res)

    @staticmethod
    def S(f, n):
        '''Stage: both (yield) and .send.

        Sends f(value) to n for every value it receives.
        '''

        while True:
            r = (yield)
            n.send(f(r))

    @staticmethod
    def C(f):
        '''Consumer: only (yield).

        Calls f(value) for every value it receives.
        '''

        while True:
            r = (yield)
            f(r)


def produce(state):
    '''Given a state, produce a result and the next state.

    state is a (urls, visited, domain) tuple:
      urls    -- set of URLs still to crawl; one is popped and the newly
                 discovered links are added (mutated in place);
      visited -- set of URLs already fetched (mutated in place);
      domain  -- substring a link must contain to be followed (a plain
                 substring test, not a host-name comparison).

    Returns ((url, doc), next_state), where doc is the fetched page body.
    Raises StopPipeline when there are no URLs left to crawl.
    '''
    import re
    from contextlib import closing
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3

    # Unpacked in the body: tuple parameters were removed in Python 3.
    urls, visited, domain = state

    nurls = len(urls)
    if nurls == 0:
        raise StopPipeline('No more urls')
    print('Queue %d' % nurls)

    url = urls.pop()
    # Mark the page as visited *before* harvesting its links, so a page that
    # links to itself is not queued (and fetched) a second time.
    visited.add(url)
    with closing(urlopen(url)) as response:  # always release the connection
        doc = response.read()
    if not isinstance(doc, str):
        # Python 3 returns bytes; decode so the regex below and the stage's
        # doc.lower() work on text. On Python 2 doc is already a str.
        doc = doc.decode('utf-8', 'replace')

    links = re.findall(r'href="(http.+?)"', doc)
    urls.update(l for l in links if domain in l and l not in visited)

    return (url, doc), (urls, visited, domain)


def stage(page):
    '''Map a fetched (url, doc) page to (url, whether doc mentions "python").

    The check is case-insensitive.
    '''
    # Unpacked in the body: tuple parameters were removed in Python 3.
    url, doc = page
    return (url, 'python' in doc.lower())


def consume(result):
    '''Report a (url, mentions_python) result when the page mentions Python.'''
    # Unpacked in the body (tuple parameters were removed in Python 3), and
    # named for what the flag means.
    url, mentions_python = result
    if mentions_python:
        print('Python mentioned in %s' % url)


if __name__ == '__main__':
    # Seed the crawl with the HN front page; only links containing
    # `domain` are followed.
    domain = 'ycombinator.com'
    urls = {'http://news.ycombinator.com'}
    p = Pipeline(produce, stage, consume)
    p.start((urls, set(), domain))
view raw pipeline_4.py This Gist brought to you by GitHub.

Cleaning things up

Things are still far from clean and bulletproof. One step in the right direction is to follow the suggestions found in David Beazley’s presentation on coroutines.

'''Pipeline
(yield) -> receiver
.send -> producer
Provide initial state to producer, avoiding globals.
Stop iteration after a bit.
Wrap in nice class.
Simple crawler to check if "python" was mentioned on HN.
Some improvements after: http://www.dabeaz.com/Fcoroutines/Coroutines.pdf
- coroutine decorator
- catch GeneratorExit
'''
    
from contextlib import contextmanager
    

def coroutine(f):
    '''Decorator: calling the decorated f(...) returns an already primed generator.

    The generator is advanced to its first (yield), so callers can .send()
    to it straight away.
    '''
    import functools

    @functools.wraps(f)  # keep f's __name__ / __doc__ on the wrapper
    def start(*args, **kwargs):
        cr = f(*args, **kwargs)
        # Builtin next() works on Python 2.6+ and 3; cr.next() is 2-only.
        next(cr)
        return cr

    return start
    
        
@contextmanager
def close_on_exit(n):
    '''Close coroutine `n` when the enclosing block exits, however it exits.

    Meant to wrap the body of a coroutine that forwards to `n`, so that
    shutting one coroutine down travels down the chain:

    - normal exit (e.g. a producer returning after StopPipeline): n is
      closed;
    - GeneratorExit (the enclosing coroutine is being closed): n is closed
      and GeneratorExit keeps propagating, as generator.close() expects;
    - any other exception: n is closed and the exception propagates.
    '''
    try:
        yield
    finally:
        # try/finally rather than "except GeneratorExit": catching only
        # GeneratorExit missed the normal-exit path and swallowed the
        # exception. A swallowed GeneratorExit lets the generator run on,
        # and close() raises RuntimeError if it yields again.
        n.close()
            
            
class StopPipeline(Exception):
    '''Raised by a producer function when there is nothing more to produce.'''
    

class Pipeline(object):
    '''Chain stages together. Assumes the last is the consumer.

    Pipeline(produce, stage_1, ..., stage_k, consume) turns plain functions
    into coroutines wired produce -> stage_1 -> ... -> stage_k -> consume.
    The @coroutine decorator primes each one when it is created, so no
    explicit next() is needed here.
    '''

    def __init__(self, *args):
        produce_fn, stage_fns, consume_fn = args[0], args[1:-1], args[-1]
        # Wire from the consumer backwards: every coroutine must be handed
        # its (already primed) downstream target when it is created.
        target = Pipeline.C(consume_fn)
        for stage_fn in stage_fns[::-1]:
            target = Pipeline.S(stage_fn, target)
        self._pipeline = Pipeline.P(produce_fn, target)

    def start(self, initial_state):
        '''Send initial_state to the producer; return once it has stopped.'''
        producer = self._pipeline
        try:
            producer.send(initial_state)
        except StopIteration:
            producer.close()

    @staticmethod
    @coroutine
    def P(f, n):
        '''Producer: only .send (and yield as entry point).

        Parks at its first (yield) until start() sends the initial state,
        then repeatedly calls f(state) -> (result, next_state) and sends each
        result to n, until f raises StopPipeline.
        '''
        state = (yield)
        with close_on_exit(n):
            while True:
                try:
                    item, state = f(state)
                except StopPipeline:
                    break
                n.send(item)

    @staticmethod
    @coroutine
    def S(f, n):
        '''Stage: both (yield) and .send.

        Applies f to every received value and sends the result on to n;
        close_on_exit(n) is there to propagate a close() of this stage to n.
        '''
        with close_on_exit(n):
            while True:
                received = (yield)
                n.send(f(received))

    @staticmethod
    @coroutine
    def C(f):
        '''Consumer: only (yield).

        Hands every received value to f and discards f's return value.
        Nothing lies downstream, so there is nothing to close here.
        '''
        while True:
            f((yield))


def produce(state):
    '''Given a state, produce a result and the next state.

    state is a (urls, visited, domain) tuple:
      urls    -- set of URLs still to crawl; one is popped and the newly
                 discovered links are added (mutated in place);
      visited -- set of URLs already fetched (mutated in place);
      domain  -- substring a link must contain to be followed (a plain
                 substring test, not a host-name comparison).

    Returns ((url, doc), next_state), where doc is the fetched page body.
    Raises StopPipeline when there are no URLs left to crawl.
    '''
    import re
    from contextlib import closing
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3

    # Unpacked in the body: tuple parameters were removed in Python 3.
    urls, visited, domain = state

    nurls = len(urls)
    if nurls == 0:
        raise StopPipeline('No more urls')
    print('Queue %d' % nurls)

    url = urls.pop()
    # Mark the page as visited *before* harvesting its links, so a page that
    # links to itself is not queued (and fetched) a second time.
    visited.add(url)
    with closing(urlopen(url)) as response:  # always release the connection
        doc = response.read()
    if not isinstance(doc, str):
        # Python 3 returns bytes; decode so the regex below and the stage's
        # doc.lower() work on text. On Python 2 doc is already a str.
        doc = doc.decode('utf-8', 'replace')

    links = re.findall(r'href="(http.+?)"', doc)
    urls.update(l for l in links if domain in l and l not in visited)

    return (url, doc), (urls, visited, domain)


def stage(page):
    '''Map a fetched (url, doc) page to (url, whether doc mentions "python").

    The check is case-insensitive.
    '''
    # Unpacked in the body: tuple parameters were removed in Python 3.
    url, doc = page
    return (url, 'python' in doc.lower())


def consume(result):
    '''Report a (url, mentions_python) result when the page mentions Python.'''
    # Unpacked in the body (tuple parameters were removed in Python 3), and
    # named for what the flag means.
    url, mentions_python = result
    if mentions_python:
        print('Python mentioned in %s' % url)


if __name__ == '__main__':
    # Crawl outward from the HN front page, following only links that
    # contain `domain`.
    domain = 'ycombinator.com'
    urls = {'http://news.ycombinator.com'}
    p = Pipeline(produce, stage, consume)
    p.start((urls, set(), domain))
view raw pipeline_5.py This Gist brought to you by GitHub.

The previous examples are by no means “production ready”, but maybe someone will find some good ideas to apply to real-world problems.

Tagged , , ,

Change engine to all tables in a MySQL database

Here is a simple shell script to change the engine of all the tables in a MySQL database:
#!/bin/sh
# Change the storage engine of every table in a MySQL database.
DBUSER=user
DBPWD=password   # NOTE: passed with -p below; a password on the command line is considered insecure
DBNAME=db
ENGINE=MyISAM
# List only base tables (views have no engine, so ALTER TABLE would fail on
# them). --batch --skip-column-names prints one bare name per line; read the
# names line by line so they are never word-split or glob-expanded.
mysql -u"$DBUSER" -p"$DBPWD" --batch --skip-column-names "$DBNAME" \
    -e "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE() AND table_type = 'BASE TABLE'" |
while IFS= read -r t; do
    # </dev/null: keep this mysql from consuming the table list on stdin
    mysql -u"$DBUSER" -p"$DBPWD" "$DBNAME" -e "ALTER TABLE \`$t\` ENGINE = $ENGINE;" </dev/null
done

Just because, as usual, I keep forgetting these kinds of things...

Tagged , , ,

hsenv

If you have never used hsenv before, you should. It’s basically Haskell’s equivalent of Python’s virtualenv.

You can find a tweaked version of hsenv here: I’ve relaxed some requirements in order to make it install on newer GHC releases.

Finally, if you hit this error when using it:

$ hsenv
Creating Virtual Haskell directory structure
Installing GHC
Initializing GHC Package database at /home/lollo/work/Unique/.hsenv/ghc_pkg_db
Copying necessary packages from original GHC package database
hsenv: fd:9: hGetContents: invalid argument (invalid byte sequence)
hsenv: thread blocked indefinitely in an MVar operation

Then, you have a problem with your locale. Follow the steps here, and retry.

Tagged , , , ,

Sudoku solver in Haskell

Recently, I’ve been challenged to write a Sudoku solver.

Not knowing the rules and with only 30 minutes, I needed some help… So, in case someone asks me again, I’ve implemented the same algorithm in Haskell.

module Main where

import Data.List (nubBy, concat, findIndices)
import Control.Monad (liftM2, forM, join, guard)
import Data.Maybe (catMaybes, fromMaybe)
import Debug.Trace

-- | A board is 81 cells in row-major order: '1'..'9' for a filled cell and
-- '.' for an empty one.
type Board = [Char]

-- | Some sample boards, each passed through 'parseBoard'.
-- Other examples: http://norvig.com/top95.txt
-- NOTE: entries 1 and 4 are the same board.
boards :: [Board]
boards = fmap parseBoard
        [ "4.....8.5.3..........7......2.....6.....8.4......1.......6.3.7.5..2.....1.4......"
        , "..3.2.6..9..3.5..1..18.64....81.29..7.......8..67.82....26.95..8..2.3..9..5.1.3.."
        , "483921657967345821251876493548132976729564138136798245372689514814253769695417382"
        , "483...6..967345....51....93548132976..95641381367982453..689514814253769695417..2"
        , "..3.2.6..9..3.5..1..18.64....81.29..7.......8..67.82....26.95..8..2.3..9..5.1.3.."
        , ".2.4.6..76..2.753...5.8.1.2.5..4.8.9.6159...34.28.3..1216...49.......31.9.8...2.."
        ]

-- | Fill the first '.' with every digit in turn, keep only the boards that
-- still satisfy the constraints, and recurse. When no dots are left, a board
-- that satisfies the constraints is a solution. Backtracking comes for free
-- from the List monad.
--
-- Branching only on the *first* empty cell is enough: every cell has to be
-- filled eventually. Also branching on the other cells at the same level
-- would just rediscover the same solutions in a different order, producing
-- duplicates and an exponential blow-up.
solve :: Board -> [Board]
solve board = case findIndices (== '.') board of
        [] -> do
                -- no dots left: just check the constraints
                guard $ not $ isObviouslyWrong board
                return board
        (idx:_) -> do
                -- in the List monad: try every digit in the first empty cell
                val <- ['1'..'9']
                let newBoard = set board idx val
                -- prune as soon as the *new* board breaks a constraint
                guard $ not $ isObviouslyWrong newBoard
                -- carry on with the good ones
                solve newBoard

-- | Return a copy of the board with cell idx replaced by val
-- (board[idx] = val).
set :: Board -> Int -> Char -> Board
set board idx val = before ++ val : after
        where before = take idx board
              after  = drop (idx + 1) board

-- | The first element of a list, or Nothing for an empty list.
safeHead :: [a] -> Maybe a
safeHead xs = case xs of
        (x:_) -> Just x
        []    -> Nothing

-- | The 27 groups of cell indices whose values must all differ: the 9 rows,
-- then the 9 columns, then the 9 3x3 boxes.
blockIdxs :: [[Int]]
blockIdxs = rows ++ cols ++ boxes
        where cell r c = r * 9 + c
              rows  = [[cell r c | c <- [0 .. 8]] | r <- [0 .. 8]]
              cols  = [[cell r c | r <- [0 .. 8]] | c <- [0 .. 8]]
              boxes = [ [cell r c | r <- [br .. br + 2], c <- [bc .. bc + 2]]
                      | br <- [0, 3, 6], bc <- [0, 3, 6] ]

-- | True when some row, column or 3x3 box (see 'blockIdxs') holds the same
-- digit twice. Empty cells ('.') never count as duplicates.
isObviouslyWrong :: Board -> Bool
isObviouslyWrong board = any (hasDups . map (board !!)) blockIdxs

-- | True when some character other than '.' occurs more than once.
hasDups :: String -> Bool
hasDups s = anyRepeated (filter (/= '.') s)
        where anyRepeated []     = False
              anyRepeated (c:cs) = c `elem` cs || anyRepeated cs

-- | Keep only the characters that make up a board: the digits 1-9, plus '.'
-- for an empty cell. Anything else (whitespace, separators, ...) is dropped.
parseBoard :: Board -> Board
parseBoard = filter isCell
        where isCell c = c == '.' || c `elem` ['1' .. '9']

-- | Render a board as nine lines, each the 'show'n string of one row.
showBoard :: Board -> String
showBoard board = unlines [show (take 9 (drop (9 * r) board)) | r <- [0 .. 8]]

-- | First solution of the third sample board (which has no empty cells).
test :: Maybe Board
test = safeHead (solve (boards !! 2))

-- | Read a board from stdin and print its first solution, or a message when
-- there is none. The message is printed as-is rather than going through
-- 'showBoard', which would chop it into garbled nine-character "rows".
main :: IO ()
main = interact $ maybe "Solution not found\n" showBoard . safeHead . solve . parseBoard
view raw Sudoku.hs This Gist brought to you by GitHub.

I know that there are already a lot of other solutions, but, hey, it was fun!

Tagged ,

https authentication in nginx

So that I don’t forget again how to set up HTTPS and HTTP basic authentication in nginx, here is a reminder.

For https, follow these steps:

    $ sudo -s
    # cd /etc/nginx
    # openssl req -new -x509 -nodes -days 365 -out server.crt -keyout server.key
    # chmod 600 server.key

and add these lines to your server instance:

    server {
        # TLS is enabled by the "ssl" parameter of listen
        listen 443 ssl;
        ssl_certificate      /etc/nginx/server.crt;
        ssl_certificate_key  /etc/nginx/server.key;
    ...

To set up basic authentication, add these lines to the same file:

    location / {
        # HTTP basic authentication for everything under /; the accepted
        # credentials come from /etc/nginx/htpasswd (generated below).
        auth_basic "Restricted";
        auth_basic_user_file /etc/nginx/htpasswd;
        # you might also want to deny access based on IP here
        #allow <ip-address>;
        #deny all;
    ...

Finally, to generate /etc/nginx/htpasswd, use this one-liner:

    printf 'your-username:%s\n' "$(openssl passwd -apr1)" > /etc/nginx/htpasswd

Restart nginx and Bob’s your uncle!

Tagged , ,

ulint, Universal Lint

I’ve been using so many static code checkers lately that I decided to write a wrapper around the ones I use the most.

I called it: Universal Lint, ulint for short.

At the moment, Python, JavaScript and Haskell files are supported, but adding new linters/extensions is trivial.

Download it and use it!

500 on Youtube on non-existing videos

Now, this is embarrassing…

It looks like requesting a non-existent video on Youtube causes a “500 Internal Server Error”!

500

Requesting a non-existing video on Youtube returns 500!

For example:

Surely, a better response would be something like the one returned when the videoId is missing: http://www.youtube.com/watch?v=

It seems the predictions came true.

HSGrep benchmarking

A few days ago, I rewrote sgrep in Haskell. I was curious to know how it compares to grep in terms of execution speed. In particular, I wanted to verify that hsgrep scales as O(log n), instead of O(n), with n being the size of the file analyzed.

First of all, in order to have similar performance to grep, I had to convert my original program to use Haskell’s bytestrings. You can find the code here.

Testing files are generated with this script.

Here are the results obtained. grep is still faster for smallish files (I haven’t spent too much time tweaking hsgrep), but hsgrep scales much better and it wins for files larger than a few megabytes!

Tagged , ,

HSGrep: Sorted Grep in Haskell

As an exercise to learn Haskell, I wrote a specialized grep to work on sorted files. It uses binary search to scan a text file and print all the (consecutive) lines that start with a user defined string.

My program is a rewrite of sgrep in Haskell: I called it HSGrep.

Code is available on github.

Thanks a lot for all your useful suggestions! As soon as possible, I’ll post some benchmarking here. (EDIT: benchmarks now available!)

After downloading the source code, build, install and run it with:

$> cabal build
$> cabal install
$> hslint <string> <filename>
Tagged , ,

git pre-commit hook for python and javascript

Following a recent discussion on HN, I decided to share my own git pre-commit hook.

#!/usr/bin/python

import os
import sys
import re
import subprocess

# Shared write-only sink that execute(..., silent=True) points stdout and
# stderr at; opened once for the whole hook run.
devnull = open(os.devnull, mode='w')

def call(cmd):
    '''Run `cmd` and return its (stdout, stderr) decoded as UTF-8 text.

    `cmd` is either a command string, which is split on whitespace, or a
    list/tuple of arguments (use this form for arguments containing spaces).
    Undecodable bytes are replaced instead of crashing the hook, e.g. when a
    linter echoes a line from a Latin-1 source file.
    '''
    if isinstance(cmd, (list, tuple)):
        args = list(cmd)
    else:
        args = cmd.split()
    p = subprocess.Popen(args,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    out, err = p.communicate()
    return out.decode('utf-8', 'replace'), err.decode('utf-8', 'replace')

def execute(cmd, silent=False):
    '''Run `cmd` and return its exit status.

    `cmd` is either a command string, which is split on whitespace, or a
    list/tuple of arguments (use this form for arguments containing spaces).
    With silent=True both stdout and stderr go to the shared `devnull` sink;
    otherwise they are inherited from the hook.
    '''
    if isinstance(cmd, (list, tuple)):
        args = list(cmd)
    else:
        args = cmd.split()
    params = {'stdout': devnull, 'stderr': devnull} if silent else {}
    return subprocess.call(args, **params)

def exists(cmd):
    '''Return True if `which` can find `cmd` on the PATH (output discarded).'''
    status = execute('which %s' % cmd, silent=True)
    return not status

def get_modified(ext):
    '''Return the paths of staged files whose name ends in ".<ext>".

    Parses `git status --porcelain`, whose lines look like "XY PATH": X is
    the index (staged) status and Y the work-tree status. Only files whose
    index status is M (modified) or A (added) are returned, i.e. the files
    the commit being checked will actually contain.
    '''
    # Match the raw line: stripping it would turn an unstaged-only
    # " M path" into "M path", which looks staged. The "$" anchor stops
    # e.g. "foo.json" from matching ext "js", or "foo.pyc" matching "py".
    modified = re.compile(r'^[MA]. (?P<name>.*\.%s)$' % re.escape(ext))
    out, _ = call('git status --porcelain')
    modifieds = []
    for line in out.splitlines():
        match = modified.match(line)
        if match:
            modifieds.append(match.group('name'))
    return modifieds

def output(prg, out, err):
    '''Print one report block: a " * <prg>:" header line, then out, then err.'''
    template = ' * {0}:\n{1}\n{2}'
    print(template.format(prg, out, err))

def die(msg):
    '''Print `msg`, then abort the hook with exit status 1.'''
    print(msg)
    raise SystemExit(1)

def check_python():
    '''Lint the staged .py files with pep8 and pyflakes.

    Exits with a non-zero status when any linter complains. Aborts via die()
    if neither tool is installed, but only when there is at least one
    Python file to check, so commits without Python changes are not blocked
    by a missing tool.
    '''
    modifieds = get_modified('py')
    if not modifieds:
        return

    has_pep8 = exists('pep8')
    has_pyflakes = exists('pyflakes')
    if not (has_pep8 or has_pyflakes):
        die('Install PEP8 and PyFlakes!')

    rrcode = 0
    for path in modifieds:  # `path`, not `file`, to avoid shadowing a builtin
        if has_pep8:
            out, err = call('pep8 %s' % path)
            if out or err:
                output('pep8', out, err)
                rrcode |= 1
        if has_pyflakes:
            # pyflakes prints its own report; only its exit status is kept
            rrcode |= execute('pyflakes %s' % path)

    if rrcode != 0:
        sys.exit(rrcode)

def check_javascript():
    '''Lint the staged .js files with gjslint (Closure Linter).

    Exits with status 1 when gjslint reports anything. Aborts via die() if
    gjslint is not installed, but only when there is at least one
    JavaScript file to check, so commits without JavaScript changes are not
    blocked by a missing tool.
    '''
    modifieds = get_modified('js')
    if not modifieds:
        return

    if not exists('gjslint'):
        die('Install Closure-Lint!')

    rrcode = 0
    for path in modifieds:  # `path`, not `file`, to avoid shadowing a builtin
        out, err = call('gjslint %s' % path)
        if out or err:
            output('gjslint', out, err)
            rrcode |= 1

    if rrcode != 0:
        sys.exit(rrcode)

def main():
    '''Run every language check in turn; a failing check exits the hook.'''
    for check in (check_python, check_javascript):
        check()


if __name__ == '__main__':
    main()

It’s a work-in-progress, so you can find the most updated version here.

To use it, just drop it in your .git/hooks directory. At every git commit, it will run pep8 and pyflakes on .py files, and gjslint on .js files.

Tagged , , , , , ,