Friday, November 20, 2009

Friday Links - 2009/11/20






Not too many interesting links this week, unfortunately.

Saturday, November 14, 2009

My Weaksauce Event Loop In Python


I have been writing a bunch of glorified shell scripts in Python. Like any tool, they start to get more complicated. Originally I was just wrapping up os.system into something with a bit more versatility, like throwing an exception if the program failed. Eventually I needed to get the output of the programs I ran, so I switched to the subprocess module. Then I realized I needed to run multiple programs at once to get some things done in a reasonable amount of time, so I took the subprocess code and made it run a list of commands, using select to multiplex. Once again, though, I have found that I need more versatility. In this case I want to run multiple sequences of commands in parallel, and I may not be able to predict up front what commands I need to run in total. For example, I may want to run a command to mount a drive, then if that succeeds do one thing and if it fails retry or error out. So I decided to take my select code and change it a bit.
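Just for reference, that very first wrapper was little more than this (a minimal sketch; the name and exact details are made up, but it captures the idea of raising when a command fails):

import os

def run_checked(cmd):
    # Run a shell command and raise if it exits with a non-zero status.
    status = os.system(cmd)
    if status != 0:
        raise RuntimeError('%r failed with exit status %d' % (cmd, status))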




Rather than taking a list of commands to run, it takes a list of iterables. Each iteration of an iterable yields something callable that, when called, returns a list of streams to be processed along with a function to call once all of those streams have been processed. The event loop then goes through handling the streams. When all of the streams for a specific iterable are finished, it calls the clean-up function and starts the next iteration; if that was the last iteration for that iterable, it removes it from the list.




Combining this with generators in Python makes writing this very easy. Here is some example usage code:




def test():
    import sys
    import random

    def f(id):
        print id
        yield ProgramRunner('date', sys.stdout.write, sys.stderr.write, log=True)
        print id
        yield ProgramRunner('sleep %d' % random.randrange(0, 10), sys.stdout.write, sys.stderr.write, log=True)
        print id
        yield ProgramRunner('date', sys.stdout.write, sys.stderr.write, log=True)
        print id

    runCommandGens([f(v) for v in range(100)])



This defines a generator called f that takes an id. f will run three programs in serial: the first just prints out the date, the next sleeps for a random amount of time, and the last prints out the date again. The call to runCommandGens is the event loop. In this case it takes a list of 100 generators. I plan on changing the name of this function because it obviously doesn't need to take generators, just something iterable.




The object being yielded, in this case an instance of the class ProgramRunner, implements __call__. The cool thing is you can use whatever you want for this as long as it lives up to the required interface. Eventually having a Sleep instance would be helpful so you can pause reliably. You could also use sockets, since the event loop is just selecting on streams. Here is another example:




def test1():
    import sys

    def f1():
        yield ProgramRunner('date', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('echo what is up', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('sleep 3', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('echo that is cool', sys.stdout.write, sys.stderr.write, log=True)

    def f2():
        yield ProgramRunner('sleep 2', sys.stdout.write, sys.stderr.write, log=True)
        yield ProgramRunner('echo this is another thing running concurrently', sys.stdout.write, sys.stderr.write, log=True)

    runCommandGens([f1(), f2()])



As you can see, in this case the event loop just works through the two iterables, running f1 and f2 concurrently while the commands within each run in order.
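And since the loop is just selecting on streams, the yielded objects don't have to wrap subprocesses at all. Here is a rough sketch of what a socket-backed yieldable might look like. None of this is from the posted code; it is hypothetical and just follows the (oncomplete, [(stream, function), ...]) interface described in the runCommandGens docstring below:

import socket

class SocketLineReader(object):
    # Hypothetical: connect to host:port and feed each line received to
    # handler, using the same interface ProgramRunner exposes.
    def __init__(self, host, port, handler):
        self.host = host
        self.port = port
        self.handler = handler

    def __call__(self):
        # Hand the event loop a file-like object it can select() on
        # and readline() from.
        sock = socket.create_connection((self.host, self.port))
        stream = sock.makefile('r')

        def oncomplete():
            stream.close()
            sock.close()

        return (oncomplete, [(stream, self.handler)])

A generator could yield a SocketLineReader right alongside ProgramRunner instances and the event loop wouldn't know the difference.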




The use case for this is when you mostly want to do work in serial, then want to do some work concurrently for a bit, and when that is done go back to working in serial. This is mostly a toy at this point (although I do use it for simple things in production). I also want to add more specific versions of ProgramRunner, for example one that constructs an ssh call or an scp call. I plan on using this for working with clusters of machines, for example on EC2. Oftentimes I need to bring up N machines and then run through a series of steps to properly set them up. I'll post some example code in the near future.
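In the meantime, here is a back-of-the-envelope sketch of what an ssh flavor could look like. ProgramRunner's constructor signature is only assumed from the examples above, so treat this as hypothetical rather than real code:

class SSHRunner(ProgramRunner):
    # Hypothetical: build the ssh command line, then behave exactly
    # like ProgramRunner.
    def __init__(self, host, command, out, err, user='root', log=False):
        ssh_cmd = "ssh %s@%s '%s'" % (user, host, command)
        ProgramRunner.__init__(self, ssh_cmd, out, err, log=log)

Inside a generator it would be used just like ProgramRunner, for example yield SSHRunner('host1', 'mount /dev/sdf /data', sys.stdout.write, sys.stderr.write, log=True).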




This code is also quite limited currently, in that it only handles output streams. Beyond that, the design is the reverse of what I would really like it to look like. What would be really cool is if, rather than each iterator yielding something with a bunch of callbacks, they could do something like:




def f():
    for event in ProgramRunner(...):
        process_event(event)



In this case, processing events would be no different than iterating over any other source of events. This is what Meijer appears to be going after in Meijer on .NET Reactive Framework, and I really like the idea he is going for there.
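Just to make that concrete, a naive version of the iteration-style interface can be built directly on subprocess. This is not part of the posted code, and it blocks on a single program, which is exactly the limitation a proper reactive-style event source would remove:

import subprocess

def program_events(cmd):
    # Yield events instead of invoking callbacks. Naive: it reads one
    # program to completion rather than multiplexing many.
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    for line in proc.stdout:
        yield ('stdout', line)
    yield ('exit', proc.wait())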




Here is the code for runCommandGens. I'll probably post the complete code somewhere when I get a chance and if there is interest. Bear in mind this is just something I'm playing with; comments, criticisms, and suggestions are welcome.




from select import select

def runCommandGens(generators):
    """
    This takes a list of generators and runs through each of them
    in parallel, running the commands.

    Each iteration of a generator must return something that is callable and
    returns a tuple that looks like:
    (oncomplete, [(stream1, function), (stream2, function), ... (streamn, function)])

    Where 'oncomplete' is a function that gets called when all streams have been
    exhausted and each stream has an associated function that gets called with
    the output.
    """
    # ctorGenerators, nextIteration, activeOutputStreams, callStreamF,
    # removeStream and getOnComplete are helpers from the rest of the code,
    # not shown here.

    ##
    # contains the objects being worked on from each generator
    states = [(g, None) for g in ctorGenerators(generators)]

    ##
    # start initial commands:
    states = nextIteration(states)

    outputStreams = dict(activeOutputStreams(states))

    while outputStreams:
        input, _output, _error = select(outputStreams.keys(), [], [])

        iterateAndBuild = False
        for s in input:
            line = s.readline()

            if line:
                callStreamF(s, line, states[outputStreams[s]])
            else:
                iterateAndBuild = True

                ##
                # removeStream returns True if this was the last stream to be removed,
                # which means we need to try to start up the next iteration
                # and recreate outputStreams
                if not removeStream(s, states[outputStreams[s]]):
                    state = states[outputStreams[s]]
                    f = getOnComplete(state)
                    ##
                    # Finished with this one, call onComplete
                    f()
                    # Set the second portion to None, nextIteration uses that to know
                    # when to get the next iteration
                    states[outputStreams[s]] = (state[0], None)

        # If any of our streams are completely done, let's get new ones and rebuild
        if iterateAndBuild:
            states = nextIteration(states)
            outputStreams = dict(activeOutputStreams(states))

Thursday, November 12, 2009

Friday links - 2009/11/13

Smashing the Mega-d/Ozdok botnet in 24 hours - Pretty neat story of taking down a large botnet

3D Mandelbrot Set - purdy pictures

Google can render LaTeX math - Very nice feature to easily get proper mathematical equations in your webpages

Interview with John Hughes - John Hughes talking about Erlang and Haskell. I need to check out QuickCheck; it sounds rad. I really like the end of this, when he asks where the next order-of-magnitude decrease in code size will come from.

Python Language Moratorium - A moratorium has been put on changes to the Python language so that other implementations can catch up. This might mean we'll get an implementation that makes some aggressive optimizations.

Cool Haskell Function - Shows off how simple and powerful Haskell can be

Philosophizing about Programming - MarkCC talks about his experiences with Haskell and why he thinks functional languages are great for building large systems

Google Go Language - Another MarkCC post. Go has made a lot of noise this week; it looks like it has promise.

Friday, November 6, 2009

Friday Links - 2009/11/06

I thought I'd add a weekly list of links that I found interesting over the course of the week. Here is the first installment. Some of these links may be old; they are presented in the week that I get to them, not necessarily the week they hit the 'tubes.

Blue Brain Project - A project to simulate a brain, pretty neat.

Where smoking kills most people - In developed countries, surpriiiiiiiise.

Shazam - Not Magic After All - How that Shazam program works, pretty neat.

F# and workflows - A short blurb + links about F# and workflows (which are monads). F# seems to have some pretty neat stuff.

Algae and Light Help Injured Mice Walk Again - Pretty amazing article on 'optogenetics'.

Machine Learning - Stanford - Videos of Stanford course on Machine Learning (according to klafka the best part of stats).

Thursday, October 22, 2009

Suggestions On Debugging Distributed Apps?


So far, distributed apps seem to be the best example of why it is important to be able to walk through code by hand with pencil and paper. I have been writing analytics code that runs on a cluster for close to a year and still have not figured out a good way of debugging the code that runs on it.




Being able to bring it down to a test case works well in some situations, but I have found it ineffective in the long run. When you are dealing with a few terabytes of data it can be difficult to cut the problem down to a few megs of input data that you can reason about. While the framework I am using lets me run code in a single process so I can debug it, it is again not always possible to bring the input data down small enough to be manageable in a single process. On top of that, the behavior you are trying to debug may not show up in such a simple case. It's pretty much impossible to use a traditional debugger if the bug only shows up in a few megs of data out of a terabyte: how can you step through a 6-hour run if the bug only shows up on a small portion of that data? How do you predict which machine the bad data will land on if you don't even know what the bad data is?




I have so far been using three methods to debug a distributed app.


  • If I have some idea what the bad data looks like, I'll look for some approximation of it in the input data and throw an exception with the debugging info I'm interested in (a sketch follows this list). Exceptions make their way back to the main console where the jobs are run from, so I can see them easily enough.

  • Use standard IO. The annoying part is that the framework I use only logs this to local files on the machine the code ran on, so I have to wrap the grep in an ssh call to each machine. It's also sometimes difficult to know what to print out in the first place. Too much printing can significantly reduce performance and even fill up the disk with logging data, and logging data as big as the input data is no easier to comb through.

  • Just reading through the code by hand and working it out with pencil and paper. This has only really worked if I can get a good mental image of what the input is like. If I have a terabyte of input from untrusted sources, it can be quite freeform and difficult to reason about as I walk through the code by hand.
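For the first technique, the idea is simply to make the job fail loudly when it sees something that looks like the bad data, since exceptions come back to the main console. A minimal sketch; the record format and the "suspicious" check here are entirely made up:

def process_record(record, transform):
    # If a record looks like the bad data, raise with enough context
    # to spot it on the main console where exceptions end up.
    if len(record) > 10000 or '\x00' in record:
        raise ValueError('suspect record: %r...' % (record[:200],))
    return transform(record)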





Right now I am experiencing a bug that I have been unsuccessful in tracking down. The problem is it only shows up when I run on huge amounts of data. I have two ways of doing the same thing: one takes way too long, the other takes a very short time, and the short way is not producing the same numbers as the long way. It is off by only about 500 million out of 5 billion rows of input, and it has been a very frustrating bug to track down. The main problem is that I cannot produce a smaller set of input files that causes the issue. Runs take over 6 hours with the input file I need, so a single test can take almost my entire work day. If anyone has any suggestions, please let me know.

Sunday, October 18, 2009

Sneaking Complexity - Being Asynchronous


I went out to lunch with some coworkers last week and we got to talking about the project they are working on. They had done a great job of implementing the app, which pretty much makes up the backbone of our company. They rewrote it in Java and have gotten orders of magnitude better performance. The app needs access to various pieces of persistent data quickly, and they have created several data-clusters that perform amazingly. And the code is very clean and easy to read. The problem they are running into, though, is the client code for the data-clusters.




One of the main reasons Java was selected as the language to write this in is the simplicity of reading and writing it. It is fairly easy to jump into a method and, with a good IDE, figure out what it is doing. It is simple. The problem they are running into, though, is that doing things Teh Java Way isn't working out for the volume and latency they are trying to reach. Specifically, they are hitting issues when they query the data-cluster. Generally, when they decide they need to hit the data-cluster, they do a query, and if they don't get a response back within some timeout period they go on as if they have a new record. In this case, being correct about the data all the time is not very important. But they are seeing more timed-out queries than they feel comfortable with (there is also a concern that the timeouts in the library they are using are not working properly).

What they would like to do is hit the data-cluster asynchronously: when they think they are going to need data, send the query off, go do some more processing, then check to see if the data is there, and continue on as if it were a new record if the timeout is reached. The problem is, this really sucks so far in Java. Being asynchronous is hard if you haven't designed your application around it in the first place. You pretty much have to fire up a new thread for every one of these queries you want to do. Considering a single case, this doesn't sound too bad, but the JVM uses OS threads, so if you are already pushing your app to the limit, doubling or tripling the number of threads it needs in the worst case is not going to help.

You have also increased the complexity of your application. Most of the threading in this application doesn't really need to share any data; each request is off doing its own thing and rarely hitting shared objects. But in this case, handing back the result of the query means sharing some data. It may not be much, but it is added complexity. On top of that, there might be a performance hit in terms of GC. I'm not an expert on the JVM GC, but shared memory means you might have to walk the entire heap in order to clean up the objects created by the query thread.




This brings me to something that is so great about Erlang: accomplishing this is trivial. Since messages in Erlang are asynchronous, you simply send your query to the data-cluster at the beginning of your block, do whatever work you'd like, and receive the reply when you are good and ready. A receive block can take a timeout, so you don't have any issues there. Doing things asynchronously is how Erlang works pretty much from the get-go. Erlang processes also have their own heaps, so cleaning up after one is fairly light on the GC: just throw away that process's heap.




To be fair, I am certain that had they implemented this in Erlang they would have run into their own fair share of issues. No language is perfect, and Erlang certainly is no exception. But the application in question is basically what Erlang was designed for and is good at. There are also other issues we talked about where they would have benefited from Erlang that I won't go into here.

But this is a pattern that seems common in my short career as a developer. People look at the problem they are solving, then pick whatever tool solves all the easy parts fast but in the end leaves them in a worse state when it comes to solving the harder parts. The tools that get you 70% of the way there have forced you to design your app so that it is even harder to solve the remaining 30%. This happened at my previous employer: a framework was chosen that handled all the simple things they wanted to solve, but they had now inherited a framework whose design was not made to solve the more complex problems. In the end they had to rewrite a bunch of the framework to get what they wanted. I'm sure my coworkers are going to be able to solve this problem in Java (they have no choice), perhaps there is a Java way this should have been done, and I am sure that had they implemented it in Erlang there would still be problems being discussed over lunch. But I feel confident their frustration would be with Erlang records, or syntax, or strings, not with the sneaking complexity of trying to get it to do things asynchronously (and don't even get me started on fault-tolerance).