Conf42 Cloud Native 2022 - Online

Migrating to Multi Cluster Managed Kafka with 0 Downtime

Video size:

Abstract

As Wix Kafka usage grew to 1.5B messages per day, >10K topics and >100K leader partitions serving 2000 microservices, we decided to migrate from self-running cluster per data-center to a managed cloud service (Confluent Cloud) with multi-cluster setup.

This talk is about how we successfully migrated with 0 downtime and full traffic and the lessons we learned along the way.

These lessons include: 1. Automation, Automation, Automation - all the process has to be completely automated at such scale 2. Prefer a gradual approach - E.g. migrate topics in small chunks and not all at once. Reduces risks if things go bad 3. First migrate test topics with relayed real traffic - So data will be real but will not effect production. 4. Cleanup first - avoid migrating unused topics or topics with too many unnecessary partitions 5. Adapt to Confluent Cloud APIs - e.g. lag monitoring

Summary

  • For most of last year, my team at Wix worked on migrating Wix's 2000 microservices from self hosted Kafka clusters. Migrating multi cluster managed Kafka platform was the most challenging project I have ever led. Today we'll see how we decided to split our cluster into multiple clusters and how developers choose which cluster to work with.
  • Migration to a multicluster and managed environment. Migrating with traffic incoming to producers and consumers must be seamless. If in case some rollback is needed these should be readily available and really easy to do.
  • Our replicator service started replicating more and more topics. Something completely unexpected happened. Kafka records started getting deleted from the Kafka brokers much faster than expected. Now we have the migration infrastructure in place, we knew that this can actually help us in a lot of non migrating related features.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Welcome everyone. My name is Natan Silnitsky and I'm a backend infrastructure team lead@wix.com. For most of last year, my team at Wix worked on migrating Wix's 2000 microservices from self hosted Kafka clusters. Migrating multi cluster managed Kafka platform. This was the most challenging project I have ever led. So today I want to share with you key to sign decisions. We took some war stories and best practices for this kind of migration. But first a few words about Wix. So Wix is a website building platform that allows you to manage your online presence but also manage your business online. Now, over the last few years, our usage of Kafka has grown tremendously, from around 5000 topics and a little bit less than half a billion messages produced per day per cluster in 2019, to over 20,000 topics and more than 2.5 billion messages produced every day in each cluster last year. So this was a very big traffic increase and also a lot of more metadata which really gave our clusters some overloading. And it means that restarts of brokers took longer than we expected and it really started to feel with production impact, adverse production impact. So we decided that we have to first split our clusters to smaller chunks and also manage this more efficiently by moving this to the cloud. Moving the Kafka clusters to the cloud. So why did we do that? Why did it decide it? With manage Kafka Cloud Poem and we these confluent cloud you get better cluster performance and flexibility. You have experts at confluent that balance your brokers and achieve the most efficient capacity needed. So you can increase capacity and shrink it really easily. If you need transparent Kafka version upgrades, you get all the bug fixes for free without any risk. During deployments it's really easy to add new clusters, you just click a few buttons and confluent cloud also offers tiered storage so you can have much more retention of your messages for more than a week for as long as you want, basically, and you are not limited by disk space and you don't even manage it, it's just there they use s these and it works great with great performance. At Wix we use our own Kafka client which is called Greyhound and it's an SDK on top of the Kafka client SDK and it serves as a library for our jvm services with Scala and Java or Greyhound also serves as a service to work with Kafka for non jvm stack like node and go and Python. So what Greyhound does, it wraps these Kafka producer and Kafka consumer with its own API, which is really easy to use and offers a lot of cool features. For Wix's 2000 microservices, for example, we have the ecom flow and we have a checkout service that produces some event the checkout has completed and then payment service gets notification and does some payment processing. So today we're going to see how we decided to split our cluster into multiple clusters, how developers choose which cluster to work with and how they do it. Then we'll talk about the migration, the design of the migration and best practices, and we'll complete things. Talk with what you can expect, some war stories and stuff like that. So a single cluster became overloaded, like I said, and we decided that it's time to split it. So in each data center we has only one single cluster where you produce and consume two. Of course there has also replication between dcs, but here we are targeting a single data center. So for data centers that had more traffic than others, we decided to add more clusters and we split the clusters according to SLA. So different clusters had in charge of different flows and different types of wix users in order to reduce the blast radius in case of something happening, something happening in production. Now if a developer Wix wants to produce some domain event, then how does she choose the cluster? So it's really simple with Greyhound API that we have, we can alter it as much as we want. So we just added really simply to the consumer spec API and producer spec. Just add the logical cluster name and then you're done. And if a developer wants to know which cluster these topic is deployed in, then just go to our back office, select the topic or type it, and then they can see the cluster of this topic so they can start specifying it and consuming from it. Great, so now let's talk about the migration itself. So we started off in a bad place with overloaded clusters that had unbalanced brokers between them, an unclear Kafka strategy. Do we maintain it ourselves? Do we move to the cloud? Do we have enough expertise? Too many partitions with huge scale and real production impact. So we made a decision to move to a multicluster and managed environment. First we thought that we're going to completely drain the data center or region before we switch over producers and consumers, which meant that the switching itself would be dead simple. You don't have any traffic to worry about. But this plan was canceled as we learned that these are data centers where services had to rely only on being a single data center and also it can be quite risky to just switch. Maybe there's some edge cases that we are not familiar with, so switching an entire data center could be quite risky once traffic comes back. It's not gradual and the longer we wait with getting traffic back, the riskier this whole plan is for weeks. So instead we decided that we're going to migrate with traffic incoming to producers and consumers, which meant that it had to be seamless, meaning that should be simple even though that you migrate with incoming traffic so you don't want to lose any messages and it has to be production safe. If in case some rollback is needed these it should be readily available and really easy to do. So I understood that we're going to need to have as much automation in place for the migration process as possible in order to make it smooth, seamless and safe, and not rely on laborious human interaction and actions. We could here with this automatic migration utilize the fact that we have greyhound, which is our own layer on top of Kafka, so we can request greyhound to actually do these migration for us. Migrate from self hosted Kafka cluster, consuming from that and start consuming from confluent cloud. Same situation with our producers. In order to not lose any messages while switching over and consuming from confluent cloud, we decided we're going to have a replication service that first replicates these messages from the self hosted Kafka cluster to confluent cloud. So how does it work? First it consumes messages from the self hosted cluster in each partition. Then it produces the messages to the target partition in console and cloud in the correct cluster. And it also saves offsets mapping. So offsets mapping allows these consumer to start off from the exact point it needs to when it starts consuming from console and cloud. So for instance, if the last committed message in the self hosted cluster was an offset 57 of this partition, these replicator server says that it should start off from the first non committed message in console and cloud. So the mapping is five, from 57 to five. That means that the consumer will start from offset wix. In addition, we had our migration orchestrator service that sent but requests to replicator service to replicate all topics of the consumer groups that we are migrating and also sends out the requests for greyhound. So how does it work? In practice? When you want to migrate a consumer, you first replicate to confluent. So migration orchestrator sends but a request to the replicator service for all the topics and they are replicated and the service makes sure and the scripts that are running the migration make sure that there are no lags. These migration orchestrator requests greyhound in the relevant services through Kafka topic of course everything here is event driven to unsubscribe from self hosted Kafka cluster. Once the migration orchestrator finds that there are no longer any pods that are subscribed to the self hosted Kafka cluster, then it then can safely request Greyhound to subscribe to confluent, of course utilizing the offsets that were saved by the replicator service. So Greyhound these starts off the subscription to consolid from the correct offsets that it needs to so no message is lost and also no message is duplicated. Processing in case of any unexpected failure, the migration orchestrator will stop, the script will alert, and then the script runner, the person can activate automatic rollback to switch back safely to the self hosted Kafka cluster where there may be a little bit duplicate and it's a simpler story for process so I won't repeat it. You just switch the state of producer and then restart the service. So it then produces to the new cluster some best practices. Make sure that you create a migration script that checks state by itself so it have as automatic as possible. But if it sees some unexpected state, it stops the migration. It doesn't do any auto healing by itself in order because we deem that too risky. It alerts the person that's selfrunning the script and then the person has tools available for him or her to actually roll back the migration. So these rollback has to be readily available even if you're starting off and you want to check all these flows of forward and backward switching of clusters with test topics. So you want to make sure that you don't actually have production impact when you start off. If you want to have real production traffic, you can actually relayed the traffic to the test topic. So you consume from the real production topic and then you produce the traffic to the test topic and you migrate the test topic so you have safe migration at first without any production impact, but with real production data. And of course you have all kinds of monitoring dashboards, I'm sure already. But it's really important to create dedicated custom matrix dashboards for the migration itself that show only the information that is required for the person running the script to make sure that everything has turned out correctly in a single quick glance. Really, really important. Okay, so we talked a little bit about the migration. Now let's move on to the final part of the talk, which is a little bit about war stories and what to expect. So our replicator service started replicating more and more topics, and we couldn't just stop replicating these topics very fast because in order to stop replication, we first have to migrate all of the topics producers. And in order to migrate the topic producer, we first needed to migrate all of the consumers from all the services. So that's a situation that took us a little bit more time, which meant that more and more topics were getting subscribed to by the replicator consumer group and it started to become risky. And indeed at some point we noticed an alert that replication has completely stopped and there is an unexpected error from sync group. So we learned that one of the probable causes is that the message maximal size in the protocol between these consumer group and the Kafka broker may be too small for all these partitions that are being relayed by this consumer group. So we went over and called this dynamic command to actually increase the amount of message max bytes from 1 megabytes, and this actually helped and replication resumed and we were happy. Then on Christmas Eve 2021, the same error happened again, but we weren't worried because we knew that we can fix it. So we again increased the message max bytes property in Kafka config script from four megabytes to eight megabytes because we knew that can solve it. Then something completely unexpected happened. Kafka records started getting deleted from the Kafka brokers much faster than expected. And also for compacted topics where records are deleted not because they are old in time, but because they are stale in value, like a storage topic that is kind of storage. So this is really worrying because some of these kafka topics were the source of truth for this data. And thankfully we didn't do the change in the configuration that caused the situation in all the data centers. So we were able to restore records from another data center. So we were very lucky on this regard. And then we dug up and understood that there is a bug in Kafka. In certain versions. If you see here where if you change these configuration dynamically twice, it reverts the log configuration to be the default. So no matter what your topics configuration that you specify, it's cleanup policy compact or your retention time, it just reverts to the defaults for the entire cluster. Meaning that in order to fix this situation, we had to go and place a change of dummy value for each of the topic configurations in order to make sure that the bug is mitigated completely. And then we said, okay, no more changing any dynamic configuration. We just added more consumer group shards for these replicator service. So each of this consumer group worked on a different set of topics that they replicated. And indeed this allowed us to have the scale that we needed and no more issues with the replication and we finished the migration successfully. And now that we have the migration infrastructure in place, we knew that this can actually help us in a lot of non migrating related features. For instance, we can actually request Greyhound from outside the service to switch to a different cluster if the previous cluster was assigned by mistake or it's no longer relevant. We can also request Greyhound to skip messages in case there was some data corruption or some issue with old messages that shouldn't be processed anymore. We can just skip it and request Greyhound to do that, or we can request Greyhound to change the processing rate in each of the pods. So Greyhound allows parallel processing and you can limit it or extend it by what's your relevant use case if you don't want to hurt your database because it has partial performance. So you can really reduce the processing rate and you can do all of that externally to the service where you don't need to change any code, you don't need to ga any service, you just send out commands via some back office, which we're really excited about that. So to summarize, we switched and migrated from single cluster topology in each of our data centers, which we self hosted and managed to a multi cluster confluent cloud managed cloud platform which is completely optimized for us. And we used greyhound, our Kafka client own Kafka client layer and dedicated orchestration services and scripts to have automation safe and gradual migrating. Now if you want to dig deeper into how we perform the migration, you can actually check out this blog post I written to give you more information. This is the link and you can also check out ground which is an open source library that has a lot of cool features like I mentioned, parallel processing, but also these ability to have all kinds of retry policies, both of the consumer and the producer, in case you have potential processing errors and you want to successfully complete processing eventually, and all kinds of other cool features like user context propagation and all kinds of other features. So I highly recommend you to check it out and I want to thank you very much. And you can find all the slides in Slideshare and the links I've shared. And you can check out my website@natansale.com to find out other blog posts I've written and other conference talks I gave and you can follow me on Twitter and LinkedIn in order to get updates on everything. Our data streams team at Wix are up to around Kafka, around in architecture and also about software engineering best practices in general. So thank you again.
...

Natan Silnitsky

Senior Software Engineer @ Wix

Natan Silnitsky's LinkedIn account Natan Silnitsky's twitter account



Join the community!

Learn for free, join the best tech learning community for a price of a pumpkin latte.

Annual
Monthly
Newsletter
$ 0 /mo

Event notifications, weekly newsletter

Delayed access to all content

Immediate access to Keynotes & Panels

Community
$ 8.34 /mo

Immediate access to all content

Courses, quizes & certificates

Community chats

Join the community (7 day free trial)