Chapter 7. Distributing Tasks with Celery

In the previous chapter, we learned how to use Parallel Python. We saw the implementation of case studies, including the calculation of Fibonacci series terms and a Web crawler, using the Parallel Python module. We learned how to establish communication among processes using pipes and how to distribute processes among different machines in a network. In this chapter, we will study how to distribute tasks among different machines in a network by using the Celery framework.

In this chapter, we will cover the following topics:

  • Understanding Celery

  • Understanding Celery's architecture

  • Setting up the environment

  • Dispatching a simple task

  • Using Celery to obtain a Fibonacci series term

  • Using Celery to make a distributed Web crawler

Understanding Celery


Celery is a framework that offers mechanisms to lessen the difficulties of creating distributed systems. It works by distributing units of work (tasks), exchanging messages among machines interconnected as a network or among local workers. A task is the key concept in Celery; any job we need to distribute has to be encapsulated in a task beforehand.

Why use Celery?

We could justify the use of Celery by listing some of its positive points:

  • It distributes tasks transparently among workers spread over the Internet or among local workers

  • It lets us change the concurrency of workers (processes, threads, Gevent, Eventlet) through simple configuration

  • It supports synchronous, asynchronous, periodic, and scheduled tasks

  • It re-executes tasks in case of errors

Tip

It is common for some developers to claim that synchronous tasks are the same as real-time tasks. This is an unnecessary confusion as the concepts are totally different...

Understanding Celery's architecture


Celery has an architecture based on pluggable components and a message-exchange mechanism whose protocol depends on the selected message transport (broker). This is illustrated in the following diagram:

The Celery architecture

Now, let us go through each item within Celery's architecture in detail.

Working with tasks

The client components, as presented in the previous diagram, have the function of creating and dispatching tasks to the brokers.

We will now analyze a code example that demonstrates the definition of a task by using the @app.task decorator, which is accessible through an instance of the Celery application that, for now, we will call app. The following code example demonstrates a simple Hello World app:

@app.task
def hello_world():
    return "Hello I'm a celery task"

Tip

Any callable can be a task.

As we mentioned earlier, there are several types of tasks: synchronous, asynchronous, periodic, and scheduled. When we perform a task call, it returns...
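As a quick illustration, the following is a minimal sketch, assuming that app has a broker and a result backend configured and that a worker is consuming the queue, of how the hello_world task could be dispatched asynchronously and its return value collected:

# Minimal sketch: dispatch hello_world asynchronously and wait for its result.
# Assumes app is configured with a broker and result backend, and a worker is running.
async_result = hello_world.delay()    # sends a task message to the broker
print(async_result.get(timeout=10))   # blocks until a worker returns the value

The delay() call is a shortcut for apply_async() with default options; the AsyncResult it returns lets us poll or wait for the task's outcome.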

Setting up the environment


In this section, we will set up two machines on Linux. The first one, with the hostname foshan, will perform the client role, where the Celery application will dispatch the tasks to be executed. The other machine, with the hostname Phoenix, will perform the role of the broker and the result backend, and will host the queues consumed by the workers.

Setting up the client machine

Let us start with the setup of the client machine. On this machine, we will set up a virtual environment with Python 3.3 using the pyvenv tool. The goal of pyvenv is to avoid polluting the Python installation present in the operating system with additional modules, keeping the development environment of each project separate. We will execute the following command to create our virtual environment:

$ pyvenv celery_env

The preceding line creates a directory called celery_env in the current path, which contains all the structure necessary to isolate the development environment in Python. The following screenshot illustrates the structure created in the celery_env...
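If the pyvenv script is not available, a hedged alternative sketch is to create the same isolated environment with the venv module that ships with Python 3.3 and later:

# Alternative sketch: create the celery_env environment with the standard
# library venv module instead of the pyvenv script.
import venv
venv.create('celery_env')   # creates the celery_env directory in the current path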

Dispatching a simple task


At this point, our environment is ready. Let's test it by dispatching a task that calculates the square root of a value and returns the result.

First, we must define our task module, tasks.py, on the server. Let's go through the description of the tasks.py module. In the following chunk of code, we have the imports necessary for our function that will calculate the square root:

from math import sqrt
from celery import Celery

Now, let's create the following instance of Celery, which will represent our client application:

app = Celery('tasks', broker='redis://192.168.25.21:6379/0')

We have created an instance of Celery that will control some aspects of our application. Notice that in its initializer, we passed the name of the module in which the task definitions are present, and we stated the address of the broker as the second argument.

Then, we have to set up our result backend, which will also be in Redis, as follows:

app.conf.CELERY_RESULT_BACKEND = 'redis://192.168.25...
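As a minimal sketch, a square root task following the pattern described above could look like this (the name sqrt_task is an assumption made for illustration):

# Hedged sketch: a possible square root task in tasks.py; the name sqrt_task
# is an assumption made for illustration.
@app.task
def sqrt_task(value):
    return sqrt(value)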

Using Celery to obtain a Fibonacci series term


Let us return to distributing multiple inputs in order to calculate the nth Fibonacci term, each of them handled in a distributed way. The function that calculates Fibonacci changes only a little in relation to the previous chapters; now we have the @app.task decorator and a small change in the return message.

In the tasks.py module (created previously), which is on the server machine that also hosts the broker, we will stop the execution of Celery (Ctrl + C) and add the fibo_task task. This is done by using the following code:

@app.task
def fibo_task(value):
    a, b = 0, 1
    for item in range(value):
        a, b = b, a + b
    message = "The Fibonacci calculated with task id %s" \
        " was %d" % (fibo_task.request.id, a)
    return (value, message)

A point to observe is that we obtain the ID of the task with the <task.request.id> call. The request object is an attribute of the task class, which provides a context to...
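On the client side, a minimal sketch for dispatching several inputs and collecting the (value, message) pairs, assuming the same app and broker configuration as before, could look like this:

# Hedged sketch: dispatch several Fibonacci inputs and gather the results;
# the input values 10, 20, and 30 are illustrative only.
results = [fibo_task.delay(n) for n in (10, 20, 30)]
for async_result in results:
    value, message = async_result.get(timeout=10)
    print(value, message)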

Defining queues by task types


The task responsible for calculating Fibonacci has been implemented and is running. We can see that all tasks are being sent to Celery's default queue. However, there are ways to route tasks to different queues; let us refactor our architecture on the server side and implement what is known as task routing from the client side. We will specify a queue for each type of task.

When we start the Celery server on the server side, we will establish three different queues, which will be seen and consumed by the workers. The queues are fibo_queue for Fibonacci tasks, sqrt_queue for square root tasks, and webcrawler_queue for the Web crawler ones. But what is the advantage of having separate task flows? Let us observe the benefits as follows (a routing sketch appears after the list):

  • It groups tasks of the same type to make their monitoring easier

  • It defines workers dedicated to consuming a specific queue, thereby enhancing performance

  • It distributes queues with heavier tasks to brokers allocated in machines...
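As a hedged sketch, the routing itself could be expressed in the Celery 3.x configuration style used in this book by mapping each task to its queue; the exact mechanism chosen here may differ:

# Hedged sketch: route each task type to its dedicated queue via CELERY_ROUTES.
app.conf.CELERY_ROUTES = {
    'tasks.sqrt_task': {'queue': 'sqrt_queue'},
    'tasks.fibo_task': {'queue': 'fibo_queue'},
    'tasks.crawl_task': {'queue': 'webcrawler_queue'},
}

A worker dedicated to one of these queues can then be started with Celery's -Q option, for example -Q fibo_queue.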

Using Celery to make a distributed Web crawler


We will now move on to adapting our Web crawler to Celery. We already have webcrawler_queue, which is responsible for encapsulating Web crawler tasks. On the server side, we will create our crawl_task task inside the tasks.py module.

First, we will add our imports of the re and requests modules, which are the regular expression module and the HTTP library, respectively. The code is as follows:

import re
import requests

Then, we will define our regular expression, which we studied in the previous chapters, as follows:

html_link_regex = re.compile(
    r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')

Now, we will take our Web crawler's crawl function, define it as crawl_task, add the @app.task decorator, and change the return message a bit, as follows:

@app.task
def crawl_task(url):
    request_data = requests.get(url)
    links = html_link_regex.findall(request_data.text)
    message = "The task %s found the following links %s.." \
        % (crawl_task.request.id, links)
    return message
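On the client side, a hedged sketch for sending crawler tasks to the webcrawler_queue defined earlier and printing the returned messages could look like this (the URLs are examples only):

# Hedged sketch: dispatch crawler tasks to the dedicated webcrawler_queue
# and collect the messages returned by the workers; the URLs are examples only.
urls = ['http://www.google.com', 'http://br.bing.com']
results = [crawl_task.apply_async(args=(url,), queue='webcrawler_queue')
           for url in urls]
for async_result in results:
    print(async_result.get(timeout=30))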

Summary


In this chapter, we discussed the Celery distributed task queue. We also examined its architecture, analyzed its key components, and saw how to set up an environment to build basic applications with Celery. An entire book could be written about Celery alone, and I hope that I have been fair in choosing the topics covered here.

In the next chapter, we will study the asyncio module as well as learn how to execute processing in an asynchronous way. We will also have a brief introduction to coroutines, and learn how to use them with asyncio.
