Introduction
The purpose of this post is to give a quick and straightforward introduction to solving embarrassingly parallel problems in a distributed manner with IPython. If you only want to run parallel code on a single machine, you might want to consider Joblib instead, which has a much simpler interface.
Full disclosure: I’m very much a beginner in both IPython and distributed computing, so don’t take anything in this tutorial for gospel. Any and all constructive criticism is welcome in the comments!
There are already a few tutorials out there which may serve you better, and this is of course not going to be as complete as the IPython parallel documentation, but I hope to help you get up and running as quickly as possible, and in particular also handle the case where at least one of the nodes is running Windows.
Before we begin, here’s a quick word about what IPython actually is, taken from the documentation:
One of Python’s most useful features is its interactive interpreter. It allows for very fast testing of ideas without the overhead of creating test files as is typical in most programming languages. However, the interpreter supplied with the standard Python distribution is somewhat limited for extended interactive use.
The goal of IPython is to create a comprehensive environment for interactive and exploratory computing. To support this goal, IPython has three main components:
- An enhanced interactive Python shell.
- A decoupled two-process communication model, which allows for multiple clients to connect to a computation kernel, most notably the web-based notebook.
- An architecture for interactive parallel computing.
Installation
First things first, you of course need to install IPython on all the machines you want to run it on. See here for the instructions, and make sure to install the dependencies for IPython.parallel as well, which are listed a bit further down the page. I personally used Enthought Canopy for installation on Windows, Homebrew and pip on OS X, and the NeuroDebian repositories on Linux. Whatever method you use, make sure to install the same version of IPython on all the computers.
If you wish to run the final example in this tutorial, you’ll also need to install the SciPy stack and scikit-learn, but these are not essential for using IPython for distributed computing.
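If you do want to run that example, one way to get them (assuming pip is available and your platform has the necessary compilers; on Windows a packaged distribution such as Canopy is probably easier) is:

pip install numpy scipy scikit-learn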
Set up
Once you’ve installed IPython, the first thing you’ll need to do is create a profile. Run the following command on all the computers you wish to use:
ipython profile create --parallel
This will create a default profile in the subdirectory .ipython/profile_default/
of your home directory, and add the necessary configuration files for creating a cluster. IPython allows you to have multiple profiles, but for simplicity here we’re going to assume you want the cluster profile to be your default. You can read more about IPython profiles if you wish.
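As an aside, if you'd rather keep your cluster configuration out of the default profile, you should be able to create a named one instead (here mycluster is just an illustrative name):

ipython profile create mycluster --parallel

You would then pass --profile=mycluster to the ipcontroller and ipengine commands below, and profile='mycluster' to the Client constructor.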
At this point it is useful for us to introduce a bit of terminology. A controller node is the computer you want to run the computations from, and an engine node is a computer you want to run computations on. A node can be both a controller and an engine, but there should only be one controller in the cluster.
So with that in mind, let’s run the following command on the controller node:
ipcontroller --reuse --ip=*
If you run into any connection problems later on in this tutorial, try substituting your controller’s external IP address for *
in the above command. This address will of course need to be accessible by all engine nodes.
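For example, if your controller's address on the local network were 192.168.1.100 (yours will differ), the command would become:

ipcontroller --reuse --ip=192.168.1.100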
The above command will create some files in .ipython/profile_default/security/
. Copy those files to the same directory on your engine nodes. This is the reason we gave the --reuse
flag to ipcontroller; without it, you would need to copy these files over every time you wanted to start up your cluster. Note that this may have some negative security implications if you're not on a trusted network, in which case you probably want to read the IPython documentation on the matter.
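On Linux or OS X engine nodes, something along these lines should do the trick (user and engine-node are placeholders for your own login and hostname):

scp ~/.ipython/profile_default/security/* user@engine-node:~/.ipython/profile_default/security/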
Okay, now that you’ve copied over the generated security files, you’re ready to start your engines! On each engine node run the following:
ipengine
This will only start a single process, though; if you have a multi-core machine, you'll want to start as many ipengine
s as you have cores. If your CPU has hyper-threading, you can start one ipengine
per virtual core if you wish, but I’d suggest timing the execution of your program with and without using virtual cores just to be sure it’s improving performance.
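For example, on a four-core Linux or OS X machine you could launch them all from a single bash shell with something like:

for i in 1 2 3 4; do ipengine & done

On Windows the simplest approach is to open one console window per core and run ipengine in each.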
No engine nodes running Windows?
If none of your engine nodes are running Windows then the simpler option for starting your cluster is to use IPython’s SSH support. Consult the documentation for more details.
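As a rough sketch of what that looks like (the exact option names can vary between IPython versions, and the hostnames and engine counts below are placeholders), you would edit ipcluster_config.py in your profile directory and then launch everything with ipcluster start:

c = get_config()
c.IPClusterEngines.engine_launcher_class = 'SSH'
c.SSHEngineSetLauncher.engines = {'node1.example.com': 4, 'node2.example.com': 2}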
Are we there yet?
Let’s check to make sure everything is working so far. Run the following commands:
In [1]: from IPython.parallel import Client

In [2]: rc = Client()

In [3]: print rc.ids
[0, 1, 2, 3, 4, 5, 6, 7, 8]
A few things to note before we continue: the above should work regardless of whether it is run as a script, from an IPython shell or from a regular Python shell. However if you’re using a non-default profile and running it outside of the IPython shell you’ll need to pass the profile name as a keyword parameter to the Client
constructor. Also, rc.ids
should contain as many entries as ipengine
s you’ve started.
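For example, if you had created a profile named mycluster (an illustrative name, not one used elsewhere in this post), connecting would look like:

rc = Client(profile='mycluster')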
The Client
class is your gateway to accessing your ipcontroller
in Python code. From it we create “views” which allow us to actually execute things on our engine nodes. Let’s create the first type of view we’ll see in this tutorial, a “direct view”:
In [4]: dview = rc[:]
This creates a direct view that uses all the engine nodes in our cluster. Let’s run something on them:
In [5]: def square(x):
   ...:     return x**2
   ...:

In [6]: squares = dview.map_sync(square, range(20))

In [7]: print squares
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
What happened above is that the function square
was sent to each node, along with an equal portion of the input. Since we called dview.map_sync()
instead of dview.map_async()
it is a blocking call for the client and the result is returned directly, which in this case we assigned to squares
.
Here is an example using dview.map_async()
instead:
In [8]: import time

In [9]: def stall(x):
   ...:     time.sleep(x)
   ...:     return x
   ...:

In [10]: ar = dview.map_async(stall, [5] * 8)
The above call returns immediately with an AsyncResult
object. We can do a non-blocking check to see if the results are ready yet:
In [11]: ar.ready()
Out[11]: True
Hmmmm… that didn't feel like 5 seconds, but okay. Let's get our results!
In [12]: ar.result
[0:apply]:
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<string> in stall(x)
NameError: global name 'time' is not defined

[2:apply]:
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<string> in stall(x)
NameError: global name 'time' is not defined

[3:apply]:
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<string> in stall(x)
NameError: global name 'time' is not defined

[4:apply]:
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<string> in stall(x)
NameError: global name 'time' is not defined

... 4 more exceptions ...
Ah, that's why it was so quick. This brings us to an important point on distributed programming with IPython: code dependencies are not transferred for you! This includes modules, functions, and variables.
Distributing data and code
Let’s fix things so our previous example works:
In [13]: with dview.sync_imports():
   ...:     import time
   ...:
importing time on engine(s)

In [14]: ar = dview.map_async(stall, [5] * 8)

In [15]: ar.ready()
Out[15]: False

In [16]: ar.wait()

In [17]: results = ar.result

In [18]: print results
[5, 5, 5, 5, 5, 5, 5, 5]
Much better. The dview.sync_imports()
context manager executes import statements on all engines. The wait()
call does what you’d expect: blocks until the results are ready.
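Incidentally, sync_imports() isn't the only way to get code onto the engines: the direct view's execute() method runs an arbitrary statement in each engine's namespace, so the following should be an equivalent (if less tidy) way to make the time module available:

dview.execute('import time', block=True)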
Here’s how we send variables to engines:
In [19]: def add_x(i):
   ...:     return x + i
   ...:

In [20]: dview.push(dict(x=5), block=True)
Out[20]: [None, None, None, None, None, None, None, None]

In [21]: print dview.map_sync(add_x, range(5))
[5, 6, 7, 8, 9]
We’ll also need to send any local functions:
In [22]: def square_and_add_x(i):
   ...:     return add_x(i**2)
   ...:

In [23]: dview.push(dict(add_x=add_x), block=True)
Out[23]: [None, None, None, None, None, None, None, None]

In [24]: print dview.map_sync(square_and_add_x, [1, 2, 2, 3, 5, 8])
[6, 9, 9, 14, 30, 69]
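As an aside, direct views also support a dictionary-style shorthand for moving variables back and forth, which can be handy for quick experiments; a minimal sketch:

dview['x'] = 10      # behaves like a blocking push
values = dview['x']  # behaves like a blocking pull; returns one value per engine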
Load balancing
As was touched on above, a direct view splits the input into equal sized chunks and distributes them evenly across the running ipengine
s. For a homogeneous cluster this isn’t a problem, but what if some nodes are faster than others? The good news is, IPython also features load balancing! And it’s just as easy to use as the direct view. Here’s how we create a load balancing view over all engines:
In [25]: lview = rc.load_balanced_view()

In [26]: lview.map_sync(square_and_add_x, xrange(5))
Out[26]: [5, 6, 9, 14, 21]
And that’s all there is to it! Neat, huh? Of note here is that even though we pushed the variable x
and the function add_x()
to the engines via the direct view, they were still available when using the load-balanced view.
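If you ever want to balance work over only some of your engines, load_balanced_view() also accepts a targets argument; a small sketch (not something we use in the rest of this post):

lview_subset = rc.load_balanced_view(targets=[0, 1, 2])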
A machine learning example
Let's finish up this tutorial with a real-world example. Cross-validation of hyper-parameters, as used in machine learning, is an example of an embarrassingly parallel problem: the individual evaluations are completely independent of one another. It can be particularly time-consuming, so throwing as many cores as possible at the problem makes sense. Unlike our previous examples, we'll be running this one as a standalone Python script. See the code comments for explanations of a few new features:
from IPython.parallel import Client

rc = Client()
dview = rc[:]
lview = rc.load_balanced_view()

with dview.sync_imports():
    import numpy
    from sklearn import svm

import sys
import time
from sklearn import cross_validation
from sklearn import preprocessing
from sklearn import datasets


# We use a decorator here, to give us a nicer syntax for calling map.
# block=False means svm_params_crossval.map will return an AsyncResult.
# chunksize=1 means that each engine will only receive one bit of data at a time.
# You will want to play with this setting to see what gives you the best results.
@lview.parallel(block=False, chunksize=1)
def svm_params_crossval(indexes):
    train_idx, crossval_idx = indexes
    X_train = X[train_idx]
    y_train = y[train_idx]
    X_crossval = X[crossval_idx]
    y_crossval = y[crossval_idx]

    crossval_err = numpy.zeros((C_range.size, gamma_range.size))
    for i, C in enumerate(C_range):
        for j, gamma in enumerate(gamma_range):
            clf = svm.SVC(C=C, gamma=gamma)
            clf.fit(X_train, y_train)
            crossval_err[i, j] = 1. - clf.score(X_crossval, y_crossval)

    return crossval_err


def short_format_time(t):
    if t > 60:
        return "%4.1fmin" % (t / 60.)
    else:
        return " %5.1fs" % t


def wait_progress(ar, interval=5, timeout=-1):
    """Wait on an IPython AsyncResult, printing progress to stdout.

    Based on wait_interactive() in IPython.parallel and the output of Joblib in verbose mode.
    This will work best when using a load-balanced view with a smallish chunksize.
    """
    if timeout is None:
        timeout = -1
    N = len(ar)
    tic = time.time()
    print "\nRunning %i tasks:" % N
    sys.stdout.flush()
    last = 0
    while not ar.ready() and (timeout < 0 or time.time() - tic <= timeout):
        ar.wait(interval)
        progress, elapsed = ar.progress, ar.elapsed
        if progress > last:
            last = progress
            remaining = elapsed * (float(N) / progress - 1.)
            print '   Done %4i out of %4i | elapsed: %s remaining: %s' % (
                progress, N, short_format_time(elapsed), short_format_time(remaining))
            sys.stdout.flush()
    if ar.ready():
        try:
            speedup = round(100.0 * ar.serial_time / ar.wall_time)
            print "\nParallel speedup: %i%%" % speedup
        # For some reason ar.serial_time occasionally throws this exception.
        # We choose to ignore it and just not display the speedup factor.
        except TypeError:
            pass


def main():
    # Load a "toy" data set
    iris = datasets.load_iris()
    X = preprocessing.scale(iris.data)
    y = iris.target

    # Set the ranges of hyperparameters we want to search
    C_range = 10. ** numpy.arange(-2, 9)
    gamma_range = 10. ** numpy.arange(-5, 4)

    # Send out the data to the engines via the direct view
    dview.push(dict(X=X, y=y, C_range=C_range, gamma_range=gamma_range), block=True)

    # Run svm_params_crossval in parallel. Note the nice syntax afforded by using
    # the @lview.parallel decorator. This is equivalent to:
    #   ar = lview.map_async(svm_params_crossval, cross_validation.LeaveOneOut(len(y)), chunksize=1)
    ar = svm_params_crossval.map(cross_validation.LeaveOneOut(len(y)))

    try:
        # Busy wait on the results, to give nice progress updates
        wait_progress(ar)
    # Handle ctrl-c by aborting jobs before exiting. If we didn't do this, the tasks
    # would keep running to completion.
    except KeyboardInterrupt:
        print "Aborting..."
        sys.stdout.flush()
        ar.abort()
        sys.exit()

    # Get the actual results
    results = ar.result

    # Average the results and convert to percent
    crossval_err = 100. * numpy.mean(results, axis=0)

    # Find the C and gamma that gave us the lowest average cross-validation error
    min_idx = crossval_err.argmin()
    C_idx, gamma_idx = numpy.unravel_index(min_idx, crossval_err.shape)
    C_best = C_range[C_idx]
    gamma_best = gamma_range[gamma_idx]
    err_best = crossval_err[C_idx, gamma_idx]

    print "\nBest: C = %s, gamma = %s, err = %s%%\n" % (C_best, gamma_best, err_best)
    numpy.set_printoptions(precision=2, linewidth=120)
    print crossval_err


if __name__ == '__main__':
    # Track the overall time of computation
    start_time = time.time()
    main()
    end_time = time.time()
    print "\nTotal time: %s" % short_format_time(end_time - start_time)
Running the above script gives the following output on my machine:
importing numpy on engine(s)
importing svm from sklearn on engine(s)

Running 150 tasks:
   Done   72 out of  150 | elapsed:    5.3s remaining:    5.8s
   Done  138 out of  150 | elapsed:   10.3s remaining:    0.9s
   Done  150 out of  150 | elapsed:   11.3s remaining:    0.0s

Parallel speedup: 775%

Best: C = 1000000.0, gamma = 0.0001, err = 2.0%

[[ 100.    100.    100.    100.    100.    100.    100.    100.    100.  ]
 [ 100.    100.    100.     92.     12.      6.    100.    100.    100.  ]
 [ 100.    100.     79.33   10.      2.67    5.33   11.33   49.33   63.33]
 [ 100.     78.67   10.      4.      4.      6.     11.33   48.     63.33]
 [  78.     10.      4.      3.33    4.67    6.67   11.33   48.     63.33]
 [  10.      4.      3.33    4.67    6.67    6.67   11.33   48.     63.33]
 [   4.      3.33    3.33    4.67    7.33    6.67   11.33   48.     63.33]
 [   3.33    3.33    2.67    4.67    7.33    6.67   11.33   48.     63.33]
 [   3.33    2.      3.33    6.67    7.33    6.67   11.33   48.     63.33]
 [   4.      4.      4.67    6.      7.33    6.67   11.33   48.     63.33]
 [   4.67    4.      6.      6.      7.33    6.67   11.33   48.     63.33]]

Total time:   11.3s
That’s all folks!
Thanks for reading my blog post, and I hope it’s helped. Feel free to leave any questions or constructive criticism as a comment.