Conf42 Cloud Native 2024 - Online

Best practices to manage AWS streaming data pipeline at scale

Abstract

I am a data engineering professional and have helped companies like AWS manage streaming data at scale and support analytics over it. My solution helped organizations convert batch processing pipelines to real-time pipelines and hence reduce data latency from 75+ hours to less than 30 minutes.

Summary

  • Akshay: The topic for today's talk is best practices for a streaming data pipeline on AWS. He says the main challenge was the pattern in which the data arrived, and he explains how the team managed its infrastructure to meet those requirements.
  • He also covers the validation technique used to confirm that data is passed to consumers in a reliable manner. If you want to talk more about streaming solutions or the things around them, feel free to reach out to Akshay on LinkedIn or another social media platform.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi, I'm Akshay, a data engineer currently working as an engineering manager, delivering data migration and AI-related solutions, primarily around topic modeling and NLP-based text use cases. The topic for today's talk is best practices for a streaming data pipeline on AWS. In this talk I'm going to share some of my experiences from one of the large-scale streaming data projects we have worked on. So let's go into the use case and understand in more detail what we were trying to solve.
In our case, we had an upstream DynamoDB-based system where events were updated via APIs and batch processing jobs. Those events were passed down to us via the DynamoDB stream. We took those events, enriched them and then, based on the agreed schema, passed them to our downstream consumers. The downstream consumers stored that data in an Elasticsearch-based system and displayed it in a web app as metrics, which were reviewed by the respective users. The entire goal of the system was to ensure that once data became available in DynamoDB, we passed it on and made it available in the web app as soon as possible.
To implement the data enrichment layer, we set up an ECS cluster to listen to the DynamoDB events. It read the data using a KCL application and wrote it to the queuing system, SQS. Once the data was available there, we triggered a Lambda that processed those SQS messages and enriched the data as per the data contract agreed with the consumers. Once the Lambda had processed the data, we wrote it to a Kinesis stream, and from there it went on for downstream consumption.
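As a rough illustration of the Lambda step described above, the sketch below turns an SQS-triggered event into entries shaped for a Kinesis PutRecords call. It is not the speaker's code: the `enrich` function, the `source` field and the `id` attribute used as partition key are hypothetical placeholders, and the actual call to Kinesis is left out so the example stays self-contained. The batch size of 500 reflects the per-request record cap of PutRecords.

```python
import json
from typing import Any, Dict, Iterator, List

MAX_PUT_RECORDS_BATCH = 500  # PutRecords accepts at most 500 records per request


def enrich(change: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical enrichment; the real data contract is not given in the talk."""
    enriched = dict(change)
    enriched["source"] = "dynamodb-stream"  # assumed extra attribute
    return enriched


def to_kinesis_entries(sqs_event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Convert the SQS messages in a Lambda event into PutRecords entries."""
    entries = []
    for message in sqs_event.get("Records", []):
        payload = enrich(json.loads(message["body"]))
        entries.append({
            "Data": json.dumps(payload).encode("utf-8"),
            "PartitionKey": str(payload["id"]),  # "id" is an assumed field
        })
    return entries


def batches(entries: List[Dict[str, Any]],
            size: int = MAX_PUT_RECORDS_BATCH) -> Iterator[List[Dict[str, Any]]]:
    """Split the entries into chunks small enough for one PutRecords request."""
    for start in range(0, len(entries), size):
        yield entries[start:start + size]
```

Each batch would then be handed to the Kinesis client, with failed records retried according to the response.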
In this diagram, I have not added the components related to self-service BI or analytics requirements, or to monitoring and alerting, just to keep things simple and explain the problem properly. What we talk about next will focus primarily on this Lambda and the Kinesis layer, the problems we had to solve there and the scale we saw in this use case.
Now let me explain the requirements and challenges we had while implementing this solution. On the DynamoDB side, we had 950,000,000 events either added or updated on a daily basis. That is the load we received as part of our stream. Our responsibility was to accept that data, enrich it as per the contract, and pass it to the consumer in a reliable manner. The challenge was mostly in the pattern in which we received the data, because in DynamoDB the data was updated either via the APIs or via the batch processing jobs. Whenever a batch process updated data in DynamoDB, we received a very high number of events in a short period. We needed to design our system to support that kind of load and pass the data to the downstream system as soon as possible.
Now let's talk about how we managed our infrastructure to meet those requirements, and the thought process we followed to make sure we could handle that kind of load. In Kinesis, we need to define the number of shards we want to keep in our stream. We can either use on-demand capacity or provision the shards in advance.
On-demand capacity at this kind of scale is going to be costly. So you need to be careful about the number of shards you use in the auto-scaling (on-demand) mode, and if you are going for provisioned capacity, you also need to be very careful about the number of shards you keep in a stream to process this data. On the producer side, when a bulk of data comes in and you are processing it with a Lambda and writing it to the shards, you also need to write at a speed at which throttling doesn't occur and you don't hit the write limit of a particular shard. So you need to derive the right combination: the number of shards you keep in your stream, and the degree of parallel or concurrent processing you support in the Lambda, with some compromise on latency, so that your system works in a smooth and reliable manner. That was one of our core goals as a first exercise: identify the right number of shards and the concurrency we could support so that throttling doesn't happen at runtime while writing data to the shards.
Also, when we talk about shards, we need to ensure that the data we write is distributed across them as uniformly as possible. We initially started with 200-odd shards for the stream, and we wanted to ensure that, whatever number of shards we were using, our data was uniformly distributed across them.
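As a back-of-the-envelope check on the shard sizing discussed above, the sketch below estimates a lower bound on the shard count from the per-shard write limits (1,000 records per second and 1 MB per second, approximated here as 1,000,000 bytes). The 950,000,000 events per day figure is from the talk; the average record sizes and the burst factor in the usage note are assumptions chosen only for illustration.

```python
import math

SHARD_WRITE_RECORDS_PER_SEC = 1_000      # per-shard write limit (records)
SHARD_WRITE_BYTES_PER_SEC = 1_000_000    # per-shard write limit (~1 MB/s)

DAILY_EVENTS = 950_000_000               # figure quoted in the talk
AVERAGE_RATE = DAILY_EVENTS / 86_400     # roughly 10,995 events per second


def min_shards(records_per_sec: float, avg_record_bytes: int) -> int:
    """Lower bound on shards so neither per-shard write limit is exceeded."""
    by_count = records_per_sec / SHARD_WRITE_RECORDS_PER_SEC
    by_bytes = records_per_sec * avg_record_bytes / SHARD_WRITE_BYTES_PER_SEC
    return max(1, math.ceil(max(by_count, by_bytes)))
```

With an assumed 2,000-byte average record, `min_shards(AVERAGE_RATE, 2000)` gives 22 shards for the daily average, while an assumed 8x burst during batch updates, `min_shards(AVERAGE_RATE * 8, 2000)`, gives 176. This only shows why bursty load pushes the count far above the average-rate estimate; it assumes perfectly even distribution across shards.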
Otherwise, if your data is not uniformly distributed across the shards, you may end up with some hot shards, where your producer systems write a very large number of records, and some cold shards, which receive very few records. To identify whether the data is being distributed across the shards properly, you can enable monitoring on those shards and watch the details. By default, shard-level metrics are not enabled. To enable them, you need to call the EnableEnhancedMonitoring API, which makes the shard-level metrics available in the AWS ecosystem. The catch here is that this metric monitoring doesn't come for free, so you need to pay for it. But once you have access to these metrics, you can see very reliably whether data is being written uniformly across the shards. That gives you confidence that the shards you are provisioning are being utilized properly for further data processing.
If you are not willing to pay for this built-in way of checking the distribution, you can build your own solution as well. On the consumer side, you can use custom logging to record which shard each record came from, or you can customize the producer to log the PutRecord response, which returns the shard the record was placed on. So there are alternative ways to check whether the data is uniformly distributed.
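The do-it-yourself check described above could look roughly like the following sketch. It assumes the producer has collected PutRecord responses as dictionaries containing a `ShardId` field, and it uses a hypothetical `skew_ratio` helper to compare the busiest shard against an ideal even share. What ratio counts as "hot" is left to the reader.

```python
from collections import Counter
from typing import Dict, Iterable, List


def shard_counts(put_responses: Iterable[Dict[str, str]]) -> Counter:
    """Count how many records landed on each shard, per logged response."""
    return Counter(response["ShardId"] for response in put_responses)


def skew_ratio(counts: Counter, total_shards: int) -> float:
    """Busiest shard's record count divided by the ideal even share."""
    total = sum(counts.values())
    if total == 0:
        return 0.0
    return max(counts.values()) / (total / total_shards)


def cold_shards(counts: Counter, all_shard_ids: List[str]) -> List[str]:
    """Shards that received no records at all in the sampled window."""
    return [shard_id for shard_id in all_shard_ids if counts[shard_id] == 0]
```

A ratio close to 1.0 suggests an even spread; a value of 3.0 means one shard is receiving three times its fair share.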
But in the initial stage I would prefer to go with the built-in metrics, which give you a good amount of confidence about whether your data is distributed uniformly across the shards. If it is, there is no issue and you can go ahead with the same implementation. But let's say your data is not uniformly distributed and you want to enforce uniform distribution from the upstream layer that writes to the stream, which in our case was a Lambda. There are two ways you can implement this in AWS Lambda.
In the first approach, on the PutRecord or PutRecords call that you fire to write data to the stream, you explicitly pass the partition key as a hash of the record, or as a value from a random number generator. Kinesis hashes the partition key to choose the shard, and the hashes of random or well-spread keys land approximately uniformly across the hash key space, so the data is spread evenly.
In the second approach, you send the data with an explicit hash key. In AWS, you can describe the stream to see which shards it has and which hash key range each shard uses to distribute the data.
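To see why random partition keys spread evenly, the sketch below simulates the first approach locally. Kinesis maps a partition key to a 128-bit integer with MD5 and routes the record to the shard whose hash key range contains that value. The simulation assumes the shards split the hash space into equal ranges, which is a simplification, and uses random UUIDs as stand-in partition keys.

```python
import hashlib
import uuid
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit value


def hash_key(partition_key: str) -> int:
    """128-bit integer derived from the MD5 digest of the partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the shard owning this key, assuming equal-width hash ranges."""
    return hash_key(partition_key) * shard_count // HASH_SPACE


def simulate(n_records: int, shard_count: int) -> Counter:
    """Distribute n_records with random partition keys and count per shard."""
    return Counter(
        shard_index(uuid.uuid4().hex, shard_count) for _ in range(n_records)
    )
```

Running `simulate(20000, 8)` should give each of the eight simulated shards a count close to 2,500, with small random variation between runs.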
Once you have that information, you can go through the shards in a round-robin manner for the data you are writing with PutRecord or PutRecords. You assign a partition key, which is a mandatory parameter, and along with that you also assign the explicit hash key. If you pass both the partition key and the explicit hash key, the explicit hash key gets priority. It overrides the hash of your partition key and delivers the data to the shard whose range contains the value you passed. With either of these two approaches, you can ensure uniform distribution across your shards. That ensures two things. First, you don't run into hot or cold shard issues. Second, you get proper utilization of the shards in your ecosystem, which helps optimize cost. These metrics will also help you see and manage the load.
In case of an issue, you may want to slow down the speed at which you write to your shards. One thing you can do is slow down the layer that writes the data. In our case that was a Lambda, and on a Lambda you can use the concurrent execution setting (reserved concurrency) to limit how many executions run concurrently for this layer. That slows down the speed of writing to the shards, so you don't run into throttling issues.
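The round-robin explicit hash key approach described earlier in this passage can be sketched as follows. The input is assumed to be the list of shard descriptions as returned when describing the stream, where each shard carries a `HashKeyRange` with decimal-string bounds. Picking the midpoint of each range is an illustrative choice, since any value inside the range would route to that shard. Shards whose `SequenceNumberRange` has an `EndingSequenceNumber` are treated as closed and skipped.

```python
import itertools
from typing import Any, Dict, Iterator, List


def range_midpoints(shards: List[Dict[str, Any]]) -> List[str]:
    """One hash key per open shard, taken from the middle of its range."""
    midpoints = []
    for shard in shards:
        if "EndingSequenceNumber" in shard.get("SequenceNumberRange", {}):
            continue  # closed shard (for example, a parent after resharding)
        key_range = shard["HashKeyRange"]
        start = int(key_range["StartingHashKey"])
        end = int(key_range["EndingHashKey"])
        midpoints.append(str((start + end) // 2))
    return midpoints


def explicit_hash_keys(shards: List[Dict[str, Any]]) -> Iterator[str]:
    """Endless round-robin iterator over one hash key per open shard."""
    return itertools.cycle(range_midpoints(shards))
```

Each outgoing record would take `next(keys)` as its explicit hash key alongside the mandatory partition key. The shard list needs to be refreshed whenever the stream is resharded.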
So until you have a clear idea of how far you want to increase your shard count, you can use this kind of combination, and it can help you derive the right values for the number of shards and the Lambda execution concurrency, which together ensure uniform distribution and reliable transfer of data into your stream.
After identifying the right numbers and ensuring a uniform distribution of records across the shards, the other problem we saw in our implementation was the way the schema was evolving. We were getting lots of columns with details like comments and descriptions, and more and more of these descriptive columns were being added to the data contract. When we add more descriptive columns to our data, we are essentially increasing the size of each record we write to the shard. A shard has a limit: it can handle writes of up to 1 MB per second. So you run into the same scenario again: if you write more data to a shard, either you need to increase the number of shards or you will get throttling of records on that shard. To overcome this problem, one thing you can do is apply a compression algorithm, such as Gzip or an LZ-family codec, before writing the data to the Kinesis stream. Or you can choose a compact binary format like Avro. This adds computation in your Lambda layer, and on the consumer side as well, to either decompress the data or read it in that format.
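A minimal sketch of the compression step described above, using Gzip over a JSON-encoded record. The record shape is invented for illustration, and the function names `pack` and `unpack` are hypothetical. The producer would send the packed bytes as the record data and the consumer would reverse the step.

```python
import gzip
import json
from typing import Any, Dict


def pack(record: Dict[str, Any]) -> bytes:
    """Serialize a record to compact JSON and Gzip it before writing."""
    raw = json.dumps(record, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw)


def unpack(blob: bytes) -> Dict[str, Any]:
    """Reverse of pack(): decompress and parse on the consumer side."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))
```

Compression helps most on long, repetitive descriptive text. For very small records the Gzip header overhead can make the output larger than the input, so it is worth measuring on real payloads first.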
But it will ensure that your data transfer happens in a seamless manner. So if you come across a similar problem, this is something you can use. The other thing I recommend, which unfortunately we were not able to implement in our project, concerns these descriptive columns and attributes. If possible, rather than passing that data through the stream, it is good to pass it by reference: the data can be stored in, let's say, S3 locations or databases, and the end consumers can access the descriptive data from there if its cardinality is not high. In those cases you (a) save on processing and (b) save on space as well, so it can be a win-win situation on both sides. So you can either choose your format carefully or define your data contracts appropriately to keep some control over the amount of data you pass in each record.
The next practice I generally recommend while working with any kind of streaming platform is that whatever data you pass to the stream should be bound to some kind of data contract. That way the producer can write data as per that contract, and the consumer can use the contract to understand what data it is going to receive and how to read it. To maintain this, streaming solutions generally provide some kind of schema registry. In the AWS world, there is the AWS Glue Schema Registry, part of AWS Glue, where you can define the schema.
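To make the data contract idea concrete, here is a small sketch of a contract expressed as a shared definition that both producer and consumer import. It does not use the Glue Schema Registry. The `MetricEvent` fields and the `schema_version` check are assumptions made for illustration, not the schema from the talk.

```python
import json
from dataclasses import asdict, dataclass

SCHEMA_VERSION = 1  # assumed version number of the shared contract


@dataclass(frozen=True)
class MetricEvent:
    """Hypothetical contract shared by producer and consumer."""
    event_id: str
    status: str
    updated_at: str
    schema_version: int = SCHEMA_VERSION


def serialize(event: MetricEvent) -> bytes:
    """Producer side: write the event exactly as the contract defines it."""
    return json.dumps(asdict(event)).encode("utf-8")


def deserialize(blob: bytes) -> MetricEvent:
    """Consumer side: reject payloads written under another contract version."""
    fields = json.loads(blob)
    if fields.get("schema_version") != SCHEMA_VERSION:
        raise ValueError(f"unsupported schema version: {fields.get('schema_version')}")
    return MetricEvent(**fields)
```

A registry adds central storage and compatibility checks on top of this. The shared definition only guarantees that both sides compile against the same field list.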
That schema can be consumed by both producer and consumer to understand the data. Along with that, the schema registry also supports versioning of the schema. So if your schema evolves over a period of time, the admin can control it, and the schema evolves based on the version and the contract. Your producer always knows in which way it has to write the data, and your consumer knows how to read it. So what I generally recommend is to set up the schema registry first, before even implementing the streaming solution. That will save you lots of time later in the project. In case you don't want to use a schema-registry-based solution, you can deliver a shared code base that contains the schema definitions, and those definitions can be used on both sides, by producer and consumer, to write and read data in a particular schema.
After the schema registry, the last thing I would like to mention is the validation technique we used, which was inspired by canary implementations, to confirm that whatever data we get from our producer system and pass to our consumer is being passed in a reliable manner. Our processing wrote the producer's data to the stream, and the consumer read it from the stream. Along with that, we also started publishing dummy events into our data stream. The product we had has a lifecycle, with states like open, in review, in progress, closed, et cetera.
We published those dummy events to the stream for further consumption, and we published metrics about those dummy events to CloudWatch as well. Once published, the dummy events went through the stream like any other event. On the consumer side, we read those dummy events and published custom metrics about their status and other dimensions to CloudWatch too. By publishing these events periodically, consuming them again and publishing their data to CloudWatch, we were able to compare both sets of metrics periodically: whether the metrics we published for the dummy events on the producer side matched the metrics we captured on the consumer side, within an acceptable deviation range. If the deviation in, say, the number of open or closed events between what we sent from the producer side and what we got on the consumer side was too high, then either there was a delay in the streaming data passing through our processes, or there was a high volume load or some other issue. On those metrics we created alerts that told us whether the data was being processed smoothly or not.
So if you are also working on a streaming solution, this is a canary-based technique you can implement: you put dummy events in on the producer side, read the same events on the consumer side, and from those events' lifecycle you produce custom metrics in CloudWatch or a related tool. From there you can generate alerts in case you see a higher deviation in the data processing or in the respective turnaround time.
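The comparison at the heart of this canary technique can be sketched as below. The inputs are assumed to be per-state counts of dummy events, one set reported by the producer and one by the consumer over the same window. Fetching them from CloudWatch is left out, and the 5% tolerance is an arbitrary illustrative value.

```python
from typing import Dict, List


def deviations(produced: Dict[str, int], consumed: Dict[str, int]) -> Dict[str, float]:
    """Relative gap per lifecycle state between sent and received canaries."""
    result = {}
    for state, sent in produced.items():
        received = consumed.get(state, 0)
        result[state] = abs(sent - received) / sent if sent else 0.0
    return result


def states_to_alert(produced: Dict[str, int], consumed: Dict[str, int],
                    tolerance: float = 0.05) -> List[str]:
    """Lifecycle states whose deviation exceeds the acceptable range."""
    gaps = deviations(produced, consumed)
    return sorted(state for state, gap in gaps.items() if gap > tolerance)
```

For example, if the producer reports 100 canaries each for `open`, `in_review` and `closed`, and the consumer reports 100, 97 and 80, only `closed` exceeds the 5% tolerance and would trigger an alert.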
That can help you identify whether your streaming pipeline is working as expected, or whether a fix, a concurrency change or a capacity increase is required to support those use cases. So that's all about it. These are some of the things I wanted to cover in this session around best practices. If you want to talk more about streaming solutions or the things around them, feel free to reach out to me on LinkedIn or another social media platform, and I would be happy to talk more there. Thank you, and thanks for your attention while listening to this talk.

Akshay Jain

Data Engineering Manager @ Innovate UK

Akshay Jain's LinkedIn account


