Using Execnet for Parallel and Distributed Processing with NLTK

Jacob Perkins

November 2010


Python Text Processing with NLTK 2.0 Cookbook


Use Python's NLTK suite of libraries to maximize your Natural Language Processing capabilities.

  • Quickly get to grips with Natural Language Processing – with Text Analysis, Text Mining, and beyond
  • Learn how machines and crawlers interpret and process natural languages
  • Easily work with huge amounts of data and learn how to handle distributed processing
  • Part of Packt's Cookbook series: Each recipe is a carefully organized sequence of instructions to complete the task as efficiently as possible


NLTK is great for in-memory single-processor natural language processing. However, there are times when you have a lot of data to process and want to take advantage of multiple CPUs, multi-core CPUs, and even multiple computers. Or perhaps you want to store frequencies and probabilities in a persistent, shared database so multiple processes can access it simultaneously. For the first case, we'll be using execnet to do parallel and distributed processing with NLTK. For the second case, you'll learn how to use the Redis data structure server/database to store frequency distributions and more.

Distributed tagging with execnet

Execnet is a distributed execution library for Python. It allows you to create gateways and channels for remote code execution. A gateway is a connection from the calling process to a remote environment. The remote environment can be a local subprocess or an SSH connection to a remote node. A channel is created from a gateway and handles communication between the channel creator and the remote code.

Since many NLTK processes require 100 percent CPU utilization during computation, execnet is an ideal way to distribute that computation for maximum resource usage. You can create one gateway per CPU core, and it doesn't matter whether the cores are in your local computer or spread across remote machines. In many situations, you only need to have the trained objects and data on a single machine, and can send the objects and data to the remote nodes as needed.

Getting ready

You'll need to install execnet for this to work. It should be as simple as sudo pip install execnet or sudo easy_install execnet. The current version of execnet, as of this writing, is 1.0.8. The execnet homepage, which has API documentation and examples, is at

How to do it...

We start by importing the required modules, as well as an additional module remote_tag.py that will be explained in the next section. We also need to import pickle so we can serialize the tagger. Execnet does not natively know how to deal with complex objects such as a part-of-speech tagger, so we must dump the tagger to a string using pickle.dumps(). We'll use the default tagger that's used by the nltk.tag.pos_tag() function, but you could load and dump any pre-trained part-of-speech tagger as long as it implements the TaggerI interface.

Once we have a serialized tagger, we start execnet by making a gateway with execnet.makegateway(). The default gateway creates a Python subprocess, and we can call the remote_exec() method with the remote_tag module to create a channel. With an open channel, we send over the serialized tagger and then the first tokenized sentence of the treebank corpus.

You don't have to do any special serialization of simple types such as lists and tuples, since execnet already knows how to handle serializing the built-in types.
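The pickling round trip can be tried on its own, without execnet. The following is only a minimal sketch: it uses a collections.Counter as a stand-in for a trained tagger (an assumption made so the example needs nothing beyond the standard library), and it uses the Python 3 spelling, where cPickle has been folded into pickle.

```python
import pickle
from collections import Counter

# Stand-in for a trained object; a real tagger would be pickled the same way.
freq = Counter(['the', 'dog', 'the', 'cat'])

# Sending side: turn the object into a byte string, which is a simple type.
payload = pickle.dumps(freq)

# Receiving side: rebuild an equivalent object from the byte string.
restored = pickle.loads(payload)

print(type(payload).__name__)
print(restored == freq)
```

The byte string is what would travel over the channel; the object itself never does.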

Now if we call channel.receive(), we get back a tagged sentence that is equivalent to the first tagged sentence in the treebank corpus, so we know the tagging worked. We end by exiting the gateway, which closes the channel and kills the subprocess.

>>> import execnet, remote_tag, nltk.tag, nltk.data
>>> from nltk.corpus import treebank
>>> import cPickle as pickle
>>> # Assumes NLTK 2.0, where nltk.tag._POS_TAGGER is the path to the default tagger's pickle.
>>> tagger = pickle.dumps(nltk.data.load(nltk.tag._POS_TAGGER))
>>> gw = execnet.makegateway()
>>> channel = gw.remote_exec(remote_tag)
>>> channel.send(tagger)
>>> channel.send(treebank.sents()[0])
>>> tagged_sentence = channel.receive()
>>> tagged_sentence == treebank.tagged_sents()[0]
True
>>> gw.exit()

Visually, the communication process looks like this:

[Figure: diagram of the communication process]

How it works...

The gateway's remote_exec() method takes a single argument that can be one of the following three types:

  1. A string of code to execute remotely.
  2. The name of a pure function that will be serialized and executed remotely.
  3. The name of a pure module whose source will be executed remotely.

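We use the third option with the remote_tag.py module, which is defined as follows:

import cPickle as pickle

if __name__ == '__channelexec__':
    # The first message is the pickled tagger.
    tagger = pickle.loads(channel.receive())

    # Every later message is a tokenized sentence; send back its tagged form.
    for sentence in channel:
        channel.send(tagger.tag(sentence))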

A pure module is a module that is self-contained. It can only access Python modules that are available where it executes, and does not have access to any variables or states that exist wherever the gateway is initially created. To detect that the module is being executed by execnet, you can look at the __name__ variable. If it's equal to '__channelexec__', then it is being used to create a remote channel. This is similar to doing if __name__ == '__main__' to check if a module is being executed on the command line.

The first thing we do is call channel.receive() to get the serialized tagger, which we load using pickle.loads(). You may notice that channel is not imported anywhere; that's because it is included in the global namespace of the module. Any module that execnet executes remotely has access to the channel variable in order to communicate with the channel creator.

Once we have the tagger, we iteratively tag() each tokenized sentence that we receive from the channel. This allows us to tag as many sentences as the sender wants to send, as iteration will not stop until the channel is closed. What we've essentially created is a compute node for part-of-speech tagging that dedicates 100 percent of its resources to tagging whatever sentences it receives. As long as the channel remains open, the node is available for processing.
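To see the receive-then-iterate protocol in isolation, the module body can be run against an in-process stand-in for the channel. This is only a sketch: FakeChannel is a hypothetical class written for this example, the dictionary lookup is a toy substitute for a real tagger, and passing __name__ and channel through exec() merely imitates the two globals described above; it is not how execnet itself is implemented.

```python
import pickle
from collections import deque

# Same shape as remote_tag.py, with a toy dictionary lookup instead of a tagger.
REMOTE_SOURCE = '''
import pickle

if __name__ == '__channelexec__':
    lookup = pickle.loads(channel.receive())
    for sentence in channel:
        channel.send([(word, lookup.get(word, 'NN')) for word in sentence])
'''


class FakeChannel:
    """Hypothetical in-process stand-in for an execnet channel."""

    def __init__(self, incoming):
        self.incoming = deque(incoming)  # messages waiting to be received
        self.outgoing = []               # messages sent back to the creator

    def receive(self):
        return self.incoming.popleft()

    def send(self, item):
        self.outgoing.append(item)

    def __iter__(self):
        # Iteration ends when nothing is left, imitating a closed channel.
        while self.incoming:
            yield self.incoming.popleft()


lookup = {'the': 'DT', 'dog': 'NN', 'barks': 'VBZ'}
fake = FakeChannel([pickle.dumps(lookup), ['the', 'dog', 'barks'], ['the', 'cat']])

# Provide the two globals the module relies on, then run its source.
exec(REMOTE_SOURCE, {'__name__': '__channelexec__', 'channel': fake})

for tagged in fake.outgoing:
    print(tagged)
```

The first message is consumed by receive() and every later message by the for loop, which is why the order of sends on the calling side matters.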

There's more...

This is a simple example that opens a single gateway and channel. But execnet can do a lot more, such as opening multiple channels to increase parallel processing, as well as opening gateways to remote hosts over SSH to do distributed processing.

Multiple channels

We can create multiple channels, one per gateway, to make the processing more parallel. Each gateway creates a new subprocess (or remote interpreter if using an SSH gateway) and we use one channel per gateway for communication. Once we've created two channels, we can combine them using the MultiChannel class, which allows us to iterate over the channels, and make a receive queue to receive messages from each channel.

After creating each channel and sending the tagger, we cycle through the channels to send an even number of sentences to each channel for tagging. Then we collect all the responses from the queue. A call to queue.get() will return a 2-tuple of (channel, message) in case you need to know which channel the message came from.
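The round-robin distribution itself does not depend on execnet. As a minimal sketch, with plain strings standing in for channels and sentences (the names ch1, ch2, and s0 to s4 are placeholders chosen for this example), itertools.cycle hands out work like this:

```python
import itertools


def round_robin(items, channel_names):
    """Assign each item to the next channel name in an endless cycle."""
    assignments = {name: [] for name in channel_names}
    cycle = itertools.cycle(channel_names)
    for item in items:
        assignments[next(cycle)].append(item)
    return assignments


assignments = round_robin(['s0', 's1', 's2', 's3', 's4'], ['ch1', 'ch2'])
print(assignments)
```

With an odd number of items, the first channel ends up with one more than the second, so the split is even only when the item count is a multiple of the channel count.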

If you don't want to wait forever, you can also pass a timeout keyword argument with the maximum number of seconds you want to wait, as in queue.get(timeout=4). This can be a good way to handle network errors.
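A timeout only helps if the caller handles the case where nothing arrives. The sketch below assumes the receive queue behaves like the standard library's queue.Queue, which raises queue.Empty when the timeout expires; it uses a plain Queue and the Python 3 module name, and collect() is a hypothetical helper written for this example.

```python
import queue


def collect(q, expected, timeout=0.1):
    """Gather up to `expected` messages, giving up at the first timeout."""
    results = []
    for _ in range(expected):
        try:
            channel_id, message = q.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived in time; stop waiting instead of blocking forever.
            break
        results.append(message)
    return results


q = queue.Queue()
q.put(('ch1', 'first'))
q.put(('ch2', 'second'))

# Three messages are expected but only two were ever queued.
received = collect(q, expected=3)
print(received)
```

Whether to retry, resend the work to another channel, or fail outright after a timeout is a design decision the calling code has to make.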

Once all the tagged sentences have been collected, we can exit the gateways. Here's the code:

>>> import itertools
>>> gw1 = execnet.makegateway()
>>> gw2 = execnet.makegateway()
>>> ch1 = gw1.remote_exec(remote_tag)
>>> ch1.send(tagger)
>>> ch2 = gw2.remote_exec(remote_tag)
>>> ch2.send(tagger)
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> queue = mch.make_receive_queue()
>>> channels = itertools.cycle(mch)
>>> for sentence in treebank.sents()[:4]:
...     channel = next(channels)
...     channel.send(sentence)
>>> tagged_sentences = []
>>> for i in range(4):
...     channel, tagged_sentence = queue.get()
...     tagged_sentences.append(tagged_sentence)
>>> len(tagged_sentences)
4
>>> gw1.exit()
>>> gw2.exit()

Local versus remote gateways

The default gateway spec is popen, which creates a Python subprocess on the local machine. This means execnet.makegateway() is equivalent to execnet.makegateway('popen'). If you have passwordless SSH access to a remote machine, then you can create a remote gateway using execnet.makegateway('ssh=remotehost') where remotehost should be the hostname of the machine. An SSH gateway spawns a new Python interpreter for executing the code remotely. As long as the code you're using for remote execution is pure, you only need a Python interpreter on the remote machine.

Channels work exactly the same no matter what kind of gateway is used; the only difference will be communication time. This means you can mix and match local subprocesses with remote interpreters to distribute your computations across many machines in a network. There are many more details on gateways in the API documentation at


Distributed chunking with execnet

In this recipe, we'll do chunking and tagging over an execnet gateway. This will be very similar to the tagging in the previous recipe, but we'll be sending two objects instead of one, and we will be receiving a Tree instead of a list, which requires pickling and unpickling for serialization.

Getting ready

As in the previous recipe, you must have execnet installed.

How to do it...

The setup code is very similar to the last recipe, and we'll use the same pickled tagger as well. First we'll pickle the default chunker used by nltk.chunk.ne_chunk(), though any chunker would do. Next, we make a gateway for the remote_chunk module, get a channel, and send the pickled tagger and chunker over. Then we receive back a pickled Tree, which we can unpickle and inspect to see the result. Finally, we exit the gateway.

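>>> import execnet, remote_chunk
>>> import nltk.data, nltk.tag, nltk.chunk
>>> import cPickle as pickle
>>> from nltk.corpus import treebank_chunk
>>> # Assumes NLTK 2.0, where these private constants are the paths to the default pickles.
>>> tagger = pickle.dumps(nltk.data.load(nltk.tag._POS_TAGGER))
>>> chunker = pickle.dumps(nltk.data.load(nltk.chunk._MULTICLASS_NE_CHUNKER))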
>>> gw = execnet.makegateway()
>>> channel = gw.remote_exec(remote_chunk)
>>> channel.send(tagger)
>>> channel.send(chunker)
>>> channel.send(treebank_chunk.sents()[0])
>>> chunk_tree = pickle.loads(channel.receive())
>>> chunk_tree
Tree('S', [Tree('PERSON', [('Pierre', 'NNP')]), Tree('ORGANIZATION',
[('Vinken', 'NNP')]), (',', ','), ('61', 'CD'), ('years', 'NNS'),
('old', 'JJ'), (',', ','), ('will', 'MD'), ('join', 'VB'), ('the',
'DT'), ('board', 'NN'), ('as', 'IN'), ('a', 'DT'), ('nonexecutive',
'JJ'), ('director', 'NN'), ('Nov.', 'NNP'), ('29', 'CD'), ('.', '.')])
>>> gw.exit()

The communication this time is slightly different.

[Figure: diagram of the communication process]

How it works...

The remote_chunk.py module is just a little bit more complicated than the remote_tag.py module from the previous recipe. In addition to receiving a pickled tagger, it also expects to receive a pickled chunker that implements the ChunkerI interface. Once it has both a tagger and a chunker, it expects to receive any number of tokenized sentences, which it tags and parses into a Tree. This tree is then pickled and sent back over the channel.

import cPickle as pickle

if __name__ == '__channelexec__':
    # The first two messages are the pickled tagger and the pickled chunker.
    tagger = pickle.loads(channel.receive())
    chunker = pickle.loads(channel.receive())

    for sent in channel:
        tree = chunker.parse(tagger.tag(sent))
        # A Tree is not a simple built-in type, so pickle it before sending.
        channel.send(pickle.dumps(tree))

The Tree must be pickled because it is not a simple built-in type.

There's more...

Note that the remote_chunk module is pure. Its only external dependency is the pickle (or cPickle) module, which is part of the Python standard library. It doesn't need to import any NLTK modules in order to use the tagger or chunker, because all the necessary data is pickled and sent over the channel. As long as you structure your remote code like this, with no external dependencies, you only need NLTK to be installed on a single machine—the one that starts the gateway and sends the objects over the channel.

Python subprocesses

If you look at your task/system monitor (or top in *nix) while running the execnet code, you may notice a few extra python processes. Every gateway spawns a new, self-contained, shared-nothing Python interpreter process, which is killed when you call the exit() method. Unlike with threads, there is no shared memory to worry about, and no global interpreter lock to slow things down. All you have are separate communicating processes. This is true whether the processes are local or remote. Instead of locking and synchronization, all you have to worry about is the order in which the messages are sent and received.
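You can observe the separate-interpreter behavior with nothing but the standard library. The sketch below does not use execnet at all; it starts a second interpreter with subprocess (Python 3.7 or later for capture_output) and compares process IDs, which is only an analogy for what a local gateway does. child_pid() is a hypothetical helper written for this example.

```python
import os
import subprocess
import sys


def child_pid():
    """Run a fresh interpreter and return the process ID it reports."""
    completed = subprocess.run(
        [sys.executable, '-c', 'import os; print(os.getpid())'],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(completed.stdout.strip())


parent = os.getpid()
child = child_pid()

# Two different process IDs means two interpreters with no shared memory.
print(parent != child)
```

Because the two interpreters share nothing, the only way to get data from one to the other is to send it, which is the role the channel plays.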

See also

The previous recipe explains execnet gateways and channels in detail. In the next recipe, we'll use execnet to process a list in parallel.

Parallel list processing with execnet

This recipe presents a pattern for using execnet to process a list in parallel. It's a function pattern for mapping each element in the list to a new value, using execnet to do the mapping in parallel.

How to do it...

First, we need to decide exactly what we want to do. In this example, we'll just double integers, but we could do any pure computation. Following is the remote_double.py module, which will be executed by execnet. It receives a 2-tuple of (i, arg), assumes arg is a number, and sends back (i, arg*2). The need for i will be explained in the next section.

if __name__ == '__channelexec__':
    for (i, arg) in channel:
        channel.send((i, arg * 2))

To use this module to double every element in a list, we import the plists module (explained in the next section) and call plists.map() with the remote_double module and a list of integers to double.

>>> import plists, remote_double
>>> plists.map(remote_double, range(10))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Communication between channels is very simple, as shown in the following diagram:

[Figure: diagram of the communication between channels]

How it works...

The map() function is defined in plists.py. It takes a pure module, a list of arguments, and an optional list of 2-tuples consisting of (spec, count). The default specs are [('popen', 2)], which means we'll open two local gateways and channels. Once these channels are opened, we put them into an itertools cycle, which creates an infinite iterator that cycles back to the beginning once it hits the end.

Now we can send each argument in args to a channel for processing, and since the channels are cycled, each channel gets an almost even distribution of arguments. This is where i comes in—we don't know in what order we'll get the results back, so i, as the index of each arg in the list, is passed to the channel and back so we can combine the results in the original order. We then wait for results with a MultiChannel receive queue and insert them into a pre-filled list that's the same length as the original args. Once we have all the expected results, we can exit the gateways and return the results.

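import itertools, execnet

def map(mod, args, specs=[('popen', 2)]):
    gateways = []
    channels = []
    # Open count gateways per spec, with one channel per gateway.
    for spec, count in specs:
        for i in range(count):
            gw = execnet.makegateway(spec)
            gateways.append(gw)
            channels.append(gw.remote_exec(mod))
    # Send each (index, argument) pair to the next channel in the cycle.
    cyc = itertools.cycle(channels)
    for i, arg in enumerate(args):
        channel = next(cyc)
        channel.send((i, arg))
    mch = execnet.MultiChannel(channels)
    queue = mch.make_receive_queue()
    l = len(args)
    results = [None] * l
    # Results arrive in any order; the index puts each one in its slot.
    for j in range(l):
        channel, (i, result) = queue.get()
        results[i] = result
    for gw in gateways:
        gw.exit()
    return results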
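The index-tagging idea can be tested without execnet. The following sketch swaps in threads and standard library queues for gateways and channels (a deliberate substitution, so it shows the ordering technique rather than real parallel speedup); ordered_map() and worker() are hypothetical names, and the Python 3 queue module is assumed.

```python
import queue
import threading


def worker(inbox, outbox):
    """Double each (index, value) pair until a None sentinel arrives."""
    for i, arg in iter(inbox.get, None):
        outbox.put((i, arg * 2))


def ordered_map(args, workers=2):
    args = list(args)
    outbox = queue.Queue()
    inboxes = [queue.Queue() for _ in range(workers)]
    threads = [threading.Thread(target=worker, args=(inbox, outbox))
               for inbox in inboxes]
    for thread in threads:
        thread.start()

    # Round-robin the work, tagging every argument with its position.
    for i, arg in enumerate(args):
        inboxes[i % workers].put((i, arg))

    # Results may arrive in any order; the index restores the original one.
    results = [None] * len(args)
    for _ in range(len(args)):
        i, result = outbox.get()
        results[i] = result

    # Tell each worker to stop, then wait for it to finish.
    for inbox in inboxes:
        inbox.put(None)
    for thread in threads:
        thread.join()
    return results


doubled = ordered_map(range(10))
print(doubled)
```

Pre-filling the results list and writing by index means no sorting step is needed once all the results are in.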

There's more...

You can increase the parallelization by modifying the specs, as follows:

>>> plists.map(remote_double, range(10), [('popen', 4)])
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

However, more parallelization does not necessarily mean faster processing. It depends on the available resources, and the more gateways and channels you have open, the more overhead is required. Ideally there should be one gateway and channel per CPU core.
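One way to follow the one-gateway-per-core guideline is to build the specs list from the core count reported by the standard library. This is a sketch under that assumption; local_specs() is a hypothetical helper, not part of plists or execnet.

```python
import multiprocessing


def local_specs():
    """Build a (spec, count) list with one local gateway per CPU core."""
    return [('popen', multiprocessing.cpu_count())]


specs = local_specs()
print(specs)

# The result could then be passed as the third argument of the map()
# function defined above, for example:
#     plists.map(remote_double, range(10), local_specs())
```

The reported count includes logical cores, so on some machines a smaller number may perform just as well.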

You can use plists.map() with any pure module as long as it receives and sends back 2-tuples where i is the first element. This pattern is most useful when you have a bunch of numbers to crunch, and want to process them as quickly as possible.

See also

The previous recipes cover execnet features in greater detail.


In this article, we saw how to use execnet to do parallel and distributed processing with NLTK.

In the next article we will see how to use the Redis data structure server/database to store frequency distributions.
