Transcript
Hi, I'm Akshay, a data engineer currently working as an engineering manager, delivering data migration and AI-related solutions, primarily around topic modeling and NLP-based text use cases. The topic of today's talk is best practices for streaming data pipelines on AWS. In this talk I'm going to share some of my experiences from one of the large-scale streaming data projects we worked on. Let's go into the use case and understand in more detail what we were trying to solve, and then we can build on that.
In our case we had an upstream DynamoDB-based system where events were added or updated via APIs and batch processing jobs. Those events were passed down to us via DynamoDB Streams. We took those events, enriched them, and then, based on the agreed schema, passed them on to our downstream consumers. Our downstream consumer stored that data in an Elasticsearch-based system and displayed it on a web app as metrics that were reviewed by the respective users. The entire goal of the system was to ensure that once data became available in DynamoDB, we passed it along and made it available in the web app as soon as possible.
Now, to implement this solution at the data enrichment layer, here is what we did. To listen to the DynamoDB events, we set up an ECS cluster that read the stream using a KCL application and wrote the records onto SQS, our queuing system. Once the data was available there, it triggered a Lambda that processed those SQS messages and enriched the data as per the data contract agreed with the consumers. Once a record was processed, we wrote it to a Kinesis stream, and from there it went on for downstream consumption.
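To make that flow concrete, here is a minimal sketch of such an SQS-triggered Lambda in Python with boto3. The stream name, the enrich() helper, and the record layout are illustrative assumptions, not the actual production code.

```python
import json
import os
import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = os.environ.get("STREAM_NAME", "enriched-events")  # hypothetical stream name


def enrich(event_body: dict) -> dict:
    """Placeholder for the enrichment logic agreed in the data contract."""
    event_body["enriched"] = True
    return event_body


def handler(event, context):
    # Each SQS message carries one upstream DynamoDB change event.
    records = []
    for message in event["Records"]:
        body = enrich(json.loads(message["body"]))
        records.append({
            "Data": json.dumps(body).encode("utf-8"),
            "PartitionKey": str(body.get("id", message["messageId"])),
        })

    # Write the enriched batch to the Kinesis stream for downstream consumers.
    if records:
        kinesis.put_records(StreamName=STREAM_NAME, Records=records)
```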
In this diagram I have not added the components related to self-service BI and analytics requirements, or to monitoring and alerting, just to keep things simple and explain the problem clearly.
The next set of things we will talk about is primarily focused on this Lambda and Kinesis layer: the particular problems we had to solve and the scale we saw in this use case. Now let me go further and explain the requirements, or rather the challenges, we had while implementing this solution. On the DynamoDB side we used to see around 950 million events either added or updated on a daily basis. That is the amount of load we received through the stream, and our responsibility was to accept that data, enrich it as per the contract, and then pass it to the consumer in a reliable manner. The bigger challenge was the pattern in which the data arrived, because on DynamoDB the data was updated either via APIs or via batch processing jobs. Whenever a batch process updated data in DynamoDB, we received a very high volume of events in a short period. We had to design our system to support that kind of load and still pass the data to the downstream system as soon as possible.
Now let's talk about how we set up our infrastructure to meet those requirements, and the thought process we followed to manage those loads. Starting with Kinesis as the stream: in Kinesis you need to decide on the number of shards you want to keep in your stream. You can either use on-demand capacity or provision the shards in advance. On-demand capacity at this kind of scale becomes a costly operation, so you need to be careful about how many shards you let it scale to in on-demand mode. If you go for provisioned capacity, you likewise need to be very careful about how many shards you keep in the stream to process this data.
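As a rough illustration of that sizing exercise, the back-of-the-envelope calculation below estimates a provisioned shard count from the daily event volume. The average record size and peak factor are assumptions for the example; Kinesis' published per-shard write limits of 1 MB per second and 1,000 records per second are the real constraints.

```python
import math

EVENTS_PER_DAY = 950_000_000
AVG_RECORD_BYTES = 2 * 1024   # assumed average record size
PEAK_FACTOR = 4               # assumed spike over the daily average (batch updates)

# Per-shard write limits published by Kinesis Data Streams.
SHARD_BYTES_PER_SEC = 1_000_000
SHARD_RECORDS_PER_SEC = 1_000

avg_records_per_sec = EVENTS_PER_DAY / 86_400          # roughly 11,000 records/s on average
peak_records_per_sec = avg_records_per_sec * PEAK_FACTOR
peak_bytes_per_sec = peak_records_per_sec * AVG_RECORD_BYTES

shards_for_records = math.ceil(peak_records_per_sec / SHARD_RECORDS_PER_SEC)
shards_for_bytes = math.ceil(peak_bytes_per_sec / SHARD_BYTES_PER_SEC)

print("shards needed:", max(shards_for_records, shards_for_bytes))
```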
Also, from the producer side, when bulk data comes in a wave and you are processing it with a Lambda and writing it to the shards, you need to make sure you write at a speed at which throttling does not occur, so that you do not exceed the write capacity of a particular shard. You basically need to derive the right combination here: the number of shards you want to keep in your stream and the degree of parallel or concurrent processing you want to allow in the Lambda, with some compromise on latency, to ensure that your system works in a smooth and reliable manner. That was one of our core goals as a first exercise: identifying the right number of shards and the level of concurrency we could support so that, at runtime, throttling does not happen while writing data to the shards.
Also, when we talk about shards, we need to ensure that while we write data to them, the distribution across the shards is as uniform as possible. So the first thing we did was to verify exactly that. We initially started with around 200 shards in the stream, and we wanted to be sure that, whatever number of shards we used, our data was uniformly distributed across them. Otherwise, if your data is not uniformly distributed, you may end up in a scenario where some shards are hot shards, with a large number of records being written by your producer systems, while other shards receive very few records and become cold shards. To identify whether the data is being distributed across the shards appropriately, you can enable shard-level monitoring on the stream and watch those details.
So how can you check those details? By default, the shard-level metrics are not enabled for monitoring. To enable them, you need to call the EnableEnhancedMonitoring API, which turns on shard-level metrics in the AWS ecosystem. The catch is that this enhanced monitoring does not come for free; you have to pay for the availability of these metrics. But once you have access to them, you can see very reliably whether the data is being written uniformly across the shards, which gives you confidence that whatever number of shards you have provisioned is being utilized appropriately for further data processing.
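For reference, this is roughly what enabling those shard-level metrics looks like with boto3; the stream name and the chosen metric list are just examples.

```python
import boto3

kinesis = boto3.client("kinesis")

# Turn on shard-level (enhanced) metrics for the stream.
# Note: enhanced monitoring is billed separately from the stream itself.
kinesis.enable_enhanced_monitoring(
    StreamName="enriched-events",  # hypothetical stream name
    ShardLevelMetrics=[
        "IncomingBytes",
        "IncomingRecords",
        "WriteProvisionedThroughputExceeded",
    ],
)
```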
If you are not willing to pay for that and prefer an in-house way to verify the uniform distribution, you can build your own solution as well. On the consumer side, you can add custom logging and identify, for each record, which shard it came from; or you can customize the producer and log the PutRecord response, which returns the shard the record was placed on. So there are alternative ways to check whether the data is uniformly distributed. But in the initial stage I would prefer to go with the built-in metrics; they give you good confidence about whether your data is distributed uniformly across the shards or not.
Now, suppose you identify that your data is not uniformly distributed across the shards. If it is distributed uniformly, there is no issue and you can continue with the same implementation. But if it is not, and you want to enforce uniform distribution from the upstream side, you do it in whatever component you use as the data processing layer; in our case that was the Lambda. There are two ways you can implement this in the producer. In the first, for whatever PutRecord or PutRecords call you fire to write data onto the stream, you explicitly pass the partition key as a hash of the record, or as some kind of random value. Kinesis assigns a record to a shard by hashing its partition key (an MD5 hash over the key), so a well-spread or random partition key ensures your data lands across the shards in a roughly uniform manner. So you can go with this approach: take a hash of the record, or a random key, and pass it along with the record.
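A minimal sketch of that first approach, assuming a hypothetical stream name and record payload: the partition key is a random UUID, so Kinesis' own hashing spreads the writes across shards.

```python
import json
import uuid
import boto3

kinesis = boto3.client("kinesis")

def write_event(event: dict, stream_name: str = "enriched-events") -> None:
    # A random partition key hashes to a random point in the key space,
    # so records spread roughly evenly across the shards.
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=str(uuid.uuid4()),
    )
```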
In the second approach, you shard the data yourself using an explicit hash key. In AWS you can call DescribeStream on the stream to see all the shards it has and, for each shard, the hash key range it uses to receive data. Once you have that information, you can go round robin: for whatever data you are writing to the stream using PutRecord or PutRecords, you assign a partition key, which is a mandatory parameter, and along with it you also assign an explicit hash key. If you pass both parameters, the explicit hash key takes priority: it overrides the hash of the partition key and delivers the record to the shard whose range you targeted. So with either of these two approaches you can ensure a uniform distribution across your shards.
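And a sketch of the second approach, again with assumed names: DescribeStream exposes each shard's hash key range, and the producer cycles through those ranges with ExplicitHashKey, which overrides the partition-key hash.

```python
import itertools
import json
import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = "enriched-events"  # hypothetical stream name

# Collect the starting hash key of every shard in the stream.
# (For streams with many shards you would paginate or use ListShards.)
shards = kinesis.describe_stream(StreamName=STREAM_NAME)["StreamDescription"]["Shards"]
hash_keys = itertools.cycle(s["HashKeyRange"]["StartingHashKey"] for s in shards)

def write_event(event: dict) -> None:
    # ExplicitHashKey takes priority over the partition key's hash,
    # so cycling through the shard ranges gives a round-robin placement.
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey="ignored-when-explicit-hash-key-is-set",
        ExplicitHashKey=next(hash_keys),
    )
```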
If you do that, it ensures two things. First, you do not run into hot or cold shard issues. Second, you get proper utilization of the shards you are paying for in your ecosystem, which helps optimize cost.
These metrics will also help you manage and observe the load in case of an issue. If you want to restrict, or rather slow down, the speed at which you write to the shards, one thing you can do is slow down the processing layer itself. In our case that layer was a Lambda, and on a Lambda you can use the reserved concurrency setting to cap how many concurrent executions can happen. That cap slows the rate of writes to the shards, and the processing rate on them, so you do not run into throttling. Until you have a clear idea of how far you want to scale your shards, you can use this combination as a lever: it helps you derive the right values for the number of shards and for the concurrency of the execution layer, and together they ensure a uniform distribution and a smooth transfer of data into your stream.
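Setting that cap is a one-line call; the function name and the limit of 50 below are illustrative values, not the ones from the project.

```python
import boto3

lambda_client = boto3.client("lambda")

# Cap the enrichment Lambda at 50 concurrent executions so that the
# aggregate write rate into Kinesis stays below the shards' capacity.
lambda_client.put_function_concurrency(
    FunctionName="event-enrichment",   # hypothetical function name
    ReservedConcurrentExecutions=50,
)
```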
Now, after identifying the right numbers and ensuring a uniform distribution of records across your shards, the next problem we saw in our implementation came from the way the schema was evolving. We kept getting columns with details like comments and descriptions, and more and more of these descriptive columns were added to the data contract. When we add those descriptive columns and details to our data, we are essentially increasing the size of each record we write to the shard. A shard has a limit of 1 MB per second of write throughput. So you end up in the same scenario again: if you are writing larger volumes of data in a wave to your shards, either you need to add more shards, or you will get throttling of records on a particular shard.
To overcome this, one thing you can do is, before writing the data to the Kinesis stream, apply some kind of compression algorithm such as Gzip or an LZ-based codec, or choose a compact format such as Avro. This adds some extra computation in your Lambda layer, and on the consumer side as well, to decompress the data or read it in that format, but it ensures that your data transfer happens in a seamless manner.
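As a small sketch of that idea, assuming JSON payloads and the same hypothetical stream, the producer gzips each record before PutRecord and the consumer reverses it:

```python
import gzip
import json
import boto3

kinesis = boto3.client("kinesis")

def put_compressed(event: dict, partition_key: str, stream_name: str = "enriched-events") -> None:
    # Compress the JSON payload so larger descriptive fields stay well
    # under the per-shard write throughput.
    payload = gzip.compress(json.dumps(event).encode("utf-8"))
    kinesis.put_record(StreamName=stream_name, Data=payload, PartitionKey=partition_key)

def read_compressed(record_data: bytes) -> dict:
    # Consumer side: decompress before applying the data contract.
    return json.loads(gzip.decompress(record_data).decode("utf-8"))
```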
So if you are facing a similar kind of problem, this is the kind of thing you can use. The other thing I recommend, which unfortunately we were not able to implement in our project, is that for these descriptive columns and attributes, if possible, rather than pushing that data through the stream, it is better to pass it by reference: store the data in, say, an S3 location or a database, and let the end consumers fetch the descriptive data from there, provided the cardinality of that data is not high. In those cases you save on the processing side and you save on space as well, so it is a win on both sides. So either choose your format carefully or define your data contract appropriately, to keep some control on the amount of data you pass in each record to your stream.
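That reference-passing idea is essentially the claim-check pattern; a minimal sketch of it, with an assumed bucket name, looks like this:

```python
import json
import uuid
import boto3

s3 = boto3.client("s3")
kinesis = boto3.client("kinesis")
BUCKET = "event-payload-store"  # hypothetical bucket for large descriptive fields

def put_with_reference(event: dict, description: str, stream_name: str = "enriched-events") -> None:
    # Store the bulky descriptive text in S3 and only ship its location
    # on the stream, keeping the Kinesis record small.
    key = f"descriptions/{uuid.uuid4()}.txt"
    s3.put_object(Bucket=BUCKET, Key=key, Body=description.encode("utf-8"))
    event["description_ref"] = f"s3://{BUCKET}/{key}"
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=str(uuid.uuid4()),
    )
```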
The next practice I generally recommend while working with any kind of streaming platform is that whatever data we pass to the stream should be bound to some kind of data contract. That way, the producer can write data as per that contract, and the consumer can understand, using the same contract, what data it is going to get and how to read and use it. To maintain this, most streaming solutions provide some kind of schema registry. In the AWS world there is the AWS Glue Schema Registry, available as part of Glue, where you can define the schema, and that schema can be consumed by both producer and consumer to understand the data. Along with that, the schema registry also supports versioning of the schema. So if you have scenarios where your schema evolves over a period of time, an admin can control that schema, and it evolves according to the version and the contract. Your producer always knows in which shape it has to write the data, and your consumer knows how to read it. So what I generally recommend is to set up the schema registry first, before even implementing the streaming solution; that will save you a lot of time as you work further on the project.
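For illustration, registering a schema and a new version in the Glue Schema Registry with boto3 looks roughly like this; the registry name, schema name, and the tiny JSON schema are made up for the example, and the registry itself is assumed to already exist.

```python
import json
import boto3

glue = boto3.client("glue")

schema_v1 = json.dumps({
    "type": "object",
    "properties": {"id": {"type": "string"}, "status": {"type": "string"}},
    "required": ["id", "status"],
})

# Create the schema once, with backward compatibility enforced on new versions.
glue.create_schema(
    RegistryId={"RegistryName": "streaming-contracts"},   # hypothetical registry
    SchemaName="enriched-event",                          # hypothetical schema name
    DataFormat="JSON",
    Compatibility="BACKWARD",
    SchemaDefinition=schema_v1,
)

# Later, evolve the contract by registering a new version.
schema_v2 = json.dumps({
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "status": {"type": "string"},
        "comment": {"type": "string"},
    },
    "required": ["id", "status"],
})
glue.register_schema_version(
    SchemaId={"RegistryName": "streaming-contracts", "SchemaName": "enriched-event"},
    SchemaDefinition=schema_v2,
)
```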
In case you don't want to use a schema registry based solution, you can instead deliver a shared code base that contains the schema definitions, and that shared code base can be used on both sides, producer and consumer, to write the data in a particular schema and consume it in an appropriate way.
After the schema registry, the last thing I would like to mention is the validation technique we used, which was inspired by canary deployments: a way to verify that the data we get from our producer system, and the data we pass to our consumer, is flowing through reliably. Along with the real data that we received from the producer, processed, and wrote to the stream, we also started publishing dummy events into the data stream. The product we were working with had a certain lifecycle, with states like open, in review, in progress, closed, and so on, so we generated dummy events across those states and published them to the stream for further consumption. We also published metrics about those dummy events to CloudWatch. Once those events were pushed, they flowed through the stream like any other event, and on the consumer side we read those same dummy events and published custom CloudWatch metrics around their status and other dimensions.
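Both sides can emit those counters as CloudWatch custom metrics; a minimal sketch, with an assumed namespace and dimensions, might look like this:

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

def emit_canary_metric(side: str, status: str, count: int = 1) -> None:
    # side is "producer" or "consumer"; comparing the two series later
    # reveals lag or loss in the pipeline.
    cloudwatch.put_metric_data(
        Namespace="StreamingCanary",  # hypothetical namespace
        MetricData=[{
            "MetricName": "DummyEventCount",
            "Dimensions": [
                {"Name": "Side", "Value": side},
                {"Name": "Status", "Value": status},
            ],
            "Value": count,
            "Unit": "Count",
        }],
    )

# Example: the producer publishes a dummy "open" event and records it,
# and the consumer records the same event once it reads it off the stream.
emit_canary_metric("producer", "open")
emit_canary_metric("consumer", "open")
```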
By publishing these dummy events periodically, consuming them again, and pushing their data to CloudWatch, we were able to compare, on a periodic basis, the metrics published from the dummy events on the producer side against the metrics captured on the consumer side, and check whether they matched within an acceptable deviation range. If we found that the deviation for, say, the number of open or closed events was too high between what we sent from the producer side and what we received on the consumer side, it meant there was a delay in the streaming data flowing through our processes, or a high-volume load, or some other issue. On those metrics we created alerts that told us whether the data was being processed smoothly or not. So if you are also working on a streaming solution, this is a canary-based technique you can implement: publish some dummy events on the producer side, read the same events on the consumer side, produce custom metrics on their lifecycle in CloudWatch or a similar tool, and generate alerts from there if you see a higher deviation in the processing or in the respective turnaround time. That can help you identify whether your streaming pipeline is working as expected, or whether some fix, more concurrency, or a capacity increase is required to support the load.
And that's about it. These are some of the things I wanted to cover in this session around best practices. If you want to talk more about streaming solutions or anything around them, feel free to reach out to me on LinkedIn or other social media platforms, and I would be happy to talk more there. Thank you, and thanks for your attention while listening to this talk.