Saturday, November 14, 2009

My Weaksauce Event Loop In Python

I have been writing a bunch of glorified shell scripts in Python and, like any tool, they have grown more complicated over time. Originally I was just wrapping os.system in something a bit more versatile, such as throwing an exception if the program failed. Eventually I needed the output of the programs I was running, so I switched to the subprocess module. Then I realized I needed to run multiple programs at once to get some things done in a reasonable amount of time, so I took the subprocess code and made it run a list of commands, using select to multiplex their output. Once again, though, I have found that I need more versatility. In this case I want to run multiple sequences of commands in parallel, and I may not be able to predict up front which commands I will need to run. For example, I may want to run a command to mount a drive, then do one thing if it succeeds and retry or raise an error if it fails. So I decided to take my select code and change it a bit.

Rather than taking a list of commands to run, it takes a list of iterables. Each iterable yields a callable that, when called, returns a list of streams to be processed and a function to call once all of the streams have been processed. The event loop then handles the streams. When all of the streams for a specific iterable are finished, it calls the cleanup function and advances that iterable to its next item. If the iterable is exhausted, it is removed from the list.
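To make the interface concrete, here is a minimal sketch of a callable that would fit it, assuming the (oncomplete, streams) tuple shape described in the runCommandGens docstring further down. SimpleRunner is a hypothetical stand-in, not the actual ProgramRunner, and it leaves out the log option.

```python
import subprocess


class SimpleRunner(object):
    """Hypothetical stand-in for ProgramRunner; not the class used in this post."""

    def __init__(self, cmd, on_stdout, on_stderr):
        self.cmd = cmd
        self.on_stdout = on_stdout
        self.on_stderr = on_stderr

    def __call__(self):
        # The program is only started when the event loop calls the object.
        proc = subprocess.Popen(
            self.cmd, shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True)

        def oncomplete():
            # Meant to be called once both streams have hit EOF; reaps the child.
            return proc.wait()

        # Each stream is paired with the function that should receive its output.
        return oncomplete, [(proc.stdout, self.on_stdout),
                            (proc.stderr, self.on_stderr)]
```

Nothing runs when the object is created; calling it starts the program and hands back the cleanup function plus the streams to watch, which is all the event loop needs to know.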

Combining this with generators in Python makes writing this very easy. Here is some example usage code:

def test():
    import sys
    import random

    def f(id):
        print(id)
        yield ProgramRunner('date', sys.stdout.write, sys.stderr.write, log=True)
        print(id)
        yield ProgramRunner('sleep %d' % random.randrange(0, 10), sys.stdout.write, sys.stderr.write, log=True)
        print(id)
        yield ProgramRunner('date', sys.stdout.write, sys.stderr.write, log=True)
        print(id)

    runCommandGens([f(v) for v in range(100)])

This defines a generator called f that takes an id. f runs three programs in series: the first prints the date, the second sleeps for a random number of seconds (0 to 9), and the third prints the date again. The call to runCommandGens is the event loop; in this case it takes a list of 100 generators. I plan on changing the name of this function because its arguments obviously don't need to be generators, just something iterable.

The object being yielded, in this case an instance of the class ProgramRunner, implements __call__. The cool thing is that you can use whatever you want here as long as it lives up to the required interface. Eventually a Sleep class would be helpful so you can pause reliably. You could also use sockets, since the event loop is just selecting on streams. Here is another example:

def test1():
    import sys

    def f1():
        yield ProgramRunner('date', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('echo what is up', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('sleep 3', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('echo that is cool', sys.stdout.write, sys.stderr.write, log=True)

    def f2():
        yield ProgramRunner('sleep 2', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('echo this is another thing running concurrently', sys.stdout.write, sys.stderr.write, log=True)

    runCommandGens([f1(), f2()])

As you can see, in this case the event loop is given two iterables, f1() and f2(), and works through them concurrently.

The use case for this is when you mostly work in serial, want to do some work concurrently for a bit, and then go back to working in serial once it is done. This is mostly a toy at this point (although I do use it for simple things in production). I also want to add more specific versions of ProgramRunner, for example one that constructs an ssh call or an scp call. I plan on using this for working with clusters of machines, for example on EC2. Oftentimes I need to bring up N machines and then run through a series of steps to set them up properly. I'll post some example code in the near future.
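In the meantime, here is a minimal sketch of what the command-building half of such ssh and scp variants might look like, assuming Python 3 (for shlex.quote). The names ssh_command and scp_command are hypothetical, and the host and paths in the comments are made up for illustration; the resulting strings would simply be handed to ProgramRunner like any other command.

```python
import shlex


def ssh_command(host, remote_cmd):
    """Build a shell command that runs remote_cmd on host over ssh."""
    # Quoting keeps the remote command a single argument to ssh.
    return 'ssh %s %s' % (shlex.quote(host), shlex.quote(remote_cmd))


def scp_command(local_path, host, remote_path):
    """Build a shell command that copies local_path to host:remote_path."""
    target = '%s:%s' % (host, remote_path)
    return 'scp %s %s' % (shlex.quote(local_path), shlex.quote(target))


# Hypothetical usage inside a generator:
#   yield ProgramRunner(ssh_command('node1', 'mount /dev/sdb /mnt'),
#                       sys.stdout.write, sys.stderr.write, log=True)
```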

This code is also quite limited currently, in that it only handles output streams. In reality, this is the reverse of what I would like it to look like. What would be really cool is if, rather than yielding something with a bunch of callbacks, each iterator could do:

def f():
    for event in ProgramRunner(...):
        pass  # react to each event here

In this case, processing events would be no different from iterating over a source of events. This is what Meijer appears to be going after in "Meijer on .NET Reactive Framework". I really like the idea of what he was going for there.
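For illustration, here is a rough sketch of that iterator-style shape, assuming Python 3 on a POSIX system (select-style polling on pipes does not work on Windows). program_events is a hypothetical name, not part of the code in this post. It turns one program's output into a plain sequence of events, ending with an 'exit' event that carries the return code.

```python
import os
import selectors
import subprocess


def program_events(cmd):
    """Yield (source, payload) events for one program as they happen."""
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ, 'stdout')
    sel.register(proc.stderr, selectors.EVENT_READ, 'stderr')
    pending = {'stdout': b'', 'stderr': b''}  # unterminated partial lines
    open_streams = 2

    while open_streams:
        for key, _mask in sel.select():
            name = key.data
            chunk = os.read(key.fd, 4096)
            if chunk:
                # Emit every complete line, keep the remainder for later.
                lines = (pending[name] + chunk).split(b'\n')
                pending[name] = lines.pop()
                for line in lines:
                    yield name, line.decode()
            else:
                # EOF: stop watching this stream and flush any leftover text.
                sel.unregister(key.fileobj)
                key.fileobj.close()
                open_streams -= 1
                if pending[name]:
                    yield name, pending[name].decode()

    sel.close()
    yield 'exit', proc.wait()
```

The caller then just writes a loop such as "for source, payload in program_events('date')" and branches on source. This only covers a single program; interleaving several such iterators would still need a scheduler on top.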

Here is the code for runCommandGens. I'll probably post the complete code somewhere when I get a chance and if there is interest. Bear in mind this is just something I'm playing with. Comments, criticisms and suggestions are welcome.

from select import select

# ctorGenerators, nextIteration, activeOutputStreams, callStreamF,
# removeStream and getOnComplete are helpers from the rest of the module
# (not shown in this post).

def runCommandGens(generators):
    """
    This takes a list of generators and runs through each of them
    in parallel running the commands.

    Each iteration of a generator must return something that is callable and returns a tuple that looks like:
    (oncomplete, [(stream1, function), (stream2, function), ... (streamn, function)])

    Where 'oncomplete' is a function that gets called when all streams have been exhausted
    and each stream has an associated function that gets called with the output
    """

    # contains the objects being worked on from each generator
    states = [(g, None) for g in ctorGenerators(generators)]

    # start initial commands:
    states = nextIteration(states)

    outputStreams = dict(activeOutputStreams(states))

    while outputStreams:
        readable, _output, _error = select(list(outputStreams.keys()), [], [])

        iterateAndBuild = False
        for s in readable:
            line = s.readline()

            if line:
                callStreamF(s, line, states[outputStreams[s]])
            else:
                # An empty read means this stream has hit EOF
                iterateAndBuild = True

                # removeStream returns True if this was the last stream to be removed
                # which means we need to try to start up the next iteration
                # and recreate outputStreams
                if not removeStream(s, states[outputStreams[s]]):
                    state = states[outputStreams[s]]
                    f = getOnComplete(state)
                    # Finished with this one, call onComplete
                    f()
                    # Set the second portion to None, nextIteration uses that to know
                    # when to get the next iteration
                    states[outputStreams[s]] = (state[0], None)

        # If any of our streams are completely done, let's get new ones and rebuild
        if iterateAndBuild:
            states = nextIteration(states)
            outputStreams = dict(activeOutputStreams(states))
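
Since the helper functions are not shown, here is a separate, self-contained sketch of the same idea, assuming Python 3 on a POSIX system. It is not the code above: run_iterables and Runner are hypothetical names, and it reads raw chunks with os.read instead of calling readline, because data sitting in a file object's buffer is invisible to select.

```python
import os
import select
import subprocess


class Runner:
    """Hypothetical stand-in for ProgramRunner (stdout only)."""

    def __init__(self, cmd, on_line):
        self.cmd = cmd
        self.on_line = on_line

    def __call__(self):
        proc = subprocess.Popen(self.cmd, shell=True, stdout=subprocess.PIPE)
        # proc.wait doubles as the oncomplete callback: it reaps the child.
        return proc.wait, [(proc.stdout, self.on_line)]


def run_iterables(iterables):
    """Work through each iterable in series, and all of them concurrently."""
    streams = {}  # fd -> [job, handler, file object, partial-line buffer]

    def start_next(it):
        # Advance `it` to its next step that has streams to watch.
        for step in it:
            oncomplete, pairs = step()
            if not pairs:
                oncomplete()  # nothing to wait for, move straight on
                continue
            job = {'it': it, 'oncomplete': oncomplete, 'open': len(pairs)}
            for f, handler in pairs:
                streams[f.fileno()] = [job, handler, f, b'']
            return

    for iterable in iterables:
        start_next(iter(iterable))

    while streams:
        readable, _, _ = select.select(list(streams), [], [])
        for fd in readable:
            entry = streams[fd]
            job, handler, f, buf = entry
            chunk = os.read(fd, 4096)
            if chunk:
                # Hand complete lines to the handler, keep the remainder.
                lines = (buf + chunk).split(b'\n')
                entry[3] = lines.pop()
                for line in lines:
                    handler(line.decode() + '\n')
                continue
            # EOF on this stream: flush any unterminated tail and drop it.
            if buf:
                handler(buf.decode())
            del streams[fd]
            f.close()
            job['open'] -= 1
            if job['open'] == 0:
                # Last stream of this step: clean up, then start the next step.
                job['oncomplete']()
                start_next(job['it'])
```

Keeping a per-step count of open streams makes the "last stream finished" check explicit, so the loop never has to rebuild its stream table from scratch.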
