Distributed Computing with IPython

Introduction

The purpose of this post is to give a quick and straightforward introduction to solving embarrassingly parallel problems in a distributed manner with IPython. If you only want to run parallel code on a single machine, you might want to consider Joblib instead, which has a much simpler interface.
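
Just to set expectations, here is roughly what the single-machine equivalent looks like with Joblib; this is a minimal sketch and not something we’ll use in the rest of the tutorial:

from joblib import Parallel, delayed

def square(x):
    return x ** 2

# Run square() on all available cores of the local machine.
print Parallel(n_jobs=-1)(delayed(square)(i) for i in range(10))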

Full disclosure: I’m very much a beginner in both IPython and distributed computing, so don’t take anything in this tutorial for gospel. Any and all constructive criticism is welcome in the comments!

There are already a few tutorials out there that may serve you better, and this is of course not going to be as complete as the IPython parallel documentation. Still, I hope to help you get up and running as quickly as possible, and in particular to handle the case where at least one of the nodes is running Windows.

Before we begin, here’s a quick word about what IPython actually is, taken from the documentation:

One of Python’s most useful features is its interactive interpreter. It allows for very fast testing of ideas without the overhead of creating test files as is typical in most programming languages. However, the interpreter supplied with the standard Python distribution is somewhat limited for extended interactive use.

The goal of IPython is to create a comprehensive environment for interactive and exploratory computing. To support this goal, IPython has three main components:

  • An enhanced interactive Python shell.
  • A decoupled two-process communication model, which allows for multiple clients to connect to a computation kernel, most notably the web-based notebook.
  • An architecture for interactive parallel computing.

Installation

First things first, you of course need to install IPython on all the machines you want to run it on. See here for the instructions, and make sure to install the dependencies for IPython.parallel as well, which are listed a bit further down the page. I personally used Enthought Canopy for installation on Windows, Homebrew and pip for OS X, and the NeuroDebian repositories on Linux. Whatever method you use, make sure to install the same version of IPython on all the computers.

If you wish to run the final example in this tutorial, you’ll also need to install the SciPy stack and scikit-learn, but these are not essential for using IPython for distributed computing.

Set up

Once you’ve installed IPython, the first thing you’ll need to do is create a profile. Run the following command on all the computers you wish to use:

ipython profile create --parallel

This will create a default profile in the subdirectory .ipython/profile_default/ of your home directory, and add the necessary configuration files for creating a cluster. IPython allows you to have multiple profiles, but for simplicity here we’re going to assume you want the cluster profile to be your default. You can read more about IPython profiles if you wish.

At this point it is useful for us to introduce a bit of terminology. A controller node is the computer you want to run the computations from, and an engine node is a computer you want to run computations on. A node can be both a controller and an engine, but there should only be one controller in the cluster.

So with that in mind, let’s run the following command on the controller node:

ipcontroller --reuse --ip=*

If you run into any connection problems later on in this tutorial, try substituting your controller’s external IP address for * in the above command. This address will of course need to be accessible by all engine nodes.

The above command will create some files in .ipython/profile_default/security/. Copy those files to the same directory on your engine nodes. This is the reason we gave the --reuse flag to ipcontroller; without it you would need to copy these files over every time you wanted to start up your cluster. Note that reusing connection files may have negative security implications if you’re not on a trusted network, in which case you probably want to read the IPython documentation on the matter.
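
For example, assuming the default profile location and an engine node reachable over SSH (the user and hostname below are placeholders), copying the engine connection file could look like this:

scp ~/.ipython/profile_default/security/ipcontroller-engine.json user@engine-node:~/.ipython/profile_default/security/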

Okay, now that you’ve copied over the generated security files, you’re ready to start your engines! On each engine node run the following:

ipengine

This will only start a single process, though; if you have a multi-core machine, you’ll want to start as many ipengine processes as you have cores. If your CPU has hyper-threading, you can start one ipengine per virtual core if you wish, but I’d suggest timing the execution of your program with and without the virtual cores just to be sure they’re improving performance.
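
For example, on a four-core engine node you could start four ipengine processes with a simple shell loop (a quick sketch; adjust the count to your machine):

for i in 1 2 3 4; do ipengine & done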

No engine nodes running Windows?

If none of your engine nodes are running Windows then the simpler option for starting your cluster is to use IPython’s SSH support. Consult the documentation for more details.

Are we there yet?

Let’s check to make sure everything is working so far. Run the following commands:

In [1]: from IPython.parallel import Client

In [2]: rc = Client()

In [3]: print rc.ids
[0, 1, 2, 3, 4, 5, 6, 7, 8]

A few things to note before we continue: the above should work regardless of whether it is run as a script, from an IPython shell, or from a regular Python shell. However, if you’re using a non-default profile and running it outside of the IPython shell, you’ll need to pass the profile name as a keyword argument to the Client constructor. Also, rc.ids should contain as many entries as the number of ipengine processes you’ve started.
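
For example, if your cluster profile were called "cluster" (a hypothetical name), connecting from a regular Python script would look something like this:

from IPython.parallel import Client

rc = Client(profile='cluster')
print rc.ids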

The Client class is your gateway to accessing your ipcontroller in Python code. From it we create “views” which allow us to actually execute things on our engine nodes. Let’s create the first type of view we’ll see in this tutorial, a “direct view”:

In [4]: dview = rc[:]

This creates a direct view that uses all the engines in our cluster. Let’s run something on them:

In [5]: def square(x):
   ...:     return x**2
   ...: 

In [6]: squares = dview.map_sync(square, range(20))

In [7]: print squares
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

What has happened above is that the function square was sent to each node, along with an equal portion of the input. Since we called dview.map_sync() instead of dview.map_async(), it is a blocking call for the client and the result is returned directly, which in this case we assigned to squares.

Here is an example using dview.map_async() instead:

In [8]: import time

In [9]: def stall(x):
   ...:     time.sleep(x)
   ...:     return x
   ...: 

In [10]: ar = dview.map_async(stall, [5] * 8)

The above call returns immediately with an AsyncResult object. We can do a non-blocking check to see if the results are ready yet:

In [11]: ar.ready()
Out[11]: True

Hmmmm… that didn’t feel like 5 seconds, but okay. Let’s get our results!

In [12]: ar.result
[0:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
 in stall(x)
NameError: global name 'time' is not defined

[2:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
 in stall(x)
NameError: global name 'time' is not defined

[3:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
 in stall(x)
NameError: global name 'time' is not defined

[4:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
 in stall(x)
NameError: global name 'time' is not defined

... 4 more exceptions ...

Ah, that’s why it was so quick. This brings us to an important point about distributed programming with IPython: code dependencies are not transferred for you! This includes modules, functions, and variables.

Distributing data and code

Let’s fix things so our previous example works:

In [13]: with dview.sync_imports():
    ...:     import time
    ...:     
importing time on engine(s)

In [14]: ar = dview.map_async(stall, [5] * 8)

In [15]: ar.ready()
Out[15]: False

In [16]: ar.wait()

In [17]: results = ar.result

In [18]: print results
[5, 5, 5, 5, 5, 5, 5, 5]

Much better. The dview.sync_imports() context manager executes import statements on all engines. The wait() call does what you’d expect: blocks until the results are ready.
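
As a small aside (a sketch, not from the original session), wait() also accepts a timeout in seconds, and get() blocks until the results are ready and re-raises any remote exception locally:

ar = dview.map_async(stall, [5] * 8)
ar.wait(1)        # give up waiting after at most 1 second
print ar.get()    # block until done; raises if an engine errored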

Here’s how we send variables to engines:

In [19]: def add_x(i):
    ...:     return x + i
    ...: 

In [20]: dview.push(dict(x=5), block=True)
Out[20]: [None, None, None, None, None, None, None, None]

In [21]: print dview.map_sync(add_x, range(5))
[5, 6, 7, 8, 9]

We’ll also need to send any local functions:

In [22]: def square_and_add_x(i):
    ...:     return add_x(i**2)
    ...: 

In [23]: dview.push(dict(add_x=add_x), block=True)
Out[23]: [None, None, None, None, None, None, None, None]

In [24]: print dview.map_sync(square_and_add_x, [1,2,2,3,5,8])
[6, 9, 9, 14, 30, 69]
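
Direct views also support dictionary-style assignment and pull(), which can be handier than push() for single variables; a small sketch:

dview['x'] = 10                      # roughly equivalent to dview.push(dict(x=10))
print dview.pull('x', block=True)    # a list with one value per engine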

Load balancing

As was touched on above, a direct view splits the input into equal-sized chunks and distributes them evenly across the running ipengines. For a homogeneous cluster this isn’t a problem, but what if some nodes are faster than others? The good news is, IPython also features load balancing! And it’s just as easy to use as the direct view. Here’s how we create a load-balanced view over all engines:

In [25]: lview = rc.load_balanced_view()

In [26]: lview.map_sync(square_and_add_x, xrange(5))
Out[26]: [5, 6, 9, 14, 21]

And that’s all there is to it! Neat, huh? Of note here is that even though we pushed the variable x and the function add_x() to the engines via the direct view, they were still available when using the load-balanced view.
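
One more thing worth knowing (a sketch, reusing the stall() function from earlier): the chunksize argument controls how many items are bundled into each task, so with chunksize=1 faster engines automatically pick up more of the work:

ar = lview.map_async(stall, [1, 1, 2, 3, 5, 8], chunksize=1)
ar.wait_interactive()    # print progress until all tasks have finished
print ar.result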

A machine learning example

Let’s finish up this tutorial with a real-world example. Cross-validation of hyper-parameters, as used in machine learning, is an embarrassingly parallel problem. It can be particularly time consuming, so throwing as many cores as possible at it makes sense. Unlike our previous examples, we’ll be running this as a standalone Python script. See the code comments for explanations of a few new features:

from IPython.parallel import Client

rc = Client()
dview = rc[:]
lview = rc.load_balanced_view()

with dview.sync_imports():
    import numpy
    from sklearn import svm
    import sys
    import time
    from sklearn import cross_validation
    from sklearn import preprocessing
    from sklearn import datasets


# We use a decorator here, to give us a nicer syntax for calling map.
# block=False means svm_params_crossval.map will return an AsyncResult.
# chunksize=1 means that each engine will only receive one piece of data at a time.
# You will want to play with this setting to see what gives you the best results.
@lview.parallel(block=False, chunksize=1)
def svm_params_crossval(indexes):
    train_idx, crossval_idx = indexes
    X_train = X[train_idx]
    y_train = y[train_idx]
    X_crossval = X[crossval_idx]
    y_crossval = y[crossval_idx]
    crossval_err = numpy.zeros((C_range.size, gamma_range.size))
    for i, C in enumerate(C_range):
        for j, gamma in enumerate(gamma_range):
            clf = svm.SVC(C=C, gamma=gamma)
            clf.fit(X_train, y_train)
            crossval_err[i, j] = 1. - clf.score(X_crossval, y_crossval)
    return crossval_err


def short_format_time(t):
    if t > 60:
        return "%4.1fmin" % (t / 60.)
    else:
        return " %5.1fs" % t


def wait_progress(ar, interval=5, timeout=-1):
    """Wait on an IPython AsyncResult, printing progress to stdout.

    Based on wait_interactive() in IPython.parallel and the output of Joblib
    in verbose mode. This will work best when using a load-balanced view with
    a smallish chunksize.
    """
    if timeout is None:
        timeout = -1
    N = len(ar)
    tic = time.time()
    print "\nRunning %i tasks:" % N
    sys.stdout.flush()
    last = 0
    while not ar.ready() and (timeout < 0 or time.time() - tic <= timeout):
        ar.wait(interval)
        progress, elapsed = ar.progress, ar.elapsed
        if progress > last:
            last = progress
            remaining = elapsed * (float(N) / progress - 1.)
            print '    Done %4i out of %4i | elapsed: %s remaining: %s' % (
                progress, N, short_format_time(elapsed), short_format_time(remaining))
            sys.stdout.flush()
    if ar.ready():
        try:
            speedup = round(100.0 * ar.serial_time / ar.wall_time)
            print "\nParallel speedup: %i%%" % speedup
        # For some reason ar.serial_time occasionally throws this exception.
        # We choose to ignore it and just not display the speedup factor.
        except TypeError:
            pass


def main():
    # Load a "toy" data set
    iris = datasets.load_iris()
    X = preprocessing.scale(iris.data)
    y = iris.target

    # Set the range of hyperparameters we want to search
    C_range = 10. ** numpy.arange(-2, 9)
    gamma_range = 10. ** numpy.arange(-5, 4)

    # Send out the data to the engines via the direct view
    dview.push(dict(X=X, y=y, C_range=C_range, gamma_range=gamma_range), block=True)

    # Run svm_params_crossval in parallel. Note the nice syntax afforded by using
    # the @lview.parallel decorator. This is equivalent to:
    # ar = lview.map_async(svm_params_crossval, cross_validation.LeaveOneOut(len(y)), chunksize=1)
    ar = svm_params_crossval.map(cross_validation.LeaveOneOut(len(y)))

    try:
        # Busy waiting on results, to give nice progress updates
        wait_progress(ar)
    # Handle ctrl-c by aborting jobs before exiting. If we didn't do this, the
    # tasks would keep running to completion.
    except KeyboardInterrupt:
        print "Aborting..."
        sys.stdout.flush()
        ar.abort()
        sys.exit()

    # Get the actual results
    results = ar.result

    # Average the results and convert to percent
    crossval_err = 100. * numpy.mean(results, axis=0)

    # Find the C and gamma that gave us the lowest average cross-validation error
    min_idx = crossval_err.argmin()
    C_idx, gamma_idx = numpy.unravel_index(min_idx, crossval_err.shape)
    C_best = C_range[C_idx]
    gamma_best = gamma_range[gamma_idx]
    err_best = crossval_err[C_idx, gamma_idx]
    print "\nBest: C = %s, gamma = %s, err = %s%%\n" % (C_best, gamma_best, err_best)
    numpy.set_printoptions(precision=2, linewidth=120)
    print crossval_err


if __name__ == '__main__':
    # Track the overall time of computation
    start_time = time.time()
    main()
    end_time = time.time()
    print "\nTotal time: %s" % short_format_time(end_time - start_time)

Running the above script gives the following output on my machine:

importing numpy on engine(s)
importing svm from sklearn on engine(s)

Running 150 tasks:
    Done   72 out of  150 | elapsed:    5.3s remaining:    5.8s
    Done  138 out of  150 | elapsed:   10.3s remaining:    0.9s
    Done  150 out of  150 | elapsed:   11.3s remaining:    0.0s

Parallel speedup: 775%

Best: C = 1000000.0, gamma = 0.0001, err = 2.0%

[[ 100.    100.    100.    100.    100.    100.    100.    100.    100.  ]
 [ 100.    100.    100.     92.     12.      6.    100.    100.    100.  ]
 [ 100.    100.     79.33   10.      2.67    5.33   11.33   49.33   63.33]
 [ 100.     78.67   10.      4.      4.      6.     11.33   48.     63.33]
 [  78.     10.      4.      3.33    4.67    6.67   11.33   48.     63.33]
 [  10.      4.      3.33    4.67    6.67    6.67   11.33   48.     63.33]
 [   4.      3.33    3.33    4.67    7.33    6.67   11.33   48.     63.33]
 [   3.33    3.33    2.67    4.67    7.33    6.67   11.33   48.     63.33]
 [   3.33    2.      3.33    6.67    7.33    6.67   11.33   48.     63.33]
 [   4.      4.      4.67    6.      7.33    6.67   11.33   48.     63.33]
 [   4.67    4.      6.      6.      7.33    6.67   11.33   48.     63.33]]

Total time:   11.3s

That’s all folks!

Thanks for reading my blog post, and I hope it’s helped. Feel free to leave any questions or constructive criticism as a comment.
