First up, Celery is an open-source Python library which is used to run the tasks asynchronously. A simple use case is where you need to run a webserver and a compute intensive process on the same machine. The compute intensive process can hold up the entire machine leading the webserver to crash and denial of service to users
In this part, we will see how to set up a basic celery instance and run it asynchronously. Also we will run tasks in parallel as well as sequentially
The official celery docs do a good job getting started, however might be a bit too basic for many of us First steps with Celery – official
- Setup
a. Message broker
Celery runs with the help of message broker. A message broker is software that enables applications to communicate with each other and exchange information even if they were written in different languages or implemented on different platforms. Popular message brokers are Redis and RabbitMQ
Install redis by following this Redis installation guide
Once redis is started, you can test on your terminal by getting a PONG in response to a ping
% redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379>
b. Next install celery by pip install celery
2. tasks.py
Celery follows a convention of all asynchronous functions being located in tasks.py file. While in larger modules, we will have functions in tasks.py and call them from other files, today we are going to use only tasks.py to demo the basic fuctionality
First we create a celery app with the name of the module – ‘tasks’ in this case (will be name of the folder in larger modules ) and a broker url for redis server. lets us the example from the official doc to see if it works
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
add.delay(4, 10)
Now run this in the terminal (in the same folder as tasks.py)
celery -A tasks worker –loglevel=INFO
Here we are invoking the app named ‘tasks’ by celery. Currently the only task as in the @app.task decorator is add(x,y). These tasks get registered in celery task queue.
Next we call these tasks with a delay function which indicates that this is a asynchronous call and should return back immediately without stalling the process
You can check the status of the result object here. Status will show as pending immediately (Note the change in the app initiation, since we are storing the value of the status in a backend DB again in redis)
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
res = add.delay(4, 10)
print(res.status)
3. Multi processing in tasks.py
Now with the basics out of the way , lets look at how we can run multiple functions concurrently
I want to run an array of 100 numbers with both add and multiply running concurrently
Its as simple as just calling two different functions. Celery will automatically assign them to two different worker processes (limited to max number of cores in your machine)
@app.task
def xsum():
return sum(i + i for i in range(10000))
@app.task
def xmult():
return sum(i * i for i in range(10000))
xsum.delay()
xmult.delay()
Both the workers get the task assigned in the same time and process it in two different threads
Ok now finally I want to further divide xsum and xmult functions so that its processed in chunks of 100 each with each chunk running parallel and finally add up the result. How do we do that
First point to note is one can break the processing into chunks and run with GROUP function
@app.task
def xsum(start, end):
return sum(i + i for i in range(start, end))
@app.task
def xmult(start, end):
return sum(i * i for i in range(start, end))
# Divide the range into chunks of size 100
chunks = [(i, i+10) for i in range(1, 1000, 10)]
xsum_tasks = group(xsum.s(start, end) for (start, end) in chunks)
xmult_tasks = group(xmult.s(start, end) for (start, end) in chunks)
Here we divided the processing into chunks of 10 each and processed chunks in parallel. However this will still be limited to the number of cores on your machine (in my case 2) and will not achieve the parallelism you want. Group is good for few functions to run in parallel
4. Linking and chaining tasks together
Finally you can chain the tasks together so that one task or group of tasks start only after the previous task finishes
@app.task
def xsum():
time.sleep(2)
return sum(i+i for i in range(100))
@app.task
def xmult():
time.sleep(2)
return sum(i * i for i in range(100))
@app.task
def xsquare(result):
# i power i from 1 to 50
return result
# run xsum and xmult in parallel and chain to xsquare
g1 = group(xsum.s(), xmult.s())
g3 = chain(g1, xsquare.s())
g3()
In the above example, we are first running xsum and xmul in parallel and then running xsquare. Do note that by default you will need to pass arguments in a chain, so if you have no need for the same (like in my case with xsquare, simply ignore the same
Having understood the base case, lets look at how we can setup celery in a django application