Conf42 Python 2021 - Online

To build a production-ready distributed task queue system with celery

Abstract

We hear a lot about queuing technologies like Redis, RabbitMQ etc., but building and maintaining a consuming and publishing mechanism is not that easy.

In this talk, we will see how we can take advantage of Celery to build a highly efficient, resilient, transparent, and scalable setup to run at scale.

Summary

  • In this talk we are going to see how to build a production-ready distributed task queue management system with Celery. We will cover what task queues are and why we need them, what Celery is and why to use it, and how to build a distributed task queuing system. And most importantly, bad jokes.
  • There are a bunch of brokers available, like RabbitMQ and Redis. They are all great pieces of software, but each works best for its own specific use cases. Use pipelines to scale smaller components of the system instead of the whole system.
  • Running a second warehouse of the same size in parallel is what we call horizontal scaling in tech terms. To reduce the load of concurrent workers on the database, we can introduce batching. It is always better to benchmark before moving to any further optimization. Another optimization is to always split your tasks into I/O-bound and CPU-bound tasks.
  • The most basic version of resiliency is to enable auto-retries in times of failure. You can also add circuit-breaking elements. To make it more resilient, we can add exponential backoff.
  • Celery provides two thresholds, max memory per child and max tasks per child. With the help of these options, you can set thresholds either on the number of tasks executed by a process or on the amount of memory being used by the process. When you're running something in production, you should always have the capability to keep an eye on it.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hey everyone, today we are going to see how to build a production-ready distributed task queue management system with Celery. When I say production-ready, I mean a system which is highly efficient, scalable, transparent, and resilient. In this talk we are going to cover what task queues are and why we need them, what Celery is and why to use it, building a distributed task queuing system, tuning the system for better efficiency, adding resiliency to the system, what to do in times of SOS or emergencies, monitoring the system we build or keeping an eye on it, and most importantly, bad jokes.
I've tried to make this talk as descriptive as possible, but there are still some prerequisites: some basic knowledge of Python, some basic knowledge of web development, and having worked with or heard about Celery before. And the most important one is a sense of humor and love for chips.
So why task queues? Let's assume I own a mall and I want to keep track of how many people are entering my mall. So I installed a small IoT sensor at my entrance, and whenever someone enters my mall it shoots an API request to my web server. Then the request goes to the database and increments a counter. At the end of the day I can just check my database and see the count. This system was working pretty well for me, so one day I thought I'd stream a football match in my mall. A lot of people came to the mall, and I was really excited to see the numbers in my database. But when I checked my database, the number I observed was relatively low, and I knew something was not right. So I investigated and figured out that when a lot of people entered my mall, an API request was made to my web server for each person, and there were a lot of concurrent requests trying to talk to the database. Due to the atomicity and locking at my database, many requests timed out, and that's why the count in the database was low. So there's got to be a better way.
And there is. Task queues come to the rescue. So let's see, what are task queues? If someone had asked me this question when I was giving my university examinations, I would have answered that a task queue is a queue of tasks, and that is exactly what it is. I don't know why teachers don't like those answers, but yeah, it fits perfectly here. So now, in the new architecture, when my web server gets a request, instead of going and trying to increment the counter in the database, it puts the request into the task queue and returns a 200 response. And now the DB can consume the requests from the task queue at its own pace. So we moved from a more real-time approach to a more eventually consistent approach. And that is okay for us, because I only needed to see the count at the end of the day.
So what is Celery, and why Celery? You must have heard a lot about task queues. There are a bunch of them available, like Amazon SQS, Amazon MQ, Redis, and RabbitMQ. But building a consumption and publishing mechanism for those task queues is not that straightforward. To help us with that, Celery gives us a plug-and-play task queue management framework with which we can manage our distributed task queues with ease.
In this talk we are going to use some keywords, so let's just iterate over them once. We know what task queues are from our previous example, but we'll just say it again: a task queue is a queue of tasks. Then there is the task. A task is the basic unit of work of a task queue, and a task queue can contain n number of tasks. Then comes the worker. A worker is the basic unit of computation, which lies outside your application and is where a task is processed. Next in line is the broker, which in layman's language helps us with picking up an offloaded task, putting it into the task queue, and then delivering the task from the task queue to the worker whenever the worker wants to process it. And the last one is the result backend.
It is a highly available database which is used by Celery to keep track of all the tasks and their results, along with storing all kinds of metadata for Celery. Some examples of result backends are Redis, Memcached, et cetera.
So okay, before we start building the system, one question arises: which broker to choose? There are a bunch of brokers available, like RabbitMQ and Redis. They are all great pieces of software, but each works best for its own specific use cases. I'll cover the most common ones, RabbitMQ and Redis. If you're looking for a highly efficient broker which supports several workers consuming from different queues and also offers some kind of persistence of tasks when it's shut down, then no doubt RabbitMQ is the way to go, but RabbitMQ is a little time-consuming to set up and maintain. On the other hand, if you just want to use your broker as a quick messaging system, Redis is the way to go, as it works really well for quick tasks and is very easy to set up too.
Let's start building the system. Let's think of an e-commerce warehouse to build. Three things are going to happen there: picking of the products, packing of the products, and delivery of the order. The most basic kind of architecture for my warehouse would be something like this: I have one boy who picks up the products, packs the products, and delivers them. This worked for me for some time. But now the orders are increasing and I want to scale my setup, so I also employed a girl in the warehouse. Now they are both picking, packing, and delivering the products in parallel. This is fine, as when more orders start coming in, I'll just add more people to the warehouse. But I think I can improve it a bit further, as I know that these two people are really good at picking, but they are lousy at packing and they don't even have delivery bikes to deliver.
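The setup described so far, where every person handles an order end to end and scaling means adding more identical people, can be sketched with Python's standard library. This is only an illustration of the roles named in the talk, not Celery itself: `queue.Queue` stands in for the broker's task queue, the threads stand in for workers, and a plain dictionary stands in for the result backend. All function and variable names here are hypothetical.

```python
import queue
import threading

# Hypothetical stand-ins for the roles from the talk (this is not Celery).
task_queue = queue.Queue()       # plays the role of the broker's task queue
results = {}                     # plays the role of the result backend
results_lock = threading.Lock()  # protects the shared results dictionary


def handle_order(order_id):
    """One big task: pick, pack and deliver a single order."""
    steps = ["picked", "packed", "delivered"]
    return f"order {order_id}: " + ", ".join(steps)


def worker():
    """A worker pulls tasks off the queue until it receives a stop signal."""
    while True:
        order_id = task_queue.get()
        if order_id is None:     # sentinel: no more work for this worker
            task_queue.task_done()
            break
        outcome = handle_order(order_id)
        with results_lock:
            results[order_id] = outcome
        task_queue.task_done()


def run_warehouse(order_ids, num_workers=2):
    """Offload orders to the queue and let num_workers process them in parallel."""
    results.clear()
    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for order_id in order_ids:
        task_queue.put(order_id)
    for _ in threads:
        task_queue.put(None)     # one stop signal per worker
    for thread in threads:
        thread.join()
    return dict(results)


if __name__ == "__main__":
    for line in sorted(run_warehouse(range(1, 6)).values()):
        print(line)
```

Raising `num_workers` here corresponds to hiring more people who each do everything; the talk goes on to split the single big task into specialized steps instead.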
So what if we break this work into smaller fragments and get specialized people to do what they do best? So now those two people are just doing the picking. I added an experienced packer who has their own packaging station and everything, and I added people with delivery bikes to deliver more efficiently. So this way, we had one big task, we broke it down into smaller tasks, and we executed them in order. In our further slides, we will call this our pipeline.
So, why should we use pipelines? There are a bunch of advantages we get while using pipelines. Let's go through them one by one. First, it gives us the ability to see bottlenecks and scale smaller components of the system instead of the whole system. For example, if I see that there are a lot of orders pending to be packed, I can just add more people to the packing worker and scale the packing operations, instead of scaling the whole pipeline like we did earlier. Second, it gives us the ability to give different kinds of machines to different tasks. In our example, we can see that the packing worker needs a packaging station, but a delivery worker needs a delivery bike. The same thing happens with tech systems: different tasks need different kinds of infrastructure. Some might need more CPU, others might need more memory. Third, it helps us keep track of the status of the task and adds some resiliency to the system by enabling retries at every step. So now if a task fails, it will not be retried from the beginning, but from the last checkpoint, that is, the last succeeded task in the pipeline.
So now let's assume we have a sale going on and we have a lot of orders pouring in. Our warehouse is already full and we can't even add more people to the warehouse. So we've got two ways. The first thing we can do is buy a bigger warehouse, move all the operations to the bigger warehouse, and add more people in it. In tech terms we call it vertical scaling.
On the other hand, we can purchase another makeshift warehouse of the same size, add more people there, and run these two warehouses in parallel whilst the operations inside them are concurrent. In tech terms we call it horizontal scaling. In my case, horizontal scaling makes more sense, as the number of orders is variable and after the sale ends, one warehouse would be able to cater to all of them alone.
So the code of our application would look something like this. We have an order receiver API which receives an order and offloads it to the picking worker, which is the entry point of our pipeline. And the code for our pipeline is something like this. It starts with the picking worker, which picks up stuff from the aisle and passes it to the packing worker. The packing worker packs the stuff and passes it to the delivery worker, and at the end, the delivery worker delivers the stuff in time and makes the customer happy.
So we have built our Celery system, but we don't know how well it performs. So first things first: it is always better to benchmark before moving to any further optimization, because in my experience, if we go by intuition, we either end up over-optimizing the system or optimizing the wrong parts of the architecture. For example, in our pipeline, when I ran a load test, I saw that the number of tasks queued at the picking worker was much higher than at any other worker. So I knew where I had to start optimizing.
Can we use batching? Let's look at what happens in the picking task here. In the picking task, a person is assigned an order, he or she goes to the aisle, picks up that order, and passes it to the packing worker. Now assume you have a lot of orders coming in, and to cater to them you added a lot of people to the picking worker, and everyone is trying to get something from the aisle.
As lots of people are crowding the aisle, there will be some kind of wait time for everyone to pick their order. The exact same thing happens in our concurrent systems: the aisle acts as our database and the people act as our concurrent threads. To solve this problem, we can introduce batching. So instead of one person picking up one order, we can make one person pick up ten orders. This way we are decreasing our trips to the aisle, or our database, by ten times. But as we know, every good thing also comes with a trade-off. Now your retries and failures also happen at batch level. So if the 9th order in a batch of ten failed for some reason, the whole batch of ten will still be retried. If you are okay with this trade-off, you can definitely decrease the load on your database. There's not much change in the code for our application, but instead of offloading the order to the picking worker like before, we now offload it to the order aggregator worker. The pipeline is also pretty much the same. Just one more task named order aggregator is added, which contains the order chunking logic, and instead of passing just one order to the picking worker, it passes a chunk of orders to the picking worker.
The next optimization would be to always split your tasks into I/O-bound and CPU-bound tasks. I/O-bound tasks are tasks in which the thread blocks the CPU and waits until an input or output is received. This makes the CPU unusable for the time it is just waiting. These kinds of tasks can be optimized with the help of gevent or eventlet pools, which enable a non-blocking I/O approach in which the thread goes to the CPU, registers its request, does not block the CPU, and whenever its input or output is ready, the CPU raises a callback and the thread goes and collects it. This way our CPU is never blocked by concurrent I/O processes. On the other hand, a CPU-bound task is a task which uses the CPU for crunching numbers or doing other CPU-intensive work.
For these kinds of tasks we should use a prefork pool, as it is based on Python's multiprocessing module and helps run parallel processes on multiple cores. It is very easy to set up too. You just need to pass the pool name and the desired concurrency in the following command, and you will spin up a new worker with the provided configuration in no time.
So, use the -Ofair optimization when possible. This is quite interesting. The default approach Celery uses to distribute tasks among workers is round robin. If you have a set of tasks that take varying amounts of time to complete, either deliberately or due to unpredictable network conditions, this will cause unexpected delays in total execution time for tasks in the queue, as you might end up having tasks queued at some workers whilst other workers are idle. To solve this problem, you can use the -Ofair optimization, which distributes tasks according to the availability of workers instead of the number of workers available. This option comes with a coordination cost penalty, but results in much more predictable behavior if your tasks have varying execution times, as most I/O-bound tasks will.
Next, keep track of results only if you need them. As I told you, the result backend stores all the metadata, statuses, and results of Celery tasks. If you know you are not going to use them anywhere in your application, you can skip storing them and decrease the number of network calls to your highly available database, which can give you some amount of optimization.
So now we'll see how to add some kind of resiliency, or self-healing capabilities, to the system. I think we all agree with Sentry.io's tagline: software errors are inevitable, chaos is not. The most basic version of resiliency is to enable auto-retries in times of failure. You can also add circuit-breaking elements.
For example, I have added five as the maximum number of retries, and if a task is retried five times and still fails, it will be ignored. To make it more resilient, we can add exponential backoff. For example, say the task is dependent on another service and that service is down. And let's assume the time between consecutive retries is 10 seconds. So if my service is down, my first retry will happen at 10 seconds, the second one at 20 seconds, the third one at 30 seconds, the fourth one at 40 seconds, and the last one at 50 seconds. So in this case, I gave 50 seconds of breathing time to the other service to come back up so that I don't lose my task. To increase the amount of breathing room, we can use exponential backoff, which means the first retry will happen at 10 seconds, the second one at 20 seconds, the third one at 40 seconds, the fourth one at 80 seconds, and the last one at 160 seconds. So now the breathing time is increased from 50 seconds to 160 seconds. And if you want more breathing time, you can just change the exponential backoff.
Next is acks_late equal to true. This means late acknowledgment. By default, the broker marks a task as acknowledged when it delivers it to the worker. But if the worker goes down and restarts, it loses that task. So to make the system resilient to worker failures or infrastructure failures, we can use acks_late equal to true, which means that until and unless the task is processed by the worker, it will not be marked acknowledged. So even if the worker goes down, the broker delivers the same task to it again, as it was still stored in the broker and was unacknowledged.
The last argument is retry_jitter equal to true. This param is used to add some kind of randomness to the system. Let's assume we have a concurrent system, and there are chances that two tasks are trying to access the same database resource. When they execute, they'll form a deadlock and they'll fail.
As we have our automatic retries enabled, they'll get retried at the same time, form a deadlock and fail again, and this will repeat till the circuit breaks. In situations like this, we want some kind of randomness in the retries so that they do not get retried again and again at the same time. That is why retry jitter is helpful. And if you want to keep track of your circuit-break failures, you can use a DLQ, or dead letter queue, to store your failed tasks.
Okay, so when your system is down, the first thing you should do is check your CPU and memory utilization. If your CPU utilization is high, then scale horizontally or vertically according to your infrastructure. But if your memory utilization is high and you know for a fact that your code is not using that kind of memory, there are chances that there is some kind of memory leak in your code. You might be wondering: a memory leak in Python? That is impossible. And I'm with you. If you're working with core Python, that is impossible. But when many of the libraries you are using are built with Cython or anything else, there are chances that there is some kind of memory leak happening under the hood which is not in your hands. To solve that problem, Celery provides two thresholds, max memory per child and max tasks per child. With the help of these options, you can set thresholds either on the number of tasks executed by a process or on the amount of memory being used by the process. And when either of these thresholds is reached, Celery rotates the process and clears out the stale leaked memory.
When you're running something in production, you should always have the capability to keep an eye on it, and Flower works really well for that. By just running one command, you can set up a full-fledged monitoring tool for your Celery setup. It gives you capabilities like purging queues, viewing acknowledgement rates, viewing and modifying queue worker instances, and viewing scheduled tasks.
It also has an HTTP API for all the data points, so you can integrate it with your own monitoring dashboards as well, and also use those endpoints to configure alerts so that you know beforehand if your system is going down. If you're using RabbitMQ as your broker and you are more comfortable with RabbitMQ than with Flower, you can use the RabbitMQ admin panel too, to monitor your system at the broker level itself. It also gives you features such as purging, deleting, and monitoring queues.
So to conclude, in this talk we understood why pipelines are better, how to tune your Celery config to get maximum performance, how to make your setup resilient or self-healing, what to do when unknown things are hogging memory resources, and how to keep an eye on your system. If we follow all these steps while building our system, we will face a lot fewer issues, our system will be production-ready, and we will sleep soundly. So that is it from me. If you have any questions or feedback, I'd be happy to work on it. Also, if you didn't like the presentation, I'm open to taking virtual tomatoes. Yeah, that is it from me. Have a good day.
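As a companion to the retry discussion in the talk, the two schedules it quotes (10, 20, 30, 40, 50 seconds versus 10, 20, 40, 80, 160 seconds) can be reproduced with a few lines of Python. This is a minimal sketch under stated assumptions: the function name `retry_delays` and its parameters are hypothetical, and the code is not Celery's implementation. Celery exposes comparable behavior through task options such as `max_retries`, `retry_backoff`, and `retry_jitter`.

```python
import random


def retry_delays(base=10, max_retries=5, exponential=True, jitter=False, rng=None):
    """Return the delay in seconds scheduled for each retry attempt.

    With exponential=False the delay grows linearly (10, 20, 30, ...).
    With exponential=True it doubles each time (10, 20, 40, ...).
    """
    if rng is None:
        rng = random.Random()
    delays = []
    for attempt in range(max_retries):
        if exponential:
            delay = base * (2 ** attempt)
        else:
            delay = base * (attempt + 1)
        if jitter:
            # Pick a random delay between 0 and the computed value so that
            # tasks which failed together do not all retry at the same moment.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


if __name__ == "__main__":
    print("linear:     ", retry_delays(exponential=False))
    print("exponential:", retry_delays())
    print("with jitter:", retry_delays(jitter=True))
```

If these values are read as waits between consecutive attempts rather than absolute times, the total breathing room is the sum of the list, which makes the gap between the linear and exponential schedules larger still. The jittered variant is the part that addresses the repeated-deadlock scenario: two tasks that fail at the same instant are unlikely to draw the same random delay.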
Vishrut Kohli

Software Engineer @ Grofers
