Chapter 7. Distributing Tasks with Celery
In the previous chapter, we learned about parallel Python. We saw the implementation of case studies, including obtaining Fibonacci series terms and building a Web crawler with the parallel Python module. We learned how to establish communication among processes using pipes, and how to distribute processes among different machines in a network. In this chapter, we will study how to distribute tasks among different machines in a network by using the Celery framework.
In this chapter, we will cover the following topics:
Understanding Celery
Understanding Celery's architecture
Setting up the environment
Dispatching a simple task
Using Celery to obtain a Fibonacci series term
Using Celery to make a distributed Web crawler
Celery is a framework that offers mechanisms to lessen the difficulties of creating distributed systems. The Celery framework distributes work units (tasks) by exchanging messages among machines that are interconnected as a network, or among local workers. A task is the key concept in Celery; any sort of job we must distribute has to be encapsulated in a task beforehand.
We could justify the use of Celery by listing some of its positive points:
It distributes tasks in a transparent way among workers that are spread over the Internet, or local workers
It changes, in a simple way, the concurrency of workers through configuration (processes, threads, Gevent, Eventlet)
It supports synchronous, asynchronous, periodic, and scheduled tasks
It re-executes tasks in case of errors
Tip
It is common for some developers to claim that synchronous tasks are the same as real-time tasks. This is an unnecessary confusion, as the concepts are totally different: a synchronous call blocks the caller until the result is ready, while a real-time task must complete within a defined time window.
Understanding Celery's architecture
Celery has an architecture based on pluggable components and a mechanism of message exchange that uses a protocol according to a selected message transport (broker). This is illustrated in the following diagram:
Now, let us go through each item within Celery's architecture in detail.
The client components, as presented in the previous diagram, have the function of creating and dispatching tasks to the brokers.
We will now analyze a code example that demonstrates the definition of a task by using the @app.task decorator, which is accessible through an instance of the Celery application that, for now, will be called app. The following code example demonstrates a simple Hello World app:
Tip
Any callable can be a task.
As we mentioned earlier, there are several types of tasks: synchronous, asynchronous, periodic, and scheduled. When we perform a task call, it returns an instance of AsyncResult, which lets us check the task's state and retrieve its result once execution is complete.
Setting up the environment
In this section, we will set up two machines in Linux. The first one, with the hostname foshan, will perform the client role, from which the Celery app will dispatch the tasks to be executed. The other machine, with the hostname Phoenix, will perform the role of the broker and the result backend, and will hold the queues consumed by workers.
Setting up the client machine
Let us start the setup of the client machine. On this machine, we will set up a virtual environment with Python 3.3, using the pyvenv tool. The goal of pyvenv is not to pollute the Python installed in the operating system with additional modules, but to keep the development environment of each project separate. We will execute the following command to create our virtual environment:
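The command was not reproduced here; assuming the environment name used throughout the chapter, it would be along these lines:

```shell
# Create an isolated Python 3.3 environment in the celery_env directory.
# (On Python 3.6 and later, the equivalent is: python3 -m venv celery_env)
pyvenv celery_env
```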
The preceding line creates a directory called celery_env in the current path, which contains all the structure necessary to isolate the development environment in Python. The following screenshot illustrates the structure created inside the celery_env directory.
Dispatching a simple task
At this point, we have a ready environment. Let's test it by sending a task that will calculate the square root of a value and return a result.
First, we must define our task module, tasks.py, on the server. Let's check the description of the tasks.py module. In the following chunk of code, we have the imports necessary for our function that will calculate the square root:
Now, let's create the following instance of Celery, which will represent our client application:
We have created an instance of Celery that will control some aspects of our application. Notice that in its initializer, we passed the name of the module in which the task definitions are present, and stated the address of the broker as the second argument.
Then, we have to set up our result backend, which will also be in Redis, as follows:
Using Celery to obtain a Fibonacci series term
Let us again distribute our multiple inputs in order to calculate the nth Fibonacci term in a distributed way. The function that calculates Fibonacci will change a little in relation to the previous chapters. The changes are small; now we have the @app.task decorator and a small change in the return message.
In the tasks.py module (created previously), which is on the server machine where the broker also runs, we will stop the execution of Celery (Ctrl + C) and add the fibo_task task. This is done by using the following code:
A point to observe is that we obtain the ID of the task with the task.request.id call. The request object is an attribute of the task class, which provides a context to the currently executing task, such as its unique ID, its arguments, and information about its delivery.
Defining queues by task types
The task that is responsible for calculating Fibonacci has been implemented and is running. We can see that all tasks are being sent to Celery's default queue. However, there are ways to route a task to a different queue; let us refactor our architecture on the server side and implement what is known as task routing on the client side. We will specify queues for each type of task.
The moment we start the Celery server on the server side, we will establish three different queues, which will be seen and consumed by the workers. The queues are fibo_queue for Fibonacci tasks, sqrt_queue for square root tasks, and webcrawler_queue for the Web crawler ones. However, what is the advantage of having separate task flows? Let's observe some of them, as follows:
It groups tasks of the same type to make their monitoring easier
It defines workers dedicated to consume a specific queue, thereby enhancing performance
It distributes queues with heavier tasks to brokers allocated on more powerful machines
Using Celery to make a distributed Web crawler
We will now move on to adapting our Web crawler to Celery. We already have webcrawler_queue
, which is responsible for encapsulating web-type hcrawler
tasks. However, in the server side, we will create our crawl_task
task inside the tasks.py
module.
First, we will add our imports of the re and requests modules, which are the module for regular expressions and the HTTP library, respectively. The code is as follows:
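The import lines themselves are straightforward:

```python
import re        # regular expressions, used to extract hyperlinks
import requests  # HTTP library, used to fetch the pages to crawl
```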
Then, we will define our regular expression, which we studied in the previous chapters, as follows:
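The pattern is not reproduced here; an illustrative link-extraction regex in the same spirit (the exact expression from the earlier chapters may differ):

```python
import re

# Capture the href target of anchor tags in an HTML document.
html_link_regex = re.compile(
    r'<a\s(?:.*?\s)*?href=[\'"](.*?)[\'"].*?>')
```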
Now, we will place our crawl_task function in the Web crawler, add the @app.task decorator, and change the return message a bit, as follows:
In this chapter, we discussed the Celery distributed task queue. We also visualized its architecture, analyzed its key components, and saw how to set up an environment to build basic applications with Celery. It is possible to write a book only about Celery, and I hope that I have been fair and just while choosing the topics throughout.
In the next chapter, we will study the asyncio module and learn how to execute processing in an asynchronous way. We will also have a brief introduction to coroutines, and learn how to use them with asyncio.