Transcript
This transcript was autogenerated.
Hello everyone. Today I'm going to talk about the aspects of microservice interactions, and you might wonder what the Death Star is doing on the screen. I put this image here because microservice architectures are also often referred to as Death Star architectures. This comes from the very complex interaction diagrams that were generated in well-known microservice infrastructures, like the ones you can see on the screen. The left one is from Netflix, the middle one is from Twitter, and the right one is from Amazon.
So having this huge amount of network communication has its own implications. And that's why I think it's very important to understand the challenges of network communication at such a large scale. That drove me into this topic, into this area, and encouraged me to look behind the scenes and understand the details. And that's how this presentation was born. So in the first section, let's talk about the reasons, the driving forces that are affecting these communication channels. But first, I would like to have a little recap on what the main characteristics of the microservice architecture style are, and why we are doing this overall: what's the benefit if we do it well?
One of the most important aspects is that microservices are independently deployable. I should be able to deploy a single service without others really noticing it. On top of that comes another important characteristic, autonomy. This means that a single feature is often mapped to a single microservice, so I'm able to deliver a single feature, a single functional change, by just modifying one microservice and ending up with one deployment.
What's also important is that microservices are polyglot. They're polyglot in terms of database usage or database technology choices, in language choices, and also in other technological aspects. So let's say I want to do a JVM upgrade: I don't have to upgrade all the services at once and then deploy them in a single, coordinated, huge deployment. I'm free to go with a single service only, and then other teams owning other services are also free to deploy when they think it's suitable. The same goes for other technologies. For instance, if I want to change from traditional REST-based communication to gRPC, the same thing applies.
For this to work perfectly, we need one single ingredient, namely network calls. You can introduce dependencies between services in many ways, but a network call is one of the most efficient ways of doing that. If you introduce dependencies through shared libraries, shared data, or anything else, it won't work as well as network communication, a single network communication. So that's why we need to dive deep into these network calls and understand how we can optimize them.
These network calls have driving forces. Thinking about driving forces, I think we have around five of the most important ones: latency, availability, reliability, queuing theory, and Conway's law. So let's go through each of them, one by one, and understand how they affect microservice communications. The first one is latency.
Latency has a hard limit: the speed of light. If you take the two coasts of the US, the round-trip time for light to travel from the left side to the right side is 27 milliseconds. But in practice, we cannot reach this number, because the speed of light is also affected by the density of the material it's passing through. So, for instance, in fiber optics, if this were just a single cable, it's more like 41 milliseconds. I got this from a website I have in the references section, which simulates the expected round-trip time between two parts of the globe.
So that's where these numbers are coming from. But there are many other pages explaining the expected round-trip time between different data centers of a cloud provider. In the case of AWS, this is realistically more like 50 to 60 milliseconds, but it depends on which region you are connecting to from which other region. But why is this important for us? This gives you the minimum latency if you connect from one region to another.
So if you want to come up with a multi-region deployment, for various reasons, you always have to think about the data synchronization between the regions. And this will be the minimum latency until the data arrives at the other edge. I say this is the minimum latency because in reality, when you have more pressure on your data layer, work will pile up, and this can go up to 500 to 600 milliseconds. So 60 milliseconds is the optimistic duration.
So if you want to have synchronous writes, each synchronous write will have at least 60 to 600 milliseconds of latency. This has to be considered if you are planning to do something which has its own low latency requirements.
Also, it's important to understand the correlation between latency and throughput. This comes from another piece of research. They simulated a fictional website, measured the page load time and the bandwidth, and were interested in how the page load time varies based on the bandwidth and based on the round-trip time. If you increase the bandwidth, if you increase the throughput, then you will see that the benefits diminish very early. A page load actually maps to around a hundred requests' worth of latency, because that's roughly the number of independent requests required to load a whole page. But if you reduce the round-trip time, the page load time decreases pretty much linearly.
So what does this tell us? First of all, there is no direct correlation, no direct effect, between page load time and bandwidth, or between latency and throughput. This also means that if you rely merely on the scalability or auto-scaling capabilities of your system, that is not going to have a positive effect on the latency. If you want to be efficient in terms of latency, you have to think about other solutions, not just scaling out or giving more juice to your instances.
Another thing is availability. Thinking about
availability, I always think in dependency graphs
because that's what determines the availability as a whole.
So let's look at a simplified machine architecture here. For a single transaction, I need all the components to behave as expected: the CPU, the memory, the network, and the disk. So what if we give each of them an availability number? Let's say each component has 99% availability. This forms a dependency graph and gives us an overall availability of 96%, because the combination of components has its own probability of failure when the individual elements each have their own probability of failure. That's just how the math works.
But if I scale this out to a client-server model, you can see that now I have more dependencies between components. Again, let's say that each component has a 99% chance of being successful. Now the availability drops to 88.5%. So with each single dependency, your availability will decrease. But it also depends on how the dependency is introduced and in which part of your architecture it is introduced; it depends on your dependency graph as well.
You can do the math by hand if you're interested in the availability numbers. I use my own availability simulator, which just runs a couple of cycles, testing each connection and failing them randomly based on the numbers given. I also have this in the references section.
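To give an idea of how such a simulation can work, here is a minimal Monte Carlo sketch in Java. This is my own illustration of the idea, not the actual simulator's code, and the component list is just the CPU/memory/network/disk example from above.

```java
import java.util.Random;

public class AvailabilitySimulator {
    public static void main(String[] args) {
        // Per-component availability, e.g. CPU, memory, network, disk.
        double[] componentAvailability = {0.99, 0.99, 0.99, 0.99};
        int cycles = 1_000_000;
        Random random = new Random();

        int successes = 0;
        for (int i = 0; i < cycles; i++) {
            boolean allUp = true;
            for (double availability : componentAvailability) {
                // Each dependency fails independently with probability 1 - availability.
                if (random.nextDouble() >= availability) {
                    allUp = false;
                    break;
                }
            }
            if (allUp) {
                successes++;
            }
        }
        // Converges to 0.99^4, roughly the 96% mentioned above.
        System.out.printf("Simulated availability: %.4f%n", (double) successes / cycles);
    }
}
```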
Okay, now for reliability, I think the most important use case is a single client-server communication. So let's say we have a transaction that's changing the state of the whole system, let's say a write operation, for instance. How can things fail? Let's go through them sequentially. First of all, the write operation can fail when the client sends the request to the server. Then it can also fail while being processed on the server itself. But it can also fail when the server has successfully processed the write operation and responds back to the client. Here comes the problem: the client can simply retry the write operation in the first two cases, but after the server has successfully processed the write operation, the client does not know what to do. You may get around this problem by using idempotent operations or something similar, but in other cases the solution is not so obvious. And of course, the client side can also fail while the response is being processed. But why is this important for us? First of all, that's why, for instance, two-phase commit was not really working at a larger scale. If one of the requests is failing in the commit phase, the client, or the coordinator, does not really know how to proceed, because other previous operations are already committed and it just got one single failure. Should it just re-request the failed node to commit its changes again, risking a duplicated write or something similar? Or should it just abort the whole operation?
Another example is exactly-once message semantics. The server always has to acknowledge when a message is processed. If the communication fails after the message is processed, the client cannot do anything else but resend the message. And that's why we often have at-least-once message semantics instead of exactly-once message semantics. Mapping reliability statistically to more requests: we were talking about page load time earlier, and that a page load usually involves hundreds of requests, and it needs all of those requests to succeed.
So let's say we theoretically have a single request that has a 99% probability of being successful. If we have a hundred requests with the same characteristics, there is less than a 40% chance that all hundred requests will succeed. Because there are so many permutations in which some of these hundred requests can fail, this drops our probability by around 60%, purely for statistical reasons. We can't really fight math here; these are the hard facts.
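If you want to check this yourself, the calculation fits in a couple of lines (my own illustration):

```java
public class RequestChainProbability {
    public static void main(String[] args) {
        double singleRequestSuccess = 0.99;
        int requests = 100;
        // All hundred requests must succeed for the page load to succeed.
        double allSucceed = Math.pow(singleRequestSuccess, requests);
        // Prints roughly 0.366, i.e. less than a 40% chance.
        System.out.printf("P(all %d requests succeed) = %.3f%n", requests, allSucceed);
    }
}
```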
So this actually resulted in a couple of artifacts that I think are very popular in the engineering world. One of them is the latency numbers every programmer should know, presented in many forms like this one. This one comes from a web page where you have a slider, can change the year, and see how these latency numbers have changed; these are the recent numbers. So, for instance, if we look at a main memory reference, the latency for a typical main memory reference is around 100 nanoseconds, and the round-trip time within the same data center, as in a cloud infrastructure, is around 500 microseconds. Why should we care, you might ask? Because these numbers are fast enough. They are very fast, and they improved a lot in recent years; 500 microseconds is something I should not care about, right? Well, let's say you want to introduce a caching strategy and you need to choose between an in-memory solution and maybe a distributed caching solution, because you want to share it with multiple services and want to offer it as a separate service. Think about the round-trip time: the difference between the in-memory and the distributed caching solution, in terms of latency, is around a factor of 5,000. A lookup costs roughly 5,000 times more if you choose a distributed cache than if you choose an in-memory cache solution.
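As a rough back-of-the-envelope comparison, using the ballpark figures from that latency table (the number of lookups per request is just an assumed example, not a measurement):

```java
public class CacheLatencyEstimate {
    public static void main(String[] args) {
        long inMemoryLookupNanos = 100;          // ~ a main memory reference
        long distributedLookupNanos = 500_000;   // ~ a round trip within the same data center
        int lookupsPerRequest = 50;              // hypothetical number of cache hits per request

        System.out.printf("In-memory cache:   %d microseconds per request%n",
                inMemoryLookupNanos * lookupsPerRequest / 1_000);
        System.out.printf("Distributed cache: %d microseconds per request%n",
                distributedLookupNanos * lookupsPerRequest / 1_000);
        // The distributed option costs roughly 5,000 times more per lookup.
    }
}
```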
Another artifact is the fallacies of distributed computing. You can find them on Wikipedia. If you look at the top three of these fallacies, I think it's clear that we already covered plenty of aspects of those, but the others are also important.
But now I want to talk about something else, queuing theory, and spend a little bit more time on it. Because in my experience, people in engineering are often not really familiar with queuing theory and not really thinking in queues. But in practice, I think queues are everywhere in a modern architecture, at large scale and also at small scale. So it's very good to understand the basics. So here come the basics of queuing theory.
Talking about queues, you can think about the simplified model that you can see on the screen. You have a single queue of these orange marbles that need to be processed. Then you have something on the right side that is processing the marbles and producing green marbles on the right side of the screen. And then you have metrics around the queue that determine the queue's performance. We are mainly interested in these four. There is the execution time needed for a single node to process an orange marble and create a green marble. Then there is the departure rate, meaning the rate at which green marbles are being produced. Then we have the latency, which is the duration of either a single marble traversing the queue up to the right side until it becomes a green marble, or the overall duration required for all the marbles to be processed. And then we have the arrival rate on the very left side, which is the rate of orange marbles arriving in the queue.
So the most basic question is: what happens when the arrival rate is much larger than the departure rate? This happens to us all the time, actually. If you have something that's publicly available on the web, you don't have control over the user base and their usage patterns, and you often have to operate in this area. In this case, if you just accept all the requests and try to process them, you are guaranteed to fail after a certain period. You have to introduce something, and this something is called back pressure, or rate limiting. Often at the edge, you have a rate limiter service that determines which endpoint is limited to what sort of throughput, and tries to keep the right side protected from a higher load than usual, and tries to introduce some sort of logic for limiting those clients who maybe misbehave, or limiting requests to a specific service overall. So this is an important topic, and we will talk about it a bit more in the second half.
Coming back to queuing theory, let's have a couple of practical examples. In this simplified scenario, the execution time is 100 milliseconds. So what is the throughput? In this case, we produce ten marbles per second, because we produce a single marble every 100 milliseconds. The overall latency for processing all these eight marbles is 800 milliseconds; it's eight times 100 milliseconds. Very simple. Now, what if I parallelize and have a single queue, but two workers? Now I produce two marbles every 100 milliseconds, so I have 20 marbles per second as my throughput, or my departure rate. The latency is also halved, because now I can produce four times two marbles overall in four times 100 milliseconds, and that comes up to 400 milliseconds.
Okay, but what if I divide the work differently? Instead of having a single parallelized operation, I try to split the work into two halves, which can be finished in two times 50 milliseconds. Let's see the numbers. Now I can produce a single marble every 50 milliseconds, which gives the same throughput as in the previous example, 20 marbles per second. So I still improved, doubled, my throughput. But how has my latency changed? My latency will still be 800 milliseconds, because a single marble still needs 100 milliseconds to travel from the left side to the right side; I need two times 50 milliseconds for an orange marble to become a green marble. So interestingly, latency did not change, but throughput increased. And that's the magic, I think, of these reactive libraries that are becoming very popular these days. By simply declaring my work in a different way, it allows me to have a higher level of parallelization. By splitting my workload into smaller chunks, introducing more queues, and processing them in smaller units, it overall increases my parallelization, even though I'm not aware of that, because in the code everything looks like sequential calls,
right? Now, if I have a bottleneck, let's say, then the numbers change as follows. I still have ten marbles per second, because the bottleneck keeps me from benefiting from processing a single marble within 50 milliseconds; it only allows me to have a green marble every 100 milliseconds, because that's where the bottleneck is. And in total, I need 100 plus 50 milliseconds for a single marble to go through. So my latency is again increased, to 1200 milliseconds. There are many other scenarios,
but I think you can do the math easily in your head. I have a few formulas that are maybe not precise, but they're enough for me to understand what's going on. What's really important, as you can see, is that the throughput does not really depend on the queue length. It behaves a little bit differently than the latency. So as you've seen before, in the example where I was talking about the research on page load time and bandwidth, throughput and latency do not really depend on each other.
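For reference, the rough formulas I rely on can be written down like this; this is a simplification that ignores queuing delays under contention, using the numbers from the marble example:

```java
public class QueueMath {
    public static void main(String[] args) {
        int marbles = 8;
        double executionTimeMs = 100;
        int workers = 2;

        // Rough approximations, good enough for reasoning:
        // throughput ~ workers / execution time (per stage, limited by the slowest stage)
        double throughputPerSecond = workers / executionTimeMs * 1000;
        // total latency ~ queue length / throughput
        double totalLatencyMs = marbles / throughputPerSecond * 1000;

        System.out.printf("Throughput: %.0f marbles/s, total latency: %.0f ms%n",
                throughputPerSecond, totalLatencyMs);
        // With 2 workers and 100 ms execution time: 20 marbles/s and 400 ms, as in the example.
    }
}
```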
What other things can you do with queues? What's very important is that for each queue you can provide its own quality of service; these numbers can be set independently for each queue. So let's say you have a single producer, like the one producing the orange marbles, with a higher demand. You can separate it into its own dedicated channel, and it won't affect those that want to process the blue marbles and the yellow marbles, and it won't choke the system so easily with its own requests. So by separating them into different channels, you can offer a separate quality of service to each channel. And this can be done in a couple of ways in microservices. First of all, a single service concentrates on one specific workload, and that forms its own queue and its own special way of optimizing for that kind of workload, independently from other services. But it can also be introduced inside a single service, if you have messaging and use multiple channels for multiple clients. I will have a detailed, practical example of how we use this feature later.
So, about Conway's law. Just very quickly: thinking about Conway's law, I always consider how many team scenarios we can have for a single communication. In the first scenario, on each of the two sides of the communication, a single team controls the change for its side. This is the easiest scenario, because you can do whatever you want and proceed as fast as you would like to. Then we have the scenarios where there is shared responsibility on one or the other side. This is some sort of an anti-pattern; it is not used frequently. Companies only use it on those occasions when they don't really want to change too many things in a legacy service, or they don't really know how to separate the ownership of a bigger chunk of code. This slows things down radically. This is when you have to be very careful, when you need to introduce non-breaking changes or keep the legacy endpoint alive for a very long time. Now,
there is the healthier scenario where you have multiple consumers and you are the producer side, or you are the consumer and there are multiple producers. This can happen in many situations. What's important to understand, I think, is that service ownership does not necessarily come with schema ownership. You are free to move the schema ownership to the other side, back and forth, however you feel is more suitable. This comes in handy in a couple of situations. Let's say this is an event-based system: who should control the schema in this case? The message producers, who are producing the events themselves? Should they tell the other teams, yes, there's going to be a schema change, be aware of that, and then contact all the other teams to see if they are ready to accept the new event? Or should we do it the other way around? Should the consumers control the schema ownership and tell the producer: okay, we are expecting these kinds of messages from now on, we are accepting this kind of change but are not ready for another change, and so on?
There are tools and techniques for how to do this, and it helps visibility, it helps testability, it helps with many things. There are also schema registries that you can introduce. You can switch from something that's schemaless, like a traditional REST-based API offering just simple JSON-based communication, to a more conservative way of communicating, using strongly typed APIs like GraphQL or gRPC, or maybe introducing schemas into events or messages as well. And there is the most complicated scenario, when there are multiple teams on each side, the producer and the consumer side. That's when you need something more advanced, the most advanced thing for controlling schemas: something like schema federation, which stores different versions and kinds of schemas and schema changes in a controlled way, most preferably in a version-controlled
way. Okay, so this
is where the first part ends. Now I would like to quickly introduce the toolbox, the things that I consider, and then jump right to the next section, where we will talk about practical examples and situations that I faced, and I will guide you through how we improved each situation, one by one.
So, the tools that I usually use. You can do something like CQRS, meaning that you can separate the write and the read path, if you need something special on the read side, or maybe something special on the write side. Then, we talked a lot about schemas: you can introduce contract-based testing, which helps to move the schema ownership to the other side. Then you can introduce caching; we saw in the latency part how caching can improve the latency. With caching, you have to think about data freshness, and multi-write helps here. I call multi-write something that keeps the cached values fresh in a proactive way: if you grab a fresh value from one side of your system, because one of the clients needs it, you proactively write it to the other cache instances to keep the data fresh and reduce the number of cache misses.
Then you can switch from synchronous to asynchronous communication while keeping the original API, by introducing polling, or by introducing a synchronous API that forwards the request to a message queue and then just sends a simple response back to the client. There are also design practices or design principles that you can rely on, like cloud native and twelve-factor. I won't cover these, I just thought it's good to mention them. Auto scaling can be effective in many ways; auto scaling has a positive effect on throughput, but not on latency, as we discussed. I talked about back pressure earlier in this section, when talking about queuing theory, for when you have a higher arrival rate than departure rate. If you need large-scale transactions, then you can introduce sagas in a microservice architecture. You can do it in a couple of ways: you can control the transaction either by using orchestration or choreography.
You can introduce a service mesh. I think service meshes are important because there are many ways to fine-tune the communication channels inside a service mesh. It improves your observability, it helps you with certain kinds of security aspects, and you can introduce many resiliency patterns as configurations inside the service mesh, like circuit breakers, timeouts, retries, and so on and so forth. You can be conscious about your technology choices. For instance, if you choose gRPC over traditional REST-based communication, you can expect lower latency, because gRPC usually needs fewer round trips during a request, and the payload is smaller because it's binary-based, so you probably get more throughput.
Messaging has many patterns, so if asynchronous communication is not enough for you, then you can introduce messaging on one of the sides, switch from synchronous to asynchronous communication, and then you are free to use all those messaging patterns, which will increase the robustness of the communication itself and maybe help in a specific situation. If you choose your concurrency model well, you will have higher throughput; it probably won't have a positive effect on the latency, but you'll get higher throughput with fewer resources, because it introduces more channels, more queues, but not necessarily more threads. This is very widely used, and I think a well-settled technology, coming in with reactive programming or coroutines, so they are good choices if you want to save resources with your communication. Then there are the resiliency patterns, which I think many people know because they are widely used in the microservice world; service meshes also offer them by default, and there are libraries providing most of these. There are circuit breakers, bulkheads, retries, timeouts, just to name the most important ones. Timeouts come with all the libraries that communicate over the network. Then you have observability, to review the whole thing and understand whether you have improved or not. Now let's jump to the example
part. I will pick a couple of practical examples that I've encountered, and I will go through how, in each specific situation, things were improved and with what kind of practices. Okay, so one
of the examples I like comes from the distributed database Cassandra, and this technique is called rapid read protection. This is how it works. Let's say a client needs to read data from the database, and it needs the data to be up to date. The client goes to the so-called coordinator node that you can see on the top left, and the coordinator node then gets the data from each replica, aggregates the data based on its freshness, and then sends the up-to-date data back to the client.
Now, what happens if one of the requests is slow? Instead of waiting for it, the coordinator node is going to fire a so-called backup request, hoping that this backup request will finish faster, and that the coordinator node can therefore send the data back to the client faster as well. Why is this happening? Isn't this just a waste of effort and a lot of complication? Wouldn't it be more efficient to just wait for that request to finish, and simply fail if there is no answer? Well, think about availability again:
let's say each node has a 99% chance of successfully responding with the payload to the coordinator node. In the 1% of cases when we have a failure, we still have a 99% chance of being successful if we use a backup request. So the overall availability for this simplified scenario, for this simplified query, is increased; it's now around 99.99%. And if you also look at the P99 latency numbers, you will see a decrease compared to the scenario where we would turn this off, because we won't see timeouts that often. It's true that overall there are some situations when it would have been better to wait for the answer to arrive instead of going on with another backup request, because that can also fail. But overall, at large scale, we are statistically still performing better.
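To illustrate the idea itself, not Cassandra's actual implementation, a backup request can be sketched like this in Java; the fetch call and the 50-millisecond hedge threshold are made up for the example. In Cassandra itself, when this kicks in is configurable, for example based on a latency percentile.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BackupRequestSketch {

    // Fire a backup request if the primary one has not finished within hedgeAfterMs,
    // then take whichever response arrives first.
    static <T> CompletableFuture<T> withBackupRequest(Supplier<CompletableFuture<T>> call,
                                                      long hedgeAfterMs) {
        CompletableFuture<T> primary = call.get();
        CompletableFuture<T> backup = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(hedgeAfterMs, TimeUnit.MILLISECONDS).execute(() -> {
            if (!primary.isDone()) {
                call.get().whenComplete((value, error) -> {
                    if (error == null) backup.complete(value);
                    else backup.completeExceptionally(error);
                });
            }
        });
        return primary.applyToEither(backup, value -> value);
    }

    public static void main(String[] args) {
        // Hypothetical replica read standing in for a network call to one replica.
        Supplier<CompletableFuture<String>> readFromReplica =
                () -> CompletableFuture.supplyAsync(() -> "row payload");

        String result = withBackupRequest(readFromReplica, 50).join();
        System.out.println(result);
    }
}
```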
Okay, now the other use case comes from a scenario where I had to design a system with a read-heavy workload, a very read-heavy workload, and the writes were theoretically almost immutable; they were not changing at all. We just created objects in this part of the architecture and were hardly ever changing or updating them. I needed to reach a very low latency requirement. As a result, this meant that I had to prevent scenarios where I had to deal with a cold cache, with something that comes in and reads up all the data from the database. And I couldn't use any distributed cache solution for this situation, because it would have hurt latency so much that it would have been impossible to meet this low latency requirement.
Here comes CQRS into play. Instead of just trying to put everything into a single service and trying to fine-tune and optimize that, trying to benchmark things, find the bottleneck and improve it, you should think about the larger picture and use the techniques that I talked about. So we separated the write path, which you can see on the left, because we were not really interested in the latency of the write operations; it was not critical for these writes to happen immediately. So we went with an AWS SQS queue and then continued the write to the database side from there. It was true that we had large write workloads, but because we were writing through a queue, it helped us keep the write operations on DynamoDB lower, iron out these bursty operations, and help keep the write capacity units
lower than usual. Now for
the reads, we introduced a distributed in-memory cache solution with Hazelcast that was also replicating between the read nodes. This gave us a couple of things. If I scale out and get a fresh read node, it should not come up with an empty in-memory database, so immediately when a new read node comes in, it starts synchronizing with the other read nodes, which helps keep the data fresh. Also, when we have a cache miss in one of the read nodes and it fetches the new data by going to DynamoDB, it immediately and proactively starts replicating this data and writing it to the other read nodes. So that's how we solved keeping the cache warm: with multi-writes, using replication as well. And the write instance, the service responsible for the writes, had nothing to do with Hazelcast; it was not aware of this complicated configuration of the in-memory cache solution. Also, the read instances did not have to include the SQS-based libraries and did not have to do anything with SQS at all, with that communication, with the access, and so on and so forth. So this simplifies your architecture even further overall.
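A minimal sketch of such a read path could look like the following; the map name and the DynamoDB lookup are placeholders, a replicated map is just one way to get this replication behavior, and the exact Hazelcast import paths depend on the version you use.

```java
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.replicatedmap.ReplicatedMap;

public class ReadPathCache {
    private final ReplicatedMap<String, String> cache;

    public ReadPathCache(HazelcastInstance hazelcast) {
        // A replicated map keeps a full copy on every read node, so a value written
        // on one node proactively shows up on the others as well.
        this.cache = hazelcast.getReplicatedMap("values");
    }

    public String getValue(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        // Cache miss: go to the database once, then write the fresh value back into the
        // replicated map so the other read nodes also get it (the "multi-write" idea).
        String fresh = loadFromDynamoDb(key);
        cache.put(key, fresh);
        return fresh;
    }

    // Placeholder for the actual DynamoDB read.
    private String loadFromDynamoDb(String key) {
        return "value-for-" + key;
    }

    public static void main(String[] args) {
        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
        System.out.println(new ReadPathCache(hazelcast).getValue("some-key"));
        hazelcast.shutdown();
    }
}
```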
Okay, now with client libraries, I have a couple of stories. I treat client libraries as a double-edged sword, because I think it's very hard to design them in an effective way; they often don't consider all the extensibility options, and not all the features can be turned off. In this specific case, the client library was included in many, many services as a default dependency, and it was the only way to use a specific shared feature of the whole architecture.
This specific client library downloaded a zipped archive from Amazon S3 at startup and decompressed it, just to get the configuration data required for the library to work. This was the single biggest bottleneck for the startup of the whole service, and unfortunately, hidden behind the scenes, it messed up the startup of all the development environments, so all the developers suffered from the startup penalty introduced by this client library.
Unfortunately, there was no way to turn off the library itself, because the maintaining team wanted to keep it as simple as possible. They did not want anyone to change certain functionalities of the library, and they thought that if they introduced too many customization features, they would have a hard time maintaining the library, because there could be plenty of different ways teams would be using it. So how did we solve this problem? Okay, sorry, first another issue with client libraries
was that it often messed up the deployments. Because the health check depended on the functionality of the library itself, when there was a failure during downloading the data, the service often just timed out and failed to meet the health check requirements, and that messed up the deployment. So even deployments were negatively affected by the usage of this library. So how did we solve this issue? We simply shifted things left, during delivery, during deployment.
What this meant for us: we picked a mock S3 solution, containerized it using Docker, and used it as a sidecar container. We prepared two mock archives in this mock storage. One of them was empty, meaning it was very fast to download and extract, and for the other one we downloaded the data from the real S3 bucket, extracted it, kept just the configuration that was meaningful for us, and put that into this mock container. The reason why this worked was very simple: they did not have a feature switch or a kill switch for the library itself, but the URL was configurable, because each specific environment had its own bucket configuration.
Another way other teams solved this problem was by introducing some kind of programmatic proxy, which kept things going when this connection was not available, and behind the scenes tried to get the client library to retry loading the data, delaying the download of the data that was important for them. Okay. Another technique that I want to talk about is what I call region pinning. This might not necessarily be the appropriate name for this technique, but this is how everybody around me was using it, so this is how I refer to it as well.
Imagine the following scenario. We had to migrate our whole architecture, the whole system, to the cloud, and then had to make it multi-region. And one of the problems we found was with shopping carts. There was not a dedicated shopping cart service; each service was managing the shopping cart through a shared database, namely a shared Cassandra cluster. So each service was reading the shopping cart from the shared Cassandra cluster through a library, updating the shopping cart when necessary through the shared library, and then putting the data into this shared database. And we could not really rely on the replication lag, because it was uncontrollable. The minimum replication lag was around 60 milliseconds, but in practice, if you have more pressure on the database, this could easily have increased to 500 to 600 milliseconds, based on the load itself.
And we were afraid that these shopping carts would disappear. If the traffic for one of the services is moved to the right side during the user journey, we were afraid that when the shopping cart was being loaded, because of the replication lag it would not be available, it would be empty, or maybe it would not contain the up-to-date changes. So how do we distinguish between users that have an up-to-date shopping cart in the left region and those that, let's say, don't have a shopping cart yet or started their journey in the right region? That's where region pinning comes into play. We flag the originating region for each user by using a cookie, and when moving those users who need their shopping cart but started their user journey in the left region over to the right side, based on this cookie we reach out through this white line, paying that 60-millisecond latency penalty, but loading the data consistently and having the shopping cart. This way we don't penalize all the users, because we are not going through this white line for all the users all the time; we just proactively select those users who need more consistency but should be okay with this latency increase.
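As a sketch of the routing decision (all the names here, the cookie key, the regions, and the endpoints, are made up for illustration):

```java
import java.util.Map;
import java.util.Optional;

public class RegionPinning {

    // Hypothetical names: the cookie key and region identifiers are illustrative only.
    private static final String REGION_COOKIE = "origin-region";
    private static final String LOCAL_REGION = "eu-west-1";

    // Decide whether a cart read can be served locally or must go back to the
    // user's originating region, paying the cross-region latency penalty.
    static String cartEndpointFor(Map<String, String> cookies) {
        String pinnedRegion = Optional.ofNullable(cookies.get(REGION_COOKIE))
                .orElse(LOCAL_REGION); // new users have no pin yet and stay local
        if (pinnedRegion.equals(LOCAL_REGION)) {
            return "https://cart." + LOCAL_REGION + ".example.internal";
        }
        // Pinned elsewhere: read consistently from the originating region.
        return "https://cart." + pinnedRegion + ".example.internal";
    }

    public static void main(String[] args) {
        System.out.println(cartEndpointFor(Map.of("origin-region", "us-east-1")));
        System.out.println(cartEndpointFor(Map.of())); // no cookie -> stays local
    }
}
```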
In another example, I used the same approach when I was designing an auction solution. Here we had to keep a strict ordering of the bids, and no matter which message provider I looked at, I figured out that there is no way to keep the messages in order when relying on synchronization between regions. So if the auction started on the left side, in the Kafka cluster on the left side, and somebody wanted to participate in that auction from the other region, they had to pay the latency penalty proactively and write to the Kafka cluster still on the left side. And then for other services or participants which were not latency sensitive, the data was still available, maybe later, in an asynchronous way, through MirrorMaker on the right side, so they could continue processing this data or maybe show it a bit later. But those who wanted to participate in the bidding had to write consistently to the same region where the auction originated. This is similar to what we are used to in the gaming world: if a game is started in one certain region, then when other players participate, they have to take the latency penalty into account and maybe won't perform that well, but this keeps the game consistent. Okay, the last example is
coming from the journey of optimizing a single service over a couple of iterations. Let me explain the behavior and the functionality of the service a bit first. This service had two endpoints. First of all, it was a very simple key-value store: you can see the values endpoint on the right side of the service, which reads the values for a given key. That goes to a specific DynamoDB table, which has its own read capacity units defined. But we also had another endpoint; this is how things were when we got the service, when we started maintaining it, so this is what we kept. This was the recent keys endpoint. What this endpoint did was give back to the client the keys that had been accessed by that specific client in the last seven days. That went to another table, a statistics table that you can see on the left, which had its own read and write capacity units defined. Now, both tables had
auto scaling configured, but there was a problem. As you can see on the right-side diagram, there was one specific client which was firing 15,000 requests at this values endpoint, because it had 15,000 keys it was usually using. It went with one single request to the recent keys endpoint, found that in the last seven days it had been using these 15,000 keys, and then iteratively went through all the keys, hitting the values endpoint sequentially, increasing the traffic, and the DynamoDB auto scaling was not catching up. There was a time window, marked by the red line that you can see on the screen, when DynamoDB was throttling because it failed to meet the capacity needs of the traffic. And unfortunately, the capacity increase from auto scaling only arrived when the traffic burst was over, and decreased again immediately after that. Okay, now, one of the problems was that
the statistics table was written sequentially before the read happened, when getting the values, just to keep the statistics fresh. So during maintenance, we had to keep the write capacity units of the statistics table in line with the read capacity units of the values table, because auto scaling was hard to fine-tune, and because if the write capacity units were lower, we would fail when writing the statistics table, and then the reads would also fail. So what we did first was separate this critical path from the rest: we put the update operation onto a different thread and just deployed the solution. Things worked a bit better than before, but after a short period of time, other problems were seen: for some reason the memory consumption of the services was increasing, and the container orchestrator started killing the services themselves. And the reason for that was a flaw in the implementation.
This was the
way we created the thread that
was dealing with this change. This is in Java,
but you don't necessarily have to understand Java to understand this use case.
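Roughly, the shape of that code was something like the following; this is a reconstruction for illustration, not the original source:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StatisticsUpdater {
    // A single background thread with its own work queue for the statistics updates.
    // Executors.newSingleThreadExecutor() is backed by an unbounded LinkedBlockingQueue.
    private final ExecutorService statisticsExecutor = Executors.newSingleThreadExecutor();

    void onValueRead(String key) {
        // The read path returns immediately; the statistics write happens asynchronously.
        statisticsExecutor.submit(() -> updateStatisticsTable(key));
    }

    private void updateStatisticsTable(String key) {
        // Placeholder for the DynamoDB write to the statistics table.
    }
}
```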
We just created a separate thread with a separate queue that processes these requests as they come in and updates the statistics table. Now, where does the problem come from? What do you think? Where is the problem with this implementation that's causing the memory increase and the killing and restarting of the instances from time to time?
Well, it's not obvious because of all the implementation details, but there is a single place where there is a problem. Very often, old-school threaded Java implementations come with this unbounded queue, meaning you have a limitless queue. And what happens when you have an increased arrival rate and your departure rate is much lower? This unbounded queue fills up, your latency increases towards infinity, and your memory consumption also increases towards infinity, up to where you can still hold this data, because you have a very huge queue sitting there for no reason. And actually, "unbounded" is not even entirely true, because this is not a truly unbounded queue; it does not drop data automatically when it gets full, it's just a very huge queue, because it's implemented using the maximum value Java declares for integers, right? So we can do better than that. So we
iterated on this implementation and simply changed it to a more sophisticated solution. We started using Resilience4j, which is a library for implementing resiliency patterns in Java, and we wrapped the call with a bulkhead, which now had a limited queue capacity of up to 25 items. When that queue was full, the bulkhead was throwing an exception. Now, we could get an exception in two cases: first, because DynamoDB starts throttling and just rejects our request, or because the bulkhead was full. So we wrapped this whole call into a circuit breaker, and the circuit breaker just opened when it saw these two exceptions, gave the whole thing a pause, and then started updating the statistics again. And this
helped us recover from the situation from
before. We did not have these memory issues anymore, and the clients did not really notice anything at all. We actually looked at the metrics of the circuit breaker, at the statistics of when it was opened, and it was not a big number, so the statistics did not really suffer because of that. And normally behaving clients could just keep up with their normal operation, maybe having a couple more cache misses than usual. With this solution and by investigating the metrics, we thought we were fine, so we finally dropped the write capacity units for the statistics table, which no longer had to match the read capacity of the values table. Okay.
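A sketch of how such a Resilience4j wrapping can look is below; the configuration values are illustrative, the exact API depends on the Resilience4j version, and feeding the asynchronous DynamoDB errors back into the breaker is left out for brevity.

```java
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;

import java.time.Duration;

public class StatisticsWriter {

    // A small thread pool with a bounded queue of 25 items instead of the unbounded one.
    private final ThreadPoolBulkhead bulkhead = ThreadPoolBulkhead.of("statistics",
            ThreadPoolBulkheadConfig.custom()
                    .coreThreadPoolSize(1)
                    .maxThreadPoolSize(1)
                    .queueCapacity(25)
                    .build());

    // When updates keep failing, stop trying for a while ("give the whole thing a pause").
    private final CircuitBreaker circuitBreaker = CircuitBreaker.of("statistics",
            CircuitBreakerConfig.custom()
                    .waitDurationInOpenState(Duration.ofSeconds(30))
                    .build());

    void recordUsage(String key) {
        try {
            // A full bulkhead queue is recorded as a failure by the circuit breaker; in the
            // real setup, DynamoDB throttling errors from the async task counted as well.
            circuitBreaker.executeRunnable(() ->
                    bulkhead.executeRunnable(() -> updateStatisticsTable(key)));
        } catch (CallNotPermittedException circuitOpen) {
            // Circuit is open: skip the statistics update, the read path is unaffected.
        } catch (BulkheadFullException queueFull) {
            // Queue of 25 is full: drop this update, the breaker has already counted it.
        }
    }

    private void updateStatisticsTable(String key) {
        // Placeholder for the DynamoDB write to the statistics table.
    }
}
```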
But we also really wanted to reduce the read capacity units for the reads. So again, let's go back to the baseline and talk about what we have in a bit more detail. We have 15,000 items coming in, 15,000 requests, almost instantly. Then we had these services packed in an auto scaling group that was CPU based. And DynamoDB was throttling and giving us back HTTP 400 errors in any case when we breached our read capacity units. So what if we retry? When we get these HTTP 400 errors, we just retry the request and hope that the read capacity has caught up by then. Unfortunately, this was not introducing any fairness into the whole solution. When this misbehaving client came with its 15,000 keys, it just choked the whole system with its own requests, and other clients had to wait until DynamoDB caught up to get their own answers. So it was not introducing any fairness into this whole scenario.
We wanted to do better. We also tried built-in rate limiting, but we did not go to production with it, because for obvious reasons it was not working very well. You can introduce a quite okay-ish implementation with Resilience4j, which also has rate limiting, and it's quite precise. So you can have 40 operations per second for each instance; with two instances, you have a total of 80 operations per second. Now, when auto scaling kicks in, you immediately have another instance with 40 operations per second, which also increases your rate limit and your supposed capacity, which is not true, of course, because the real capacity is determined by DynamoDB's current capacity and by its auto scaling characteristics.
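For reference, such a per-instance Resilience4j rate limiter looks roughly like this; the numbers are the ones from the example, and the flaw is visible in the setup itself, since every instance gets its own independent 40 operations per second budget.

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

import java.time.Duration;

public class PerInstanceRateLimit {

    // 40 operations per second *per instance*: with auto scaling, every new instance
    // adds another 40 ops/s, while the real budget is set by DynamoDB's capacity,
    // which is shared by all instances.
    private final RateLimiter rateLimiter = RateLimiter.of("values-endpoint",
            RateLimiterConfig.custom()
                    .limitForPeriod(40)
                    .limitRefreshPeriod(Duration.ofSeconds(1))
                    .timeoutDuration(Duration.ofMillis(500)) // how long a caller may wait for a permit
                    .build());

    String getValue(String key) {
        return rateLimiter.executeSupplier(() -> readFromDynamoDb(key));
    }

    private String readFromDynamoDb(String key) {
        return "value-for-" + key; // placeholder for the real table read
    }
}
```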
we tried many other things,
but one of these was this one obviously did not
work well. We could have put
rate limiting into something that's used centrally,
namely into the service mesh. So in this case,
istio was also offering rate limiting. It was using redis
to keep the states of each client
to control the rate limits. Problem was that it
introduced an API change. So instead of giving these requests a
post or maybe slowing down the clients, which are misbehaving
similarly to what back pressure does, it immediately
gave them another kind of HTTP response. And we thought
that something, that it caused more trouble
than solve solutions. So we
did not go on with this change. Instead of that, we went
back to queuing theory. If we simplify this into a simple queue, this is what happens. We have 15,000 items coming in, we have a couple of executors on the right side, and to determine the throughput, you need the latency. The overall latency budget was equal to the timeout configuration of the client, which was not difficult to figure out, because it was the default being used; I don't know where this number comes from, but for every library, the timeout seems to be 30 seconds. So we have 30 seconds to consume and process these 15,000 items. This gives us a required overall throughput of 500 operations per second, regardless of the number of executors. With two instances working on that, we would need to complete each operation in four milliseconds, which seems nearly impossible, and we did not really want to bother with optimizing the whole thing that far. But if you scale this out to five nodes, to five workers, it's a friendlier number: it's now ten milliseconds, which seems doable. So we figured that if we can slow down these 15,000 requests, so that they are not processed immediately but instead processed within 30 seconds, or close to 30 seconds, with five workers and an execution duration of ten milliseconds per worker, we can do it in a sensible way. So we tried to
find a technology that allows us to do that.
We were looking towards RabbitMQ because of a couple of very interesting features. RabbitMQ has this queue overflow behavior: if you set the overflow setting for a queue, then it will immediately reject, and not accept, requests that would be put into an already full queue. I think it makes the client side wait while the queue still has capacity and the consumers are catching up, instead of failing requests straight away; it only fails them after a certain amount of time. It also has another rate limiting, or flow control, behavior, but that is bound to the memory and CPU usage of RabbitMQ, which you can see on the middle left of the screen. That's very hard to control; it was not what we were looking for, but it still looked quite promising. What's important is that you can define your prefetch settings per queue, which you can see in the top right corner. So you can have a single connection with channels connecting different consumers to different queues, and have different settings for each queue. If one of the clients is coming into one queue, you can have a specific configuration for that single queue, and for another queue you can have either the same or a different configuration. And you have many options for acknowledging the request: when DynamoDB starts throttling, you can simply negatively acknowledge or reject the message from the worker side, and retry it in the next iteration or with the next worker.
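A minimal sketch of that combination with the RabbitMQ Java client could look like this; the queue name, the limits, and the DynamoDB call are illustrative only.

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.util.Map;

public class PerClientQueueWorker {

    public static void main(String[] args) throws Exception {
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // One queue per API key (the queue name here is illustrative). A bounded queue
        // with "reject-publish" pushes back on the publisher once it is full.
        String queue = "values-requests.client-api-key";
        channel.queueDeclare(queue, true, false, false,
                Map.of("x-max-length", 1000, "x-overflow", "reject-publish"));

        // Prefetch controls how many unacknowledged messages a worker may hold per channel,
        // which effectively paces how fast we pull work for this client.
        channel.basicQos(5);

        DeliverCallback onMessage = (consumerTag, delivery) -> {
            long tag = delivery.getEnvelope().getDeliveryTag();
            try {
                readFromDynamoDb(new String(delivery.getBody()));
                channel.basicAck(tag, false);
            } catch (RuntimeException dynamoThrottling) {
                // DynamoDB throttled us: put the message back so it can be retried later.
                channel.basicNack(tag, false, true);
            }
        };
        channel.basicConsume(queue, false, onMessage, consumerTag -> { });
    }

    private static void readFromDynamoDb(String key) {
        // Placeholder for the real table read; throws when DynamoDB throttles.
    }
}
```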
So here was our setup. Basically, we had a service, or an array of services, in an auto scaling group. This was using a so-called fake boundary; that's what I call it, maybe there's a better name. Instead of being synchronous, it was asynchronous inside, using request-reply queues, and we separated the requests by API key. Each client fortunately forwarded its own API key in the header, and in each queue, in each channel, we had the same configuration: the same overflow settings, the same prefetch rate. Each worker connected to all the queues at the same time, and the workers had their own auto scaling group. Creating a new queue when we saw a fresh API key was not a problem, because the configuration lived in the service itself, so it had something like a global configuration available. Connecting to a new queue from the worker side, when we see a new queue, seemed a bit more complicated but doable, because RabbitMQ also offers a management API, which helps you discover whether there's a fresh queue; opening up a new connection and having a new unit consuming that connection seemed to be no big deal. And then we could connect to DynamoDB and reject the request if there's throttling, hoping that this would have the effect we desired. Again, we needed to slow down misbehaving clients: if there are more orange marbles coming in, we have to say after a while that the orange marbles have to wait, while processing the blue and the yellow marbles at their own pace. And these are the metrics that I got.
Unfortunately, this is not from the real scenario; I had to rebuild it in a sandbox for certain reasons, but it's available under my GitHub profile, so you can try it on your own if you want to. So this was the baseline of direct reads, coming directly from the service to the database. You can see that we have a throughput of 300 operations per second, we complete all the reads in 15 seconds, and the response time is around ten milliseconds. This is quite good, because it's quite close to the scenario with five workers that we wanted to reach, and the response time distribution is very tight: everything goes from zero to 1.8. I don't know what the unit is, maybe this graph is messed up, sorry for that. But for the other diagrams, you can at least see how these numbers change. So with a single worker,
what we can see now is that the rate limiting, or the back pressure, is working as expected. Now, instead of doing all these reads within 15 seconds, we give them a pause and do them in, I think, around two minutes. So it's not reaching that 30-second goal we aimed for, but it's giving the requests a pause when we have more to process than we want to. The response time average is a bit higher than expected, but this is just a single worker, so at least we have the setup behaving as we want. And you can see this from the response time distribution: it now goes up to 14 units, so it's better than it was before. Then we also tried it out with five workers, and we saw that with five workers we succeeded in reaching our goals.
This takes exactly 30 seconds, as expected. And interestingly, at one point we measured an even higher throughput: the overall throughput was very close to the baseline, and in one case it was even a bit higher, up to 500 operations per second. It also shows that throughput has little to do with latency; it has some relation to latency, but we are able to reach even higher throughput even if we have higher latency for some of the clients, for some of the channels. As for the worker execution time and response time: especially if you look at the worker execution time, you can see that it gets close to ten milliseconds at the end, where we want it to be. And the response time distribution is again a bit better distributed than before. So this was quite promising for us. So that's all I wanted
to say and present to you. Thank you so much for listening. Again, I was orthes Margaret, and I work as an associate chief software engineer at EPAM. If you have questions related to the things I talked about, feel free to reach out to me using either Twitter or LinkedIn, or feel free to visit my GitHub profile. There I have the availability simulator that helps you get the availability numbers I was talking about, and the example sandbox of back pressure and rate limiting I was presenting to you, and I have plenty of references. If you are interested in more, feel free to look at the end of the slides and discover a couple of the things I talked about in even more detail. Thank you very much again for listening. I hope you had a great time and learned something new.