Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hey everyone, today we are going to see how to build a production-ready distributed
task queue management system with Celery.
When I say production-ready, I mean one which is highly efficient,
scalable, transparent, and resilient.
So in this talk we are going to cover: what task queues are and why we need them, what Celery is and why we use it, building a production-ready distributed task queue system, tuning the system for better efficiency, adding resiliency to the system, what to do in times of SOS or emergencies, monitoring the system we build or keeping an eye on it, and, most importantly, bad jokes. So I've tried to make this talk as descriptive
as possible, but still there are some prerequisites, like some basic knowledge of Python, some basic knowledge of web development, and having worked with or heard about Celery before. And the most important one is a sense of humor and a love for chips. So why task queues? So let's assume I
own a mall and I want to keep track of how many people are entering my mall. So I installed a small IoT sensor at my entrance
and whenever someone enters my mall it shoots an API request to my web server.
Then the request goes to the database and increments a counter. And at the
end of the day I can just check my database and see the count.
This system was working pretty well for me, so one day I thought I'll stream
a football match in my mall. So a lot of people came to the mall.
I was really excited to see the numbers in my database. But when I checked
my database, the number I observed was relatively low and
I knew something was not right. So I investigated and figured out what had happened: when a lot of people entered my mall, an API request was made to my web server for each person, and there were a lot of concurrent requests trying to talk to the database. Due to the atomicity and locking at my database, many requests timed out, and that's why the count in the database was low. So there's got to be a better way.
And there is. Task queues come to the rescue. So let's see,
what are task queues? If someone asked me this question when I was giving
my university examinations, I would have answered, task queue is
a queue of tasks and that is exactly what it is. I don't know why
teachers don't like those answers, but yeah, it fits
perfectly here. So now, in the new architecture, when my web server gets a request, instead of going and trying to increment the counter in the database, it puts the task into the task queue and returns a 200 response. And now the DB can consume the requests from
the task queue at its own pace. So now we moved from a more real
time approach to a more eventually consistent type of approach. And that is
okay for us because I only needed to see the count at the end of
the day. So, what is Celery, and why use it? You must have heard
a lot about task queues. There are a bunch of them available like Amazon
SQS, Amazon MQ, Redis, and RabbitMQ.
But building a consumption and publishing mechanism for those
task queues is not that straightforward. To help us with that, Celery gives us a plug-and-play task queue management framework with
which we can manage our distributed task queues with ease. In this talk we
are going to use some keywords, so let's just iterate over them once. We know what task queues are from our previous example, but we'll just say it again: task queues are queues of tasks. Then there is the task. A task is the basic unit of work of a task queue, and a task queue can contain any number of tasks.
Then here comes the worker. A worker is the basic unit of computation, which lies outside your application and is where a task is processed. Then in line is the broker. The broker, in layman's language, helps us with picking up an offloaded task, putting it into the task queue, and then delivering the task from the task queue to the worker whenever the worker wants to process it. And the
last one is the result backend. It is a highly available database which is used by Celery to keep track of all the tasks and their results, along with storing all kinds of metadata for Celery. Some examples of result backends could be Redis, Memcached, etc. So okay, before we start building the system,
one question arises: which broker to choose? There are a bunch of brokers available, like RabbitMQ, Redis, etc. They are all great pieces of software, but each works best for its own specific use cases. So for example, I'll cover the most common ones, RabbitMQ and Redis.
If you're looking for a highly efficient broker which supports several workers
consuming from different queues and also offers some kind
of persistence of tasks when it's shut down, then no doubt RabbitMQ
is the way to go, but RabbitMQ is a little time consuming to set up
and maintain. On the other hand, if you just want to use your broker as
a quick messaging system, Redis is the way to go as it works really well
for quick tasks and is very easy to set up too. Let's start building
the system. Let's think of building an e-commerce warehouse. There are
going to be three things which happen there. Picking of the products, packing of the
products, and delivery of the order. So the most basic
kind of architecture for my warehouse would be something like this. I have
one boy who picks up the products, packs the products and
delivers them. And this worked for me for some time.
But now the orders are increasing and I want to scale my
setup. So I also employed a girl in the warehouse.
Now they are both picking, packing and delivering the products in parallel. Now this is fine: as more orders start coming in,
I'll just add more people in the warehouse. But I think I can improve it
a bit further as I know that these two people are really good at picking,
but they are lousy at packing and they don't even have celery bikes to deliver.
So what if we break this work into smaller fragments and
get specialized people to do what they do best? So now those two
people are just doing the picking. I added an experienced packer who has their own packaging station and everything. And I added
people with delivery bikes to deliver more efficiently. So this way,
we had one big task. We broke it down into smaller tasks
and executed them in order. And now in our further slides,
we will call this our pipeline. So, should we use pipelines?
There are a bunch of advantages we get while using
pipelines. Let's go over them one by one. First,
it gives us the ability to see bottlenecks and scale smaller
components of the system instead of the whole system. For example, now if I see that there are a lot of orders pending to be packed, I can just add more people to the packing worker and
scale the packing operations instead of scaling the whole pipeline like
we did earlier. Second, this will give us the ability to give different kinds of machines to different tasks. As in our example, we can see that the packing worker needs a packaging station, but a delivery worker needs a celery bike. The same thing happens with a tech system.
Different tasks need different kinds of infrastructure. Some might need more CPU, others might need more memory. Third, it helps
us keep track of the status of the task and will add some kind of
resiliency to the system by enabling retries at every step.
So now if a task fails, it will not be retried from the beginning, but will get retried from the last checkpoint, or last succeeded task, in the pipeline.
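To make that first advantage concrete on the Celery side, here is a hedged sketch: if each pipeline stage consumes from its own queue (say picking, packing and delivery), scaling only the packing stage is just a matter of starting more workers on that queue. The module name tasks and the concurrency value are placeholders, not from the talk.

```
# Add capacity only where the bottleneck is: an extra worker for the packing queue
celery -A tasks worker -Q packing --concurrency=8
```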
So now let's assume we have a sale going on and we have a lot of orders pouring in. Our warehouse is already full and we can't even add more people to the warehouse. So we've got two ways.
First thing we can do is to buy a bigger warehouse, move all the operations
to the bigger warehouse and add more people in it. In tech terms we
call it vertical scaling. On the other hand, we can purchase another
makeshift warehouse of the same size, add more people there, and run these two warehouses
in parallel whilst the operations inside them are
concurrent. In tech terms we call it horizontal scaling. In my
case, horizontal scaling makes more sense, as the number of orders is variable and after the sale ends, one warehouse alone would be able to cater to all of them. So the code of our application would look something like this.
We have an order receiver API which receives an order and offloads it to
the picking worker, which is the entry point in our pipeline.
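The actual code lives on the speaker's slides; as a stand-in, here is a minimal sketch of the idea, assuming Redis as the broker and result backend and hypothetical task names. It is an illustration, not the speaker's exact code.

```python
from celery import Celery, chain

# Hypothetical setup: Redis as both broker and result backend.
app = Celery("warehouse",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task
def pick_order(order_id):
    # Fetch the products for this order from the aisle (the database).
    return {"order_id": order_id, "items": ["item-1", "item-2"]}

@app.task
def pack_order(picked):
    # Pack the picked items into a parcel.
    return {"order_id": picked["order_id"], "parcel": "box-42"}

@app.task
def deliver_order(packed):
    # Hand the parcel over for delivery.
    return f"order {packed['order_id']} delivered"

def order_received(order_id):
    # The order receiver API offloads the whole pipeline and returns immediately.
    chain(pick_order.s(order_id), pack_order.s(), deliver_order.s()).apply_async()
```

Each task's return value flows into the next task in the chain, which is what makes this a pipeline.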
And the code for our pipeline is something like this. It starts with the picking
worker which picks up stuff from the aisle and passes it to the packing worker.
The packing worker packs the stuff and passes it to the delivery worker, and at the end, the delivery worker delivers the stuff on time and makes the customer happy. So we have built our Celery system, but we don't know how well it performs. So, first things first.
It is always better to benchmark before moving to any
further optimization, because in my experience I have seen that if we go by intuition, either we end up over-optimizing the system or optimizing the wrong parts of the architecture. For example, in our pipeline, when I ran a load test, I saw that the number of tasks queued at the picking worker was much higher than at any other worker. So I knew where I had to start optimizing. Can we use batching? So let's see what happens in the picking task here. In the picking task,
a person is assigned an order, he or she goes
to the aisle, picks up that order and passes it to the packing worker.
Now assume you have a lot of orders coming in, and to cater to them you added a lot of people to the picking worker, and everyone is trying to get something from the aisle. Lots of people are crowding the aisle, so there will be some kind of wait time for everyone to pick their order. The exact same thing happens in our concurrent systems: the aisle acts as our database and the people act as our concurrent threads. To solve
this problem, we can introduce batching. So instead of one person picking
up one order, we can make one person pick up ten orders. This way
we are decreasing our trips to the aisle or our database by ten times.
But as we know, every good thing also comes with
a trade-off. So now your retries and failures also happen at the batch level. So if the ninth order fails for some reason in a batch of ten, the whole batch of ten will still be retried. If you are okay with this trade-off, you can definitely decrease the load on your database. There's not
much change in the code for our application, but instead of offloading it
to the picking worker like before, we will now offload it to the
order aggregator worker, and the pipeline is also pretty much the same. Just one more task, named order aggregator, is added, which contains the order-chunking logic, and instead of passing just one order to the picking worker, it passes a chunk of orders to the picking worker.
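A hedged sketch of what that aggregator step might look like, reusing the hypothetical app from earlier and an assumed batch size of ten; the actual chunking logic from the slides is not in the transcript.

```python
BATCH_SIZE = 10  # assumed batch size, as in the ten-orders-per-trip example

@app.task
def order_aggregator(order_ids):
    # Split incoming orders into chunks and offload each chunk to the picking worker.
    for i in range(0, len(order_ids), BATCH_SIZE):
        pick_orders.delay(order_ids[i:i + BATCH_SIZE])

@app.task
def pick_orders(order_ids):
    # One trip to the aisle (the database) now covers a whole batch of orders.
    return [{"order_id": oid, "items": ["item-1"]} for oid in order_ids]
```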
The next optimization would be: always split your tasks into I/O-bound and CPU-bound tasks. I/O-bound tasks are tasks in which the thread blocks the CPU and waits until an input or output is received. This makes the CPU unusable for the time it is just waiting. These kinds of tasks can be optimized with the help of gevent or eventlet pools, which help us enable a non-blocking I/O approach
in which the thread goes to the CPU, registers its request, does not block the CPU, and whenever its input or output is ready, the CPU raises a callback and the thread goes and collects it. This way our CPU is never blocked by concurrent I/O processes. On the other hand, a CPU-bound task is a task which uses the CPU for crunching numbers or doing other CPU-intensive work. For these kinds of tasks we should use a prefork pool, as it is based on Python's multiprocessing module and helps run parallel processes on multiple
cores. It is very easy to set up too. You just need to pass the pool name and the desired concurrency in the following command, and you will spin up a new worker with the provided configuration in no time.
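The slide with the command isn't captured in the transcript, but it is along these lines, assuming the Celery app lives in a module called tasks and using example concurrency values:

```
# CPU-bound stages: prefork pool, roughly one process per core
celery -A tasks worker --pool=prefork --concurrency=4

# I/O-bound stages: gevent pool with many lightweight greenlets
celery -A tasks worker --pool=gevent --concurrency=100
```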
Next, use the -O fair optimization when possible. This is quite interesting. So the default approach Celery uses is a round-robin approach to distribute tasks among the distributed workers. If you have a set of tasks that take varying
amounts of time to complete, either deliberately or due to unpredictable network conditions, this will cause unexpected delays in the total execution time for tasks in the queue, as you might end up having tasks queued at some workers whilst other workers are idle. To solve this problem, you can use the -O fair optimization, which distributes tasks according to the availability of workers instead of just the number of workers available. This option comes with a coordination cost
penalty, but it results in much more predictable behavior if your tasks have varying execution times, as most I/O-bound tasks will.
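On the worker command line this is just one extra flag (same hypothetical tasks module as before):

```
celery -A tasks worker -O fair
```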
Next, keep track of results only if you need them. As I told you, the result backend stores all the metadata, statuses, and results of Celery tasks. If you know you are not going to use them anywhere in your application, you can skip storing them, decrease the number of network calls to your highly available database, and get some amount of optimization.
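A minimal sketch of how that looks in Celery, either per task or globally; the increment_counter task here is just a placeholder, and whether to switch results off depends entirely on whether you read them anywhere:

```python
# Per task: don't store this task's return value in the result backend.
@app.task(ignore_result=True)
def increment_counter(delta):
    ...

# Or globally, for every task in the app.
app.conf.task_ignore_result = True
```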
So now we'll see how to add some kind of resiliency, or self-healing capabilities, to the system. So I think we all agree with what Sentry.io's tagline says: Software errors are inevitable.
Chaos is not. The most basic version of resiliency is
to enable autoretries in times of failure. You can also add
circuit-breaking elements. For example, I have added five as the maximum number of retries, and if a task is retried five times and still fails, it will be ignored. To make it more resilient, we can
add exponential backoff. For example, say the task is dependent on another service and the service is down. And let's assume the time between consecutive retries is 10 seconds. So if my service is down, my first retry will happen at 10 seconds, the second one at 20 seconds, the third one at 30 seconds, the fourth one at 40 seconds, and the last one at 50 seconds. So in this case, I gave 50 seconds of breathing time to the other service to come back up so that I don't lose my task. To increase the amount of breathing room, we can use exponential backoff, which means the first retry will happen at 10 seconds, the second one at 20 seconds, the third one at 40 seconds, the fourth one at 80 seconds, and the last one at 160 seconds. So now the breathing time is increased from 50 seconds to 160 seconds. And if you want more breathing time, you can just tune the exponential backoff.
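As a hedged sketch of those retry options on a Celery task (the exception type, the numbers, and the call_external_service task are all just examples, not from the talk):

```python
@app.task(bind=True,
          autoretry_for=(ConnectionError,),  # retry automatically on this failure
          max_retries=5,                     # circuit-break after five attempts
          retry_backoff=10,                  # exponential backoff starting at 10 seconds
          retry_backoff_max=600)             # cap the delay between retries
def call_external_service(self, order_id):
    # Hypothetical call to the flaky downstream service.
    return {"order_id": order_id, "status": "ok"}
```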
The next option is acks_late=True. This means late acknowledgment. So by default, a broker marks a task as acknowledged when it delivers it to the worker. But if a worker goes down and restarts, it loses that task. So to make the system resilient towards worker failures or infrastructure failures, we can use acks_late=True, which means that until and unless the task is processed by the worker, it will not be marked as acknowledged. So even if the worker goes down, the broker delivers the same task to it again, as it was still stored in the broker and was unacknowledged. The last argument is retry_jitter=True. This param is used to add some kind of randomness to the system. So let's assume we have a
concurrent system, and there are chances that two tasks are trying to access the same database resource. When they execute, they'll form a deadlock and they'll fail. Since we have automatic retries enabled, they'll get retried at the same time, form a deadlock, and fail again, and this will repeat till the circuit breaks. In situations like this, we want some kind of randomness in the retries so that they do not get retried again and again at the same time. That is why retry jitter is helpful. And if you want to keep track of your circuit-break failures, you can use a DLQ, or dead letter queue, to store your failed tasks.
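Pulling those resiliency knobs together on one task, a hedged sketch (the option names are real Celery ones; the task itself and the values are illustrative, and routing failed tasks to a DLQ is broker-specific and not shown here):

```python
@app.task(bind=True,
          acks_late=True,              # acknowledge only after the task has actually run
          autoretry_for=(Exception,),
          max_retries=5,
          retry_backoff=True,          # exponential backoff between retries
          retry_jitter=True)           # randomness so clashing tasks don't retry in lockstep
def update_inventory(self, order_id):
    # Hypothetical database write that could deadlock under concurrency.
    ...
```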
Okay, so when your system is down, the first thing you should do is check your CPU and memory utilization. If your CPU utilization is high,
then horizontally or vertically scale according to your infrastructure.
But if your memory utilization is high and you know for a fact that your code is not using that kind of memory, there are chances that there is some kind of memory leak in your code. You might be wondering, a memory leak in Python? That is impossible. And I'm with you. If you're working with core Python, that is impossible. But when many of the libraries you are using are built with C
or anything else, there are chances that there is some kind of memory leak happening
under the hood which is not in your hands. So to solve that problem, Celery provides two thresholds, max-memory-per-child and max-tasks-per-child. With the help of these options, you can set thresholds either on
the number of tasks executed by a process or the amount of memory being used by the process. And when either of these thresholds is reached, Celery rotates the process and clears out the leaked memory.
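On the worker command line these are the --max-tasks-per-child and --max-memory-per-child flags; the numbers below are made up (the memory value is in kilobytes):

```
# Recycle a worker process after 100 tasks, or once it uses roughly 200 MB
celery -A tasks worker --max-tasks-per-child=100 --max-memory-per-child=200000
```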
When you're running something in production, you should always have the capability to keep an eye on it. And Flower works really well here: with just one command, you can set up a full-fledged monitoring tool for your Celery setup.
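Assuming Flower is installed (pip install flower) and the same hypothetical tasks module, that one command is roughly:

```
celery -A tasks flower --port=5555
```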
It gives you capabilities like purging queues, viewing acknowledgement rates, viewing and modifying worker instances, and viewing scheduled tasks.
It also has an HTTP API for all the data points so that
you can integrate it with your own monitoring dashboards as well, and you can also use those endpoints to configure alerts so that you know beforehand if your system is going down. If you're using RabbitMQ as your broker,
and you are more comfortable with RabbitMQ than with Flower, you can use the RabbitMQ admin panel too, to monitor your system at the broker
level itself. It also gives you features such as purging,
deleting and monitoring queues. So to conclude: in this talk we understood why pipelines are better, how to tune your Celery config to get maximum performance, how to make your setup resilient or self-healing, what to do when unknown things are hogging your memory resources, and how to keep an eye on your system. So if we follow all these steps while building our system, we will face a lot fewer issues. Our system will be production-ready and we will sleep
soundly. So that is it from me. If you have any questions
or feedback, I'd be happy to work on it. Also, if you didn't like the
presentation, I'm also open to taking virtual tomatoes. Yeah,
that is it from me. Have a good day.