Friday, October 17, 2008

Simple example of Threads in Python

The first time it was immediately obvious to me that 'threading' a program I had written would give a significant gain was in the context of screen scraping. I had HTTP GET requests against almost 20 pages being processed one... after... the... other. I realized, of course, that if I just started each request without waiting on the last one to finish, the entire process would be over much more quickly.

In this example the screen scraping 'worker' function is replaced with a simpler 'random wait' function:

#! /usr/bin/env python

import sys
import threading
import time
import random

# The worker function does the processing - here it just ignores its
# argument, sleeps a random 2-10 seconds, and returns how long it slept
def worker(arg):
    arg = random.randint(2,10)
    time.sleep(arg)
    return arg

# The myThreadObj wraps the worker function in a thread
class myThreadObj(threading.Thread):
    def __init__(self, arg):
        threading.Thread.__init__(self)
        self.arg = arg
        self.value = 0
    def run(self):
        self.value = worker(self.arg)
        print 'Thread %d Ended.' % self.arg

# my array of arguments to be processed by the worker function
myArgs = range(5)

# create a myThreadObj to process each argument
myThreadList = []
for i in myArgs:
    myThreadList.append(myThreadObj(i))
    # and start it immediately
    myThreadList[i].start()

# wait for all threads to finish
for each in myThreadList:
    each.join()

print 'All threads have completed.'

for i in myArgs:
    print "myThreadList[%d] = %d" % (i, myThreadList[i].value)

The myThreadObj wrapper should accept whatever arguments you normally pass to the worker, and when the worker completes, it stores the return value in 'self.value'.
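For example, a slightly more general wrapper (just a sketch - the 'fetch_page' worker and its arguments here are made up for illustration) could pass through whatever arguments it is given:

#! /usr/bin/env python

import threading

# stand-in worker - imagine a real HTTP GET or database query here
def fetch_page(host, path):
    return '%s%s' % (host, path)

# a generic wrapper that hands the worker its arguments unchanged
class workerThread(threading.Thread):
    def __init__(self, *args):
        threading.Thread.__init__(self)
        self.args = args
        self.value = None
    def run(self):
        self.value = fetch_page(*self.args)

t = workerThread('http://www.example.com', '/index.html')
t.start()
t.join()
print t.value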

The .join() method blocks until the thread's .isAlive() method would return False. I join each thread in turn to verify that all of them have completed. It doesn't matter if .join() blocks for 8 seconds while it waits on the first thread, or if it reaches a thread that finished 6 seconds ago because an earlier .join() was waiting on a slower thread. The point is that, by the time all of the .join() calls have returned, ALL THREADS HAVE FINISHED.
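Just to illustrate that equivalence, you could (less efficiently) poll .isAlive() on the same myThreadList instead of calling .join() - once the loop below exits, every thread has finished:

import time

# assumes myThreadList from the example above
while any(t.isAlive() for t in myThreadList):
    time.sleep(0.1)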

Once the threads are done, we expect each thread's .value to contain the return value of the worker function.

If your 'worker' function is something like an API call or a database query - anything with some built-in lag from a system designed to serve multiple simultaneous requests - then as long as you can queue the requests up, threading will provide a significant improvement.

e.g.

clayg@m-net:~$ cat nonthread.py
#! /usr/bin/env python

import sys
import threading
import time
import random

# The worker function does the processing
def worker(arg):
    arg = random.randint(2,10)
    time.sleep(arg)
    return arg

myArgs = range(5)

for i in myArgs:
    print "myThreadList[%d] = %d" % (i, worker(i))
clayg@m-net:~$ time ./nonthread.py ; echo ; time ./simplethread.py
myThreadList[0] = 5
myThreadList[1] = 5
myThreadList[2] = 10
myThreadList[3] = 2
myThreadList[4] = 9

real 0m31.073s
user 0m0.045s
sys 0m0.024s

Thread 4 Ended.
Thread 0 Ended.
Thread 1 Ended.
Thread 2 Ended.
Thread 3 Ended.
All threads have completed.
myThreadList[0] = 5
myThreadList[1] = 10
myThreadList[2] = 10
myThreadList[3] = 10
myThreadList[4] = 2

real 0m10.078s
user 0m0.045s
sys 0m0.030s
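
Coming back to the original screen-scraping use case, a rough sketch of the same pattern with real HTTP GETs might look like the following. The URLs are placeholders to substitute with the pages you actually want, and urllib2 is just the standard library module I'd reach for:

#! /usr/bin/env python

import threading
import urllib2

# The worker function fetches a page and returns its contents
def worker(url):
    try:
        return urllib2.urlopen(url).read()
    except urllib2.URLError:
        return ''

# The same wrapper as above - store the worker's return value
class myThreadObj(threading.Thread):
    def __init__(self, arg):
        threading.Thread.__init__(self)
        self.arg = arg
        self.value = None
    def run(self):
        self.value = worker(self.arg)

# placeholder URLs - substitute the pages you actually want to scrape
myUrls = ['http://www.example.com/page%d' % i for i in range(20)]

# start every request immediately, then wait for them all
myThreadList = [myThreadObj(url) for url in myUrls]
for t in myThreadList:
    t.start()
for t in myThreadList:
    t.join()

for t in myThreadList:
    print '%s returned %d bytes' % (t.arg, len(t.value))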
