Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hi everyone, this is Shay Lin. In this talk I'm
going to talk about streaming aggregation of cloud-scale
telemetry. First, we're going to talk about why
we're even talking about telemetry, and in what context.
Then we'll touch on the evolution of the
telemetry serving platform at Confluent, and
finally we'll deep dive into the architecture.
Telemetry is the unit of measurement for
observability, which is the foundation of chaos
engineering. As we all know,
observability is here to establish measurements for the knowns.
It helps us discover the unknown
unknowns. And finally,
it is often considered the prerequisite
for continuous experimentation.
So we're going to talk about Kafka reliability in
the context of chaos engineering.
For those who aren't familiar with Apache Kafka,
Kafka brokers are the compute
units within a Kafka cluster. By nature,
brokers handle read and write requests.
Simple. They also handle data replication
and the partitioning strategy.
As simple as that sounds, if we zoom into any given
broker, there are quite a few things
happening: there are your network threads, your
I/O threads, caching, disk I/O.
Everything is happening.
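To make that concrete, these are roughly the broker settings that size those thread pools and storage paths. The values below are purely illustrative, not a recommendation and not Confluent's production configuration:

```properties
# server.properties (illustrative values only)
num.network.threads=8           # threads accepting and responding to client requests
num.io.threads=16               # threads doing request processing and disk I/O
log.dirs=/var/lib/kafka/data    # where partition segments land on disk
default.replication.factor=3    # copies of each partition across brokers
num.partitions=12               # default partition count for new topics
```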
So now that we've mastered our knowledge of
Apache Kafka, let's try to do a global deployment
as an SRE. If you look at this graph,
you're scared. Your backend engineer or product manager
approaches you and says, "Hey Shay, can we roll
out this feature tomorrow, please?" And your boss
asks, "What if us-west-2 is down?"
Those are the questions you have to answer in order to support
continuous rollout and experimentation of your products.
The first thing an SRE would do is start instrumenting
telemetry. That means having
your clients send out telemetry and having your brokers
send out telemetry, so you have the basic elements coming
in. It would probably look
something like this: different elements coming from everywhere,
all into one big giant data store.
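As a minimal illustration of what that instrumentation can look like, here is a sketch using the OpenTelemetry Java metrics API (which comes up again later in the talk). The metric and attribute names are made up for the example:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

public class ClientTelemetrySketch {
    // Hypothetical metric and attribute names, for illustration only.
    private static final Meter METER = GlobalOpenTelemetry.getMeter("kafka-client-telemetry");
    private static final LongCounter BYTES_IN = METER
            .counterBuilder("network.bytes.in")
            .setUnit("By")
            .setDescription("Bytes received over the network")
            .build();

    public static void recordBytesIn(String clusterId, long bytes) {
        BYTES_IN.add(bytes, Attributes.of(AttributeKey.stringKey("cluster.id"), clusterId));
    }
}
```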
So what do we do with it? After a while,
we quickly find out the usage patterns are as
follows. We're looking for facts, like the total number
of network bytes in over the last hour.
We're also looking for trends. For example, what's the
CPU usage over the last seven days?
We also try to use this data for diagnostics.
And more importantly, things start to become
complex in terms of attribution: you want to see
whether there are potential correlations between the CPU usage
trend and fetch requests. Think of a typical
Datadog dashboard.
Now that we have a data store up and running and
all the telemetry flowing in,
things start to make a little more sense. Based
on the data you're looking at,
you have some basic ideas,
like clients are often getting declined for unknown
reasons, and you come up with certain hypotheses, such
as your fan-out could be a problem, or your deployment
just cannot support too many connections at the same time.
But in many situations,
a simple data store cannot support this kind of
complex diagnosis or attribution analysis.
That's where we bring in time-series-optimized data
stores,
otherwise known as OLAP data stores. We can use Apache
Druid as an example. At the
bottom here, we can simply assume
data is flowing in, in near real time, into
segments. We have a query
engine that's smart enough to take the incoming queries
and fetch data
from the corresponding segments. So this
works great. Now we have better visibility
and faster compute to answer questions
such as the number of fetch requests.
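For instance, a last-hour fetch-request count against a Druid datasource might be expressed like this in Druid SQL. The datasource and column names here are hypothetical, just to show the shape of the query:

```sql
-- Hypothetical datasource and columns, shown only to illustrate the query shape.
SELECT COUNT(*) AS fetch_requests
FROM "raw_telemetry"
WHERE "metric_name" = 'io.fetch.requests'
  AND "__time" >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
```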
As your business grows, you'll probably run
into this situation: your segments are
getting filled up quickly. So how do segments even
get filled up in the first place? I like to think of it as
filling a cup of water. You have raw telemetry
continuously coming in, like water from a
firehose. Each
water bottle has a capacity limit. Once it's
filled up, we push it away,
bring in a new water bottle, and go from there.
So you will see segments getting created all the
time. And just to put things
in perspective, for cloud
providers like Confluent, data is often flowing
in at millions of events per second, and you
can easily accumulate petabyte-scale
amounts of data within an hour.
On the consumption side, which is the upper part of
this diagram, you have an increasing
amount of interest in pulling similar data sets:
give me the number of fetch requests for Kafka cluster XYZ
in the last hour. And that question is often asked
by many different parties. You have your internal
customers, such as billing and SREs,
who are doing continuous monitoring, and then on
the other side of the house you have your external customers.
Obviously, for privacy reasons, you don't
want external customers to be able to view
other customers' data. However, they are very
interested in how their own Kafka cluster is
doing.
When we look at the state of things, serving highly concurrent
ingestion and queries at the same time is extremely
hard. Some of the concerns that arise are
repetitive queries on hot entities.
The compute and serving costs get extremely expensive;
as you can imagine, segments keep being created while queries come
in concurrently. And more importantly,
it's very hard to control what
customer A thinks about this metric versus what billing
thinks about this metric.
And the goal is to have
everyone come to a consensus of, hey, this is how
we define this particular metric, and go from
there.
To solve the problem of serving telemetry
at scale, the solution that
we landed on is to push asynchronous aggregations
through Kafka. What that means is,
yes, data is still flowing into the raw segments,
all the way on the left. But instead of
serving repetitive queries
to many different customers and internal use cases,
the goal is to publish predefined aggregations
that are commonly reused everywhere.
This is done through Kafka Streams,
which I'll talk about more in a little bit. Once
the telemetry gets pre-aggregated,
we publish it back to
our customers as well as to Druid,
which stores the aggregation segments.
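To give a feel for what a predefined aggregation could be, here is a hypothetical sketch of a definition entry. This is not Confluent's actual schema, just an illustration of the kind of information such a definition carries:

```java
import java.time.Duration;
import java.util.List;

// Hypothetical shape of an entry in the metric-definition registry: which raw
// metric to aggregate, how to aggregate it, over which dimensions, and at
// what time granularity.
public record PredefinedAggregation(
        String name,                    // e.g. "fetch_requests_per_cluster_1m"
        String sourceMetric,            // e.g. "io.fetch.requests"
        String aggregationFunction,     // e.g. "SUM", "MAX", "P99"
        List<String> groupByDimensions, // e.g. ["cluster_id"]
        Duration windowSize) {          // e.g. Duration.ofMinutes(1)
}
```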
Here we have a nice set of Kafka Streams topologies.
How it works is that this is all event-driven.
Once a segment gets filled up, certain
signals are sent out. So our service
essentially listens to the signals coming out of
the segments and then starts to compute from
there. We have a central registry
for all the metric definitions, that is, the
predefined aggregations that we want to serve.
In the first topology, there is a global task
manager that tries to break the work down into the
smallest units we can work with. Oftentimes
that depends on how many clusters we need to aggregate on
and what the metric granularity is
in terms of space and time.
Once the aggregation tasks are created, things are
straightforward: subtopology
two, in the middle, handles all the processing requests
with external compute. And finally,
those predefined aggregations are published into Kafka
again.
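To make the shape of this concrete, here is a minimal Kafka Streams sketch along those lines. It is not the actual Confluent topology: the topic names and record types are hypothetical, a single hard-coded metric stands in for the registry lookup, and serdes and the external compute call are omitted.

```java
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class AggregationTopologySketch {

    // Hypothetical event types; the real schemas are not part of the talk.
    record SegmentSignal(String clusterId, long segmentStartMs, long segmentEndMs) {}
    record AggregationTask(String clusterId, String metric, long windowStartMs, long windowEndMs) {}
    record AggregatedMetric(String clusterId, String metric, long windowStartMs, double value) {}

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Subtopology 1: listen for "segment filled" signals and break them into the
        // smallest aggregation tasks; in reality the metric list comes from the registry.
        KStream<String, SegmentSignal> signals = builder.stream("segment-signals");
        KStream<String, AggregationTask> tasks = signals
                .flatMapValues(signal -> List.of(new AggregationTask(
                        signal.clusterId(), "io.fetch.requests",
                        signal.segmentStartMs(), signal.segmentEndMs())))
                .selectKey((ignoredKey, task) -> task.clusterId());

        // Subtopology 2: run each task against external compute (e.g. the OLAP store),
        // then publish the predefined aggregation back to Kafka.
        tasks.mapValues(task -> new AggregatedMetric(
                        task.clusterId(), task.metric(), task.windowStartMs(),
                        0.0 /* placeholder for the external query result */))
                .to("aggregated-telemetry");

        return builder.build();
    }
}
```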
Just one thing to note here:
there is additional processing that might be
needed depending on your consumption contract.
For example, here we are using
OpenTelemetry for publishing telemetry metrics.
That means certain semantics apply depending on
whether it's a counter metric,
a gauge metric, et cetera.
You want to make sure that you process and maintain the state
so that your consumers can consume it correctly
and the data isn't duplicated
over time.
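As a rough sketch of what those semantics can mean for the aggregator (not the actual Confluent logic; the types and method here are hypothetical), a counter is typically summed across the window, while a gauge keeps the latest observed value:

```java
// Hypothetical helper illustrating metric-type-aware aggregation semantics.
public final class MetricSemantics {

    public enum MetricType { COUNTER, GAUGE }

    // Fold one new data point into the running aggregate for a single window.
    public static double combine(MetricType type, double runningAggregate, double newPoint) {
        return switch (type) {
            // Counters are cumulative totals, so the window value is a sum of deltas.
            case COUNTER -> runningAggregate + newPoint;
            // Gauges are point-in-time readings, so the latest observation wins.
            case GAUGE -> newPoint;
        };
    }
}
```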
Aside from the Kafka Streams solution that we landed on, there are
obviously alternative architectures to support this.
There are two general routes.
One is offline custom roll-up tasks.
This is actually available in
some of these databases today. For example, in Apache Pinot,
this is done through the star-tree index.
Essentially, if we look at the
current data set, there are likely five different
dimensions, and the goal is to reduce it to,
say, three dimensions out of the five.
This is done by configuring the star-tree index, and Pinot will
handle it through background tasks run by minions.
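As a rough example of what that configuration looks like in Pinot, a star-tree index is declared in the table config along these lines. The column names are hypothetical, and the exact fields should be checked against the Pinot documentation:

```json
{
  "tableIndexConfig": {
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": ["cluster_id", "metric_name", "region"],
        "skipStarNodeCreationForDimensions": [],
        "functionColumnPairs": ["SUM__request_count", "MAX__latency_ms"],
        "maxLeafRecords": 10000
      }
    ]
  }
}
```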
On the other side, Apache Druid also offers
injecting custom roll-up tasks and handles them
for you.
On the other hand, we can always process raw
telemetry through stream processing. What we did
find out through a POC is
that your partitioning strategy
should be highly coherent with
how you're aggregating your data.
If you're not doing so,
chances are that you're replicating the data set
throughout the processing stages, which is often
less agile and very expensive.
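For instance, if the predefined aggregations are keyed by cluster, producing raw telemetry with the cluster ID as the record key lets the aggregation reuse the existing partitioning instead of reshuffling the data. A hedged sketch, with hypothetical topics and types and serdes omitted:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class CoPartitionedAggregationSketch {

    // Hypothetical raw-telemetry value type.
    record TelemetryPoint(String clusterId, String metric, double value) {}

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Raw telemetry is produced with clusterId as the record key, the same
        // dimension we aggregate on, so groupByKey() reuses the existing partitions.
        KStream<String, TelemetryPoint> raw = builder.stream("raw-telemetry");
        raw.groupByKey()
           .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
           .count()
           .toStream()
           .to("per-cluster-counts");

        // If the key did not match the aggregation dimension, we would need
        // raw.groupBy((key, point) -> point.clusterId()), which repartitions
        // (i.e. replicates) the full raw data set before aggregating.
        return builder.build();
    }
}
```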
With the scale of data that we're talking about, when picking
one of these architectures, it is understandable
that you always need to look at the current state of your systems.
For example, how is the data
store cluster doing? Is it powerful enough to
do some additional custom roll-ups?
How about adding additional clusters? And on the
other hand, you need to understand your consumption
semantics well, to understand whether stream processing
will meet your needs.
We're talking about two dimensions here:
one is processing through space, the other is processing
through time. The more compressed
your data is, which is the lower end of this curve,
the more likely it is that your compressed data will only be able to
serve a narrower set of use cases.
On the other hand, the more
dimensions your data set has,
the more likely you are to serve a broader set
of use cases. So defining
what set of predefined aggregations
you want to perform in this
kind of aggregation engine is key to success.
So now we're getting into a much better
state. As a
site reliability engineer, you can look at this graph
at any given time: correlations are
very clear, attribution is easy,
and it's very easy to dive into a specific region.