Abstract
As Wix Kafka usage grew to 1.5B messages per day, >10K topics and >100K leader partitions serving 2000 microservices,
we decided to migrate from a self-run cluster per data center to a managed cloud service (Confluent Cloud) with a multi-cluster setup.
This talk is about how we successfully migrated with 0 downtime and full traffic and the lessons we learned along the way.
These lessons include:
1. Automation, Automation, Automation - the entire process has to be completely automated at such scale
2. Prefer a gradual approach - e.g. migrate topics in small chunks and not all at once. This reduces risk if things go bad
3. First migrate test topics with relayed real traffic - so the data will be real but will not affect production.
4. Cleanup first - avoid migrating unused topics or topics with too many unnecessary partitions
5. Adapt to Confluent Cloud APIs - e.g. lag monitoring
Transcript
Welcome everyone. My name is Natan Silnitsky and I'm a backend infrastructure team lead at Wix.com. For most of last year, my team at Wix worked on migrating Wix's 2000 microservices from self-hosted Kafka clusters to a multi-cluster managed Kafka platform. This was the most challenging project I have ever led, so today I want to share with you key design decisions we took, some war stories, and best practices for this kind of migration. But first,
a few words about Wix. So Wix is a website
building platform that allows you to manage your online presence
but also manage your business online.
Now, over the last few years, our usage of
Kafka has grown tremendously, from around 5000
topics and a little bit less than half a billion messages
produced per day per cluster in 2019,
to over 20,000 topics
and more than 2.5 billion messages produced every day
in each cluster last year.
So this was a very big traffic increase, and also a lot more metadata, which really overloaded our clusters. It meant that broker restarts took longer than we expected, and it really started to have an adverse production impact. So we decided that we had to first split our clusters into smaller chunks and also manage them more efficiently by moving them to the cloud; moving the Kafka clusters to the cloud.
So why did we do that? Why did we decide on a managed Kafka cloud platform, in our case Confluent Cloud? You get better cluster performance and flexibility. You have experts at Confluent that balance your brokers and achieve the most efficient capacity needed, so you can increase capacity and shrink it really easily if you need to. You get transparent Kafka version upgrades, so you get all the bug fixes for free without any risk during deployments. It's really easy to add new clusters, you just click a few buttons. Confluent Cloud also offers tiered storage, so you can have much more retention of your messages, for more than a week, for as long as you want basically, and you are not limited by disk space and you don't even manage it. It's just there, they use S3, and it works great with great performance.
At Wix we use our own Kafka client
which is called Greyhound and it's
an SDK on top of the Kafka client SDK and
it serves as a library for our JVM services written in Scala and Java. Greyhound also serves as a service to work with Kafka for non-JVM stacks like Node, Go and Python. So what does Greyhound do? It wraps the Kafka producer and the Kafka consumer with its own API, which is really easy to use and offers a lot of cool features for Wix's 2000 microservices. For example, we have the e-commerce flow, where a checkout service produces an event that the checkout has completed, and then the payment service gets a notification and does some payment processing.
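To make the checkout example concrete, here is a minimal sketch of what a wrapper like this looks like from the caller's side, written in Scala on top of the plain Kafka Java clients. This is not Greyhound's actual API; the object and method names, topic name and addresses are invented for illustration.

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

// Hypothetical wrapper in the spirit of Greyhound: hide the raw Kafka clients
// behind a simple produce call and a handler-based consumer.
object CheckoutEvents {
  private val bootstrap = "localhost:9092" // placeholder address

  def producer(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    new KafkaProducer[String, String](props)
  }

  // Checkout service: publish a "checkout completed" domain event.
  def publishCheckoutCompleted(p: KafkaProducer[String, String], orderId: String): Unit =
    p.send(new ProducerRecord("checkout-completed", orderId, s"""{"orderId":"$orderId"}""")).get()

  // Payment service: subscribe with a handler instead of writing a poll loop.
  def consumeCheckoutCompleted(handle: String => Unit): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("group.id", "payment-service")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("checkout-completed"))
    while (true)
      consumer.poll(Duration.ofMillis(500)).asScala.foreach(rec => handle(rec.value))
  }
}
```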
So today we're going to see how we decided to split our
cluster into multiple clusters, how developers choose
which cluster to work with and how they do it. Then we'll
talk about the migration, the design of the migration
and best practices, and we'll complete the talk with what you can expect, some war stories, and stuff like that.
So a single cluster became overloaded, like I said,
and we decided that it's time to split
it. So in each data center we had only one single cluster that you produce to and consume from. Of course there is also replication between DCs, but here we are targeting a single data center. For data centers that had more traffic than others, we decided to add more clusters, and we split the clusters according to SLA. So different clusters were in charge of different flows and different types of Wix users, in order to reduce the blast radius in case something happens in production.
Now, if a developer at Wix wants to produce some domain event, how does she choose the cluster? It's really simple: the Greyhound API is ours, so we can alter it as much as we want, and we simply added a logical cluster name to the consumer spec and producer spec APIs. You just add the logical cluster name and you're done, as sketched below. And if a developer wants to know which cluster a topic is deployed to, they just go to our back office, select the topic or type it in, and they can see the cluster of that topic, so they can start specifying it and consuming from it.
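To give a feel for how small the change is for a developer, here is an illustrative sketch of a spec carrying a logical cluster name. These case classes and field names are assumptions for illustration, not Greyhound's real spec types.

```scala
// Illustrative only: a spec carrying a logical cluster name, which the
// infrastructure resolves to the right cluster's bootstrap servers.
object ClusterSpecsSketch {
  final case class ConsumerSpec(topic: String, group: String, cluster: String = "default")
  final case class ProducerSpec(topic: String, cluster: String = "default")

  // A developer just names the logical cluster; everything else stays the same.
  val paymentsConsumer = ConsumerSpec(topic = "checkout-completed", group = "payment-service", cluster = "ecom")
  val checkoutProducer = ProducerSpec(topic = "checkout-completed", cluster = "ecom")

  // The mapping from logical name to physical bootstrap servers lives in one place (back office / config).
  val clusterRegistry: Map[String, String] = Map(
    "default" -> "self-hosted-kafka:9092",        // placeholder addresses
    "ecom"    -> "pkc-xxxxx.confluent.cloud:9092"
  )
}
```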
Great, so now let's talk about the migration itself.
So we started off in a bad place, with overloaded clusters that had unbalanced brokers, an unclear Kafka strategy (do we maintain it ourselves? do we move to the cloud? do we have enough expertise?), too many partitions, huge scale, and real production impact.
So we made a decision to move to a multi-cluster, managed environment. First we thought
that we're going to completely drain the data center or region
before we switch over producers and consumers,
which meant that the switching itself would be dead simple.
You don't have any traffic to worry about.
But this plan was canceled, as we learned that there are data centers that some services rely on exclusively, and also that it can be quite risky to just switch. Maybe there are some edge cases that we are not familiar with, so switching an entire data center could be quite risky once traffic comes back. It's not gradual, and the longer we wait to bring traffic back, which could take weeks, the riskier this whole plan is. So instead we decided that we're going to migrate with traffic incoming to producers and consumers, which meant that it had to be seamless: it should be simple even though you migrate with incoming traffic, you don't want to lose any messages, and it has to be production safe, meaning that in case some rollback is needed, it should be readily available and really easy to do. So I understood that we're going to need as much automation in place for the migration process as possible, in order to make it smooth, seamless and safe, and not rely on laborious human interaction.
For this automatic migration we could utilize the fact that we have Greyhound, which is our own layer on top of Kafka, so we can request Greyhound to actually do the migration for us: stop consuming from the self-hosted Kafka cluster and start consuming from Confluent Cloud. Same situation with our producers.
In order not to lose any messages while switching over to consuming from Confluent Cloud, we decided we're going to have a replication service that first replicates the messages from the self-hosted Kafka cluster to Confluent Cloud. So how does it work? First it consumes messages from the self-hosted cluster in each partition. Then it produces the messages to the target partition in Confluent Cloud, in the correct cluster. And it also saves an offsets mapping. The offsets mapping allows the consumer to start off from the exact point it needs to when it starts consuming from Confluent Cloud. So for instance, if the last committed message in the self-hosted cluster was at offset 57 of this partition, the replicator service saves that the consumer should start off from the first non-committed message in Confluent Cloud. So the mapping is from 57 to 5, which means that the consumer will start from offset 5.
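Here is a rough sketch, in Scala with the plain Kafka clients, of the two pieces just described: replicating records while saving an offset mapping, and seeking the migrated consumer to the mapped offset. The real replicator service is of course more involved (persistence, batching, error handling); the names and the in-memory map here are illustrative.

```scala
import java.time.Duration
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition

object ReplicatorSketch {
  // Hypothetical in-memory store; the real service would persist this mapping.
  // (topic, partition, sourceOffset) -> offset of the replicated copy in Confluent Cloud.
  val offsetMapping = scala.collection.concurrent.TrieMap.empty[(String, Int, Long), Long]

  // One replication pass: consume from the self-hosted cluster, produce to the
  // same topic/partition on the target cluster, and remember the offset mapping.
  def replicateOnce(source: KafkaConsumer[Array[Byte], Array[Byte]],
                    target: KafkaProducer[Array[Byte], Array[Byte]]): Unit = {
    val records = source.poll(Duration.ofMillis(500))
    records.asScala.foreach { rec =>
      val meta = target.send(
        new ProducerRecord(rec.topic, Int.box(rec.partition), rec.key, rec.value)).get()
      offsetMapping.put((rec.topic, rec.partition, rec.offset), meta.offset())
    }
    source.commitSync()
  }

  // At switch-over, seek the new consumer just past the replicated copy of the
  // last committed source message. E.g. if source offset 57 was copied to target
  // offset 4, the consumer resumes from target offset 5, as in the talk's example.
  def seekToMapped(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                   topic: String, partition: Int, lastCommittedSourceOffset: Long): Unit = {
    val tp = new TopicPartition(topic, partition)
    // Assumes `tp` has already been assigned to this consumer.
    offsetMapping.get((topic, partition, lastCommittedSourceOffset))
      .foreach(targetOffset => consumer.seek(tp, targetOffset + 1))
  }
}
```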
In addition, we had our migration orchestrator service that sent out requests to the replicator service to replicate all topics of the consumer groups that we are migrating, and also sends out requests to Greyhound. So how does it work in practice? When you want to migrate a consumer, you first replicate to Confluent: the migration orchestrator sends out a request to the replicator service for all the topics, they get replicated, and the service and the scripts that are running the migration make sure that there are no lags. Then the migration orchestrator requests Greyhound in the relevant services, through a Kafka topic of course (everything here is event driven), to unsubscribe from the self-hosted Kafka cluster.
Once the migration orchestrator finds that there are no longer any pods subscribed to the self-hosted Kafka cluster, it can safely request Greyhound to subscribe to Confluent, of course utilizing the offsets that were saved by the replicator service. So Greyhound starts the subscription to Confluent Cloud from the correct offsets it needs, so no message is lost and no message processing is duplicated. The overall sequence is sketched below.
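Putting the steps together, here is a sketch of the consumer migration sequence. The Replicator and GreyhoundControl traits are invented placeholders for the real replicator service and Greyhound commands; only the order of operations and the safety checks reflect what was described above.

```scala
import scala.annotation.tailrec

object ConsumerMigrationSketch {
  // Placeholder interfaces standing in for the real replicator service and Greyhound commands.
  trait Replicator {
    def replicate(topics: Set[String]): Unit
    def lag(topics: Set[String]): Long
  }
  trait GreyhoundControl {
    def unsubscribeFromSelfHosted(group: String): Unit
    def podsStillSubscribed(group: String): Int
    def subscribeToConfluent(group: String): Unit // uses the saved offset mapping
  }

  // Poll a condition; on timeout, stop instead of auto-healing, so a person can roll back.
  @tailrec
  def waitUntil(check: => Boolean, retriesLeft: Int = 60): Unit =
    if (retriesLeft <= 0) sys.error("unexpected state, stopping migration for manual rollback")
    else if (!check) { Thread.sleep(5000); waitUntil(check, retriesLeft - 1) }

  def migrateConsumer(group: String, topics: Set[String],
                      replicator: Replicator, greyhound: GreyhoundControl): Unit = {
    replicator.replicate(topics)                          // 1. replicate topics to Confluent Cloud
    waitUntil(replicator.lag(topics) == 0)                // 2. wait until replication has no lag
    greyhound.unsubscribeFromSelfHosted(group)            // 3. ask Greyhound to unsubscribe
    waitUntil(greyhound.podsStillSubscribed(group) == 0)  // 4. verify no pod is still subscribed
    greyhound.subscribeToConfluent(group)                 // 5. subscribe on Confluent from mapped offsets
  }
}
```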
In case of any unexpected failure, the migration orchestrator will stop, the script will alert, and then the script runner, the person, can activate an automatic rollback to switch back safely to the self-hosted Kafka cluster, where there may be a little bit of duplicate processing. It's a simpler story for producers, so I won't repeat it: you just switch the state of the producer and then restart the service, so it then produces to the new cluster. Now, some best
practices. Make sure that you create a migration script that checks state by itself, so it's as automatic as possible. But if it sees some unexpected state, it stops the migration. It doesn't do any auto healing by itself, because we deemed that too risky. It alerts the person that's running the script, and then that person has tools available for him or her to actually roll back the migration. So the rollback has to be readily available even when you're just starting off. And you want to check all the flows of forward and backward switching of clusters with test topics, because you want to make sure that you don't actually have production impact when you start off. If you want to have real production traffic, you can actually relay the traffic to the test topic: you consume from the real production topic and then you produce the traffic to the test topic, and you migrate the test topic, so you have a safe migration at first without any production impact, but with real production data. A sketch of such a relay follows below.
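Here is a minimal sketch of such a relay with plain Kafka clients: consume from the real production topic and re-produce the records to a test topic, which is then what gets migrated. Topic names and addresses are placeholders.

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Relay real production traffic into a test topic so the migration flow can be
// rehearsed on real data without touching production consumers.
object TrafficRelay extends App {
  val sourceTopic = "checkout-completed"       // real production topic (example name)
  val testTopic   = "checkout-completed-test"  // hypothetical test topic for the rehearsal

  val consumerProps = new Properties()
  consumerProps.put("bootstrap.servers", "self-hosted-kafka:9092") // placeholder
  consumerProps.put("group.id", "migration-relay")
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", "self-hosted-kafka:9092") // placeholder
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
  val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
  consumer.subscribe(java.util.Collections.singletonList(sourceTopic))

  while (true) {
    consumer.poll(Duration.ofMillis(500)).asScala.foreach { rec =>
      producer.send(new ProducerRecord(testTopic, rec.key, rec.value)) // relay only, never back to prod
    }
  }
}
```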
And of course you already have all kinds of monitoring dashboards, I'm sure. But it's really important to create dedicated custom metrics dashboards for the migration itself, that show only the information required by the person running the script, so they can make sure that everything has turned out correctly in a single quick glance. Really, really important.
Okay, so we talked a little bit about the migration. Now let's move on
to the final part of the talk, which is a little bit about war
stories and what to expect.
So our replicator service started replicating more and more topics, and we couldn't stop replicating those topics very quickly, because in order to stop replication we first had to migrate all of a topic's producers, and in order to migrate a topic's producers we first needed to migrate all of the consumers from all the services. So that's a situation that took us a little bit more time, which meant that more and more topics were getting subscribed to by the replicator consumer group, and it started to become risky. And indeed,
at some point we noticed an alert that replication had completely stopped and there was an unexpected error from sync group. We learned that one of the probable causes is that the maximal message size in the protocol between the consumer group and the Kafka broker may be too small for all the partitions that are being relayed by this consumer group. So we went and ran the dynamic configuration command to increase message.max.bytes from 1 megabyte, and this actually helped: replication resumed and we were happy.
Then on Christmas Eve 2021, the same error happened again, but we weren't worried, because we knew that we could fix it. So we again increased the message.max.bytes property with the Kafka configs script, this time from four megabytes to eight megabytes, because we knew that would solve it.
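For reference, the same kind of dynamic, cluster-wide broker configuration change can also be made through the Kafka AdminClient; below is a sketch with a placeholder bootstrap address and the 8 MB value from the story. As the rest of this story shows, such dynamic changes should be treated with care.

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object IncreaseMessageMaxBytes extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "self-hosted-kafka:9092") // placeholder
  val admin = AdminClient.create(props)

  // An empty broker name targets the cluster-wide dynamic default, like
  // `kafka-configs --entity-type brokers --entity-default --alter --add-config message.max.bytes=...`
  val clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "")
  val raiseMaxBytes = new AlterConfigOp(
    new ConfigEntry("message.max.bytes", (8 * 1024 * 1024).toString), // 8 MB, as in the story
    AlterConfigOp.OpType.SET)

  val ops: java.util.Collection[AlterConfigOp] = java.util.Collections.singletonList(raiseMaxBytes)
  admin.incrementalAlterConfigs(Map(clusterDefault -> ops).asJava).all().get()
  admin.close()
}
```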
Then something completely unexpected happened.
Kafka records started getting deleted from the Kafka brokers much faster than expected, and also for compacted topics, where records are deleted not because they are old in time but because they are stale in value, like a topic that serves as a kind of storage. This was really worrying, because some of these Kafka topics were the source of truth for their data.
And thankfully we didn't make the configuration change that caused this situation in all the data centers, so we were able to restore records from another data center. We were very lucky in this regard.
And then we dug in and understood that there is a bug in Kafka in certain versions, where if you change this configuration dynamically twice, it reverts the log configuration to the defaults. So no matter what topic configuration you specified, whether its cleanup policy is compact or whatever its retention time is, it just reverts to the defaults for the entire cluster. Meaning that in order to fix this situation, we had to go and apply a dummy change to each of the topic configurations, in order to make sure that the bug was mitigated completely; a sketch of such a sweep is below.
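One way such a sweep could be scripted is to read each topic's configuration and re-apply one of its values as an explicit override through the AdminClient. Whether a same-value set or a genuinely different dummy value is needed depends on the exact broker version and bug, so treat this purely as an illustration.

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object ReapplyTopicConfigs extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "self-hosted-kafka:9092") // placeholder
  val admin = AdminClient.create(props)

  val topics = admin.listTopics().names().get().asScala

  topics.foreach { topic =>
    val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
    val current = admin.describeConfigs(java.util.Collections.singletonList(resource))
      .all().get().get(resource)

    // Re-apply the topic's current retention.ms as an explicit per-topic override,
    // so the broker re-registers the topic-level configuration.
    val retention = current.get("retention.ms").value()
    val op = new AlterConfigOp(new ConfigEntry("retention.ms", retention), AlterConfigOp.OpType.SET)
    val ops: java.util.Collection[AlterConfigOp] = java.util.Collections.singletonList(op)
    admin.incrementalAlterConfigs(Map(resource -> ops).asJava).all().get()
  }
  admin.close()
}
```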
And then we said, okay, no more changing any dynamic configuration. We just added more consumer group shards for the replicator service, so each of these consumer groups worked on a different set of topics to replicate (a simple sketch of that sharding follows below). And indeed this allowed us to have the scale that we needed, with no more issues with the replication, and we finished the migration successfully.
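A simple way to picture the sharding: assign each topic deterministically to one of N replicator consumer groups, for example by hashing the topic name, so each group subscribes to a bounded set of topics. The group naming scheme here is an assumption for illustration.

```scala
object ReplicatorSharding {
  // Deterministically assign each topic to one of `shards` replicator consumer groups.
  def shardFor(topic: String, shards: Int): String = {
    val shard = math.abs(topic.hashCode % shards)
    s"replicator-shard-$shard" // hypothetical consumer-group naming scheme
  }

  // Group the full topic set by the consumer group that will replicate it.
  def topicsByShard(topics: Set[String], shards: Int): Map[String, Set[String]] =
    topics.groupBy(shardFor(_, shards))
}

// Example: spread topics over 4 replicator consumer groups.
// ReplicatorSharding.topicsByShard(Set("checkout-completed", "payments", "orders"), shards = 4)
```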
And now that we have the migration infrastructure in place, we knew that it can actually help us with a lot of non-migration-related features. For instance, we can request Greyhound, from outside the service, to switch to a different cluster if the previous cluster was assigned by mistake or is no longer relevant.
We can also request Greyhound to skip messages, in case there was some data corruption or some issue with old messages that shouldn't be processed anymore; we can just skip them and request Greyhound to do that. Or we can request Greyhound to change the processing rate in each of the pods. Greyhound allows parallel processing, and you can limit it or extend it according to your use case, for example if you don't want to hurt a database that is having performance issues, you can really reduce the processing rate. And you can do all of that externally to the service: you don't need to change any code, you don't need to redeploy (GA) any service, you just send out commands via some back office, which we're really excited about.
So to summarize, we switched and migrated from a single-cluster topology in each of our data centers, which we self-hosted and managed, to a multi-cluster Confluent Cloud managed platform which is completely optimized for us. And we used Greyhound, our own Kafka client layer, and dedicated orchestration services and scripts, to have an automated, safe and gradual migration.
Now, if you want to dig deeper into how we performed the migration, you can check out this blog post I've written to give you more information; this is the link. And you can also check out Greyhound, which is an open source library that has a lot of cool features like I mentioned: parallel processing, but also the ability to have all kinds of retry policies, both for the consumer and the producer, in case you have potential processing errors and you want to eventually complete processing successfully, plus all kinds of other cool features like user context propagation. So I highly recommend you check it out. I want to thank you very much. You can find all the slides on SlideShare and the links I've shared, and you can check out my website at natansale.com to find other blog posts I've written and other conference talks I've given. You can follow me on Twitter and LinkedIn to get updates on everything our data streams team at Wix is up to, around Kafka, around architecture, and also about software engineering best practices in general. So thank you again.