Conf42 Python 2023 - Online

Practical Pipelines: A Houseplant Soil Alerting System with ksqlDB

Abstract

Be a better plant parent and build a practical, event-driven pipeline with Raspberry Pi and Apache Kafka! Soil moisture readings are streamed into Kafka and transformed, driving real-time alerts. Learn how ksqlDB and Kafka Connect made this pipeline possible as we dive in and get our hands dirty!

Summary

  • Danica Fine is a developer advocate for Confluent. She wanted to build a system to monitor her houseplants. She used Apache Kafka and a Raspberry Pi to create a practical, event-driven pipeline. At the end of the pipeline, she receives an alert on her phone.
  • Apache Kafka is a distributed event streaming platform. To use Kafka successfully, you really need to start thinking in events. Stream processing is going to help you increase the accuracy of your data and your results. Depending on which technology you use for stream processing, you can increase the resiliency of your overall system.
  • When data is consumed from a Kafka topic, it doesn't disappear. Instead, you should really be thinking of a Kafka topic as a log. Topics are broken down into partitions, and these partitions are stored across different nodes in the cluster. This is to provide better resilience and also facilitate the replication of your data.
  • For my Kafka cluster, I decided to use Confluent. It offers Kafka fully managed in the cloud. If you want to follow along and build something similar, just know that you need Kafka running somewhere.
  • The Raspberry Pi connects to I2C capacitive moisture sensors. I2C components communicate over a unique address which is set at the component level. You can use the producer API or Kafka Connect to get your data into Kafka. Schemas are a great way to make sure that downstream systems receive the data they expect.
  • In order to produce the plant metadata, I needed to do a lot of research to understand my individual plant's needs. I still sometimes update the metadata based on what I learned about my plants over time. What can I do to make this a little more seamless for me, the user?
  • Use Telegram to define a conversation workflow that would allow me to input the plant metadata through a conversation on my phone. And Telegram could write that data directly to Kafka for me.
  • For stream processing with Kafka, you have options. The easiest and most convenient way to transform your data from Kafka is using ksqlDB. Kafka Streams is a Java and Scala library that takes the hassle of managing state off of your hands.
  • ksqlDB has two main constructs: a table and a stream. The metadata should probably be represented as a table. The only field that I had to specify explicitly is the key. This is going to help me with processing that I need to do later.
  • Whenever a houseplant has enough low moisture readings telling me that it needs to be watered, I want to receive an alert on my phone. The ksqlDB query is designed to aggregate over non-overlapping six-hour windows. Once we reach 120, we output a result.
  • All right, so now you have the tools to get started on your own. I hope that I have planted the seeds in your mind that you're now curious and you would want to try out Kafka maybe for something in your own home. If you have any questions at all, feel free to reach out to me.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi, everyone. You should know by now that my name is Danica Fine, and I am a developer advocate for Confluent. Beyond that very minimal piece of information, the only other thing that you really need to know about me is that I like houseplants. And if you needed a little bit of proof, here it is. These are all houseplants that I own, but it's actually only a subset. There are actually about three or four dozen of them in my home. And if any of you own houseplants, you probably know how much work can be involved, right? Especially if you have a lot of them like I do. For me, at the peak of the pandemic, when I had so much free time and so many more plants, there was really a lot of work involved, right? I needed to go around every morning and do the rounds, right? Check to see which plants needed to be watered, if any of them needed to be rotated to have better sunlight, or maybe be fertilized. There is a lot to do, right? And this was really, really great over the pandemic, because I got so much out of it, right? I loved having these plants. They made me so happy. But as I started getting back into the world, right, going back into the office, going to conferences, just living my life, I realized that maybe this wasn't the best thing for me, right? I started to realize that I was just being a bad plant mom, right? So this is a very, very wilted plant. Poor guy. Totally my fault. I forgot to water him one morning before I went to the office. And then when I got back later in the day, this is what it looked like. It's completely fine now. Don't worry about it. But I still felt bad, right? Because I wanted to be able to care for my plants. I wanted to be a better plant parent. So something had to be done, right? This wasn't ideal. So I asked myself, was there a better way for me to be monitoring my houseplants? Of course there's a better way, right? You can go to the store right now. I could go and buy a premade solution to help me monitor my houseplants.
But that's not very fun, right? That doesn't make a good story. So maybe that's not the right question to be asking, right? We are engineers. Or at the very least, you're a curious or creative person. You're attending this session. You want to see if there is a more interesting way, right? That's a better question. Just to give you a little bit of background on myself, I spent a handful of years as a software engineer building out streaming data pipelines with Apache Kafka. When I became a developer advocate, I knew that I wanted to build out projects with Kafka that were actually useful to me, that I could have in my home, that served a real purpose for me. And on top of that, I always really wanted to build out a hardware project using a Raspberry Pi. So this was my opportunity. This was my chance, right? I could use Apache Kafka in my own home. I could build a practical, event-driven pipeline for myself. And I could also use a Raspberry Pi. So this is going to be great. I could incorporate all the things that I've been wanting to incorporate. So let's get into it. Let's talk design. So the bottom line is that I needed something to help me monitor my houseplants, and at the very least, just let me know when to water them. That was good enough for me, right? So the system that I envisioned was pretty simple. So I'd have some soil moisture sensors in my plants. I would capture moisture readings regularly and toss them into Kafka, and I would do this as those events happen. That information probably wouldn't be enough on its own, though. So I would also need to enrich those moisture readings with some extra details, some metadata, and that would help me to decide whether or not the plants actually needed to be watered in that moment. And then I would combine that data and use it to compute outlier readings. So I would do some stream processing, and at the end, I wanted to receive some sort of alert, ideally on my phone.
I mean, we're attached to our phones, right? So that made sense to me. And I'm pretty fond of the Telegram messaging app. We'll get into that a little bit more later. But I thought it would be a really convenient way to receive that information on my phone. So that sounds good. That's a high-level look at what I wanted to achieve. So quick aside, though, at this point, I've mentioned Apache Kafka a number of times. I don't know if you were counting, but since it's such a big part of my project, I really want to take a quick step back and make sure that everybody is on the same page, that we all know what we're talking about here. So what is Kafka? The most concise definition that I could come up with is this. It's a distributed event streaming platform, and there are really only three words in there that matter. Very, very concise, boiled down, but there's a lot to unpack in those three words. So I just want to do that now. The first is that Kafka is a streaming platform. And I don't mean streaming like Netflix, although they do use Kafka. So if you're curious, you can check out a number of talks from them. But really, I want to focus on how we're processing data. You may have noticed that we've been undergoing a paradigm shift in how we do this, how we work with our information. We're moving from slower, batch-focused processing to stream processing. Instead of waiting minutes, hours, or days to group information in batches and then process over that batch, we're reacting to smaller amounts of data and producing actionable results more quickly, usually in real time. So moving to stream processing, there's a ton of different ways you can do it. A lot of different technologies that you can use, but generally it's going to have a lot of benefits, right? It's going to help you increase the accuracy of your data and your results.
You're going to be building out more reactive systems, and depending on which technology you use for stream processing, you can increase the resiliency of your overall system as well. So Kafka is a streaming platform. It also serves as a messaging system, a persistent storage layer, and a processing layer, as we'll see in a little bit. But it's not just a streaming platform. It is an event platform. And this is one of those big takeaways. I want to make sure that everybody keeps this piece of information with them. In order to use Kafka successfully, you really need to start thinking in events. And I want to preface that by saying that that's not a huge ask. Okay, I promise I'm not asking you to rewire your brain, because as humans, as programmers, as users, we already think in and process in events. It's a very natural way for us to do things. We submit web forms, updating information online. We look at application logs when we're debugging projects. We react to notifications on our phones. All of those things are events. All I'm asking you to do is be a little more conscious of that fact. So those were examples of events. But what's an event, really? It's a piece of information, ideally the smallest amount without too much excess, that fully describes something that has happened. So what do you need to know? To know something's happened, you need to know when it happened. Right. You need a timestamp. And you should also know who or what was involved. Right. The subject of that. And then any sort of supporting details you might need. So making this a little more tangible, going back to the system I wanted to build, this monitoring system, I would be capturing moisture readings from my plants. So say that yesterday at 2:30 p.m., this plant here, my umbrella plant, had a moisture level reading of 19%. So we have the when: yesterday at 2:30 p.m., that timestamp. We have the subject, the who or what, which is this umbrella plant here.
And then that supporting detail, which is that it had a moisture level reading of 19%. Great. We checked all the boxes. That is an event. But another key component of events that you really, really need to keep near and dear to your heart, that you need to remember, is that they're meant to be immutable. And that's just an unfortunate fact of life. It's due to the fact that they describe things that have already happened. Okay, I don't have a time machine. Neither do you. I don't think so, at least. But going back to this example here, I was a little sad yesterday at 2:30 p.m. because this plant was clearly dry, right. It had wilted. So I can't undo that. Right. Yesterday at 2:30 p.m., that plant was dry, was wilted. What I could do now is I could water the plant and raise its moisture level to, say, 70%. But doing that and watering that plant doesn't erase the fact that yesterday it had a low moisture reading. Right. All I've done is I've generated a new event, and I've added it to the stream of immutable events that describe the moisture level of that plant. It's sort of a timeline, right, that describes that plant over time. So based on that, based on what you just learned, we saw how Kafka is a streaming platform. It facilitates the movement of data in real time across your system. But it's also an eventing system. Kafka allows you to communicate immutable facts, immutable events that have occurred throughout your system, and then gives you the power to process and move and react to those events in real time, which is pretty wild. There's a lot you can do with that. But how does Kafka do this? Let's look a little bit closer at the architecture of Kafka. And it is a distributed platform. That's a really important part. So when you work with Kafka, the first thing you need to know is that the primary unit of storage is a Kafka topic. These topics typically represent a single data set consisting of events.
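As an illustration of the event idea described above (a timestamp, a subject, a supporting detail, and immutability), here is a minimal Python sketch. The class name, field names, and the calendar dates are assumptions made up for the example; they are not taken from the talk's source code.

```python
from dataclasses import dataclass, FrozenInstanceError
from datetime import datetime


@dataclass(frozen=True)  # frozen=True makes instances read-only after creation
class MoistureEvent:
    timestamp: datetime   # when it happened
    plant: str            # who or what was involved
    moisture_pct: float   # the supporting detail


# The stream is an append-only timeline of events for the plant.
# The dates below are placeholders for "yesterday at 2:30 p.m." and "today".
stream = [MoistureEvent(datetime(2023, 1, 1, 14, 30), "umbrella plant", 19.0)]

# Watering the plant does not rewrite history; it adds a new event.
stream.append(MoistureEvent(datetime(2023, 1, 2, 9, 0), "umbrella plant", 70.0))

try:
    stream[0].moisture_pct = 70.0  # attempting to "undo" the dry reading
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False
```

The dry reading stays in the timeline; the only way to reflect the watering is to append a second event.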
To get data into Kafka, you write it as key-value pairs using separate clients called producers. We can have as many Kafka producers as we want writing to as many Kafka topics as we want. From there, once the data is in Kafka, we can read it back out of a Kafka topic using another separate client called a Kafka consumer. And again, we can have as many of these consumers as we want reading from as many Kafka topics as we want. A couple of cool things about consumers that you should remember: as they consume and process events, consumers have a way to keep track of the last one, the last event that they saw, that they processed. And they do this using a bookmark called an offset. Another cool thing about consumers is that they have the ability to work together and share the work of reading data from a single Kafka topic or a set of topics, potentially parallelizing the processing of that data. Or they're free to consume the data independently from the beginning of the topic. And that's a really, really important thing that you should remember. It's easy, when you're working with Kafka for the first time, to compare it to other, maybe similar technologies. And the first one that usually comes to mind is a queue, a messaging queue. But a Kafka topic is not a messaging queue. When data is consumed from a Kafka topic, it doesn't disappear. Instead, you should really be thinking of a Kafka topic as a log. And it's similar to application logs that you use to debug, right? Think about the last application you tried to debug. You open up the application log file and you read line by line through that log file. And I just want to go on a quick aside here. As you read through those lines, every line in a log file is itself an event, right? It has a timestamp. And every line in that log file describes something specific that has happened to a component across your system. It gives you a timestamp, it gives you supporting details. It tells you where it happened, right?
But as you read line by line through that log file, nothing happens to those events, right? They're still there. They're durably stored in that log file until maybe the log file rolls at some point, but it's there until that point, right? And during that time, you can invite as many colleagues as you want to read that same information in that log file at the same time. All right? So what that means is that each of you is free to read line by line through that log file, build up the same state of that application in your mind, and potentially help debug, right? It's the same thing with Kafka consumers, all right? As they read the data from a Kafka topic, nothing happens to those events. They don't disappear. Another thing to keep in mind about Kafka topics and consumers and producers is that they are fully decoupled from one another. Producers don't care who's going to read that data eventually. Consumers don't care who wrote the data, right? The topics sort of sit in between; Kafka is that layer that sits in between the consumers and producers at any given time. Kafka topics are actually further broken down into smaller components called partitions. And these partitions themselves are actually the immutable, append-only logs where the individual events are stored. All right, so you see here an example of a Kafka topic with three partitions. We have 15 events stored throughout it. You can see them, they're numbered, and you see that those individual events are distributed across those partitions. All right, so Kafka is a distributed system. So these partitions are actually stored across different nodes in the cluster, ideally. And this is to provide better resilience and also facilitate the replication of your data. So just to give you an example of this, if you're a visual learner, the nodes of a Kafka cluster are called brokers. And these brokers can be running anywhere.
They can be bare metal, VMs, containers, in the cloud, pretty much wherever you want them to run. So this is a simple cluster setup. Three nodes, and we have three topics with some number of partitions in them, and your results may vary, but this might be how these partitions are distributed across the cluster. But why bother? Why bother putting the partitions on different nodes? The biggest reason is when it comes to the processing of the data later on, the consuming of that data. As I mentioned before, consumers have the ability to work together to process data, and the parallelization of that work happens at the partition level. So what this means is that if we had, say, topic B, we have three partitions there. If we wanted to optimize or maximize the processing of this data, we'd have three consumers, each consuming from different partitions. And the fact that those partitions are on three different brokers means that those brokers aren't trying to serve up data for all three partitions at once. Each broker can handle serving up that data to the individual consumer at the same time. So it just helps to make that a little more efficient. So what this means is that you'll want your data to be spread as evenly across your partitions and your cluster as possible, right? And that's great. That should make sense. But in any distributed system, you want to consider what happens when a node goes down, right? This situation clearly is not ideal. Broker zero went down. We've lost some data. We didn't lose it all, but we lost some data. But thankfully, Kafka takes this a step further with something called replication. And replication is a configurable parameter that determines how many copies of a given partition will exist across your Kafka cluster. Okay, so let's look at a three-node cluster here from the perspective of a single topic with three partitions, right? And this time, I'm going to use a replication factor of three.
So with replication enabled, when data is produced to a partition, say we're writing data to partition zero, it's going to be written to what's known as the lead partition. This is the partition that consumers and producers will usually interact with. All right, so we're going to write that data to broker zero first, partition zero. And then at the same time, data is synchronously going to be copied over to the configured number of follower partitions. So not only is the data first written to broker zero, it's also going to be written to brokers one and two, to those follower partitions on those brokers. All right, so now if a broker goes down, right, we've lost our lead partition zero. We've lost some additional copies, but that's not too important. But what we can do is we can have one of our follower replicas be promoted to leader, and we can quickly resume our normal activities. Right? Our consumers and producers can do what they were doing before. I know that was a bit of an aside, but all that to say is that Kafka is pretty good at storing your data, your immutable events, your immutable facts, and it stores them in a reliable, persistent and distributed way for fast consumption, for moving those events quickly and efficiently, and also offering some cool data processing capabilities on top. All right, we all know what Kafka is. Now let's get back to the actual project and what I needed to do to make it come together. This is the high-level view that I gave to you, where I said we wanted to produce this information into Kafka, do some stream processing, and then get it out, right? So what do we have to do first? Well, we probably need a Kafka cluster, right? So a main goal of mine for this project, besides wanting to use Kafka and doing a hardware project, right, the other goal was to make it as simple as possible, to manage as little infrastructure as I possibly could.
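To recap the log analogy from the Kafka overview above, here is a toy, single-partition model in plain Python. It is not the Kafka client API; the class names `TopicLog` and `ToyConsumer` are made up for this sketch. It only illustrates two points from the talk: reading does not remove events, and each consumer tracks its own position with an offset.

```python
class TopicLog:
    """A toy, single-partition stand-in for a Kafka topic (not the real client API)."""

    def __init__(self):
        self._events = []  # append-only list of (key, value) pairs

    def append(self, key, value):
        self._events.append((key, value))
        return len(self._events) - 1  # offset of the event just written

    def read_from(self, offset):
        return self._events[offset:]  # reading never deletes anything


class ToyConsumer:
    """Reads a log independently, bookmarking its position with an offset."""

    def __init__(self, log):
        self.log = log
        self.offset = 0  # start from the beginning of the log

    def poll(self):
        events = self.log.read_from(self.offset)
        self.offset += len(events)  # advance the bookmark past what was read
        return events


log = TopicLog()
for pct in (19.0, 70.0):
    log.append("umbrella plant", pct)

consumer_a, consumer_b = ToyConsumer(log), ToyConsumer(log)
seen_by_a = consumer_a.poll()
seen_by_b = consumer_b.poll()   # the same events are still there for b

log.append("umbrella plant", 64.0)
new_for_a = consumer_a.poll()   # only the event written after a's offset
```

Both consumers see the full history, which is the behavior that distinguishes a log from a queue in the talk's description.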
This was my first hardware project, so I really wanted to focus on actually building out the physical system and not maintain any other software infrastructure if I could. So for this component, for my Kafka cluster, I decided to use Confluent. It offers Kafka fully managed in the cloud and it's perfect for a lot of projects. But I think that it's really, really good for a project like this where I wanted to use Kafka, but I didn't want to deal with the infrastructure at all. So I was able to spin up a cloud-based Kafka cluster and then I also got a couple of additional auxiliary tools for free, right? And so there were some that would help me integrate external sources and sinks using Kafka Connect. I also had stream processing available in my GUI, and we'll see a little bit more of this later on, but I'm going to take advantage of all of these in Confluent Cloud. All right, so I set up a cluster. If you want to follow along and build something similar, just know that you need Kafka running somewhere. All right. I don't care where you're running it, just get a Kafka cluster. All right, next up. We have a cluster, so let's build the physical system. Let's focus on the Raspberry Pi and the sensors. I know that this isn't really meant to be a hardware talk, so I'm not going to focus too much on it, but I really want to give everybody the tools that they need to build some system like this if you want, right? So here are the main things that I used to build this out. I'm going to gloss over it a bit, but I do want to touch on the sensors that I chose. So I ended up choosing these I2C capacitive moisture sensors. They seemed like pretty good bang for your buck. They were relatively high quality, they seemed pretty consistent. They weren't going to rust over time. They were going to work really well for this, but they're I2C, and I2C components communicate over a unique address which is set at the component level, right, per sensor.
So all of these sensors, what I do is I wire them up to the breadboard shown here, and then I have a single set of wires from the breadboard into the Raspberry Pi, so that all of these sensors are actually communicating over the same input channel into the Raspberry Pi. And then when I want to fetch a moisture reading from an individual sensor, I need to call that sensor by name, by its address, right? So, unfortunately, the sensors that I chose had a physical limitation in that this I2C address could only be set to one of four values. I did not read the fine print ahead of time. So what this meant is that for this particular system, this first iteration of it, I could only monitor four plants at a given time. There are some ways around this if you're more adept at hardware projects than I am, but they're definitely outside of the scope of this talk. But we'll find a way around it in a little bit. So, I know that's a little bit of hand-waving, but I built the system, I hooked up the sensors to the Raspberry Pi, and it was ready to collect data from my plants. All I had to do now was get this information into Kafka. But again, before we get a little too ahead of ourselves, I want to take, again, a quick aside and think about the data that we're writing, right. And what I really want to do is be mindful of it and craft a schema. Okay, I hope none of you are groaning, because schemas are a really good best practice to adhere to. Seriously, I really recommend it for any project that you do. Schemas are a really great way to make sure that downstream systems are receiving the data that they expect to receive. Schemas also help you reason about your overall data processing as well, before you get into the weeds with it. Okay, so I think it's really nice to take a step back and understand all the fields that you might need for a given project. And so I did that. I defined an Avro schema for the readings that I would be capturing from the sensors.
This one was pretty short, pretty simple. I had my percentage moisture that I'd be fetching. I also got temperature for free on the sensors as well. So I figured I would throw that in there just in case. And beyond that, I added another field for the plant id that would help me keep track of individual plants, you know, do the data processing later, and, yeah, perfect. So I knew how this data should look, but how do I get it into Kafka now? And if you're new to Kafka, there are two main ways to go about getting your data into Kafka. You can use the producer API or Kafka Connect. So the producer API is your low-level, sort of vanilla option for writing data into Kafka. And it's really great because you have the ability to write a producer client in pretty much any language you want. You'll really want to use the producer API when you either own or have access to the application that's generating the data that you're dealing with. So for example, if I wanted to capture log events from an application and push them into Kafka, well, I would probably just add a couple of lines of code. Add a producer that writes that data to Kafka as that message happens, as we hit that log. The other option is Kafka Connect. And as the name implies, Kafka Connect is a pretty good way to connect external data sources and also sinks to Kafka. It's really great because you don't actually need to write any code to make it happen. To get up and running, you write a quick configuration file, you point to your data store and bam, all right, you are capturing data in real time. You're converting it into events, you're pushing it into Kafka. It's a really, really good option to consider if you're looking to integrate data from outside of the Kafka ecosystem, right? Data that's at rest, that's within a database, that's at the other end of an API call, maybe. And you're just going to make that data a little more real time, a little more event driven.
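For reference, the readings schema described above (moisture percentage, temperature, plant id) could look roughly like the following, written as an Avro record definition in a Python dictionary. The record name, namespace, field names, and field types are assumptions for illustration; the actual schema in the talk's source code may differ. The small `conforms` helper is only a stand-in to show what "downstream systems receive the data they expect" means; a real pipeline would rely on an Avro serializer for this.

```python
import json

# Hypothetical Avro schema for one sensor reading (names and types are assumed).
READING_SCHEMA = {
    "type": "record",
    "name": "Reading",
    "namespace": "houseplants",
    "fields": [
        {"name": "plant_id", "type": "int"},
        {"name": "moisture", "type": "float", "doc": "Soil moisture as a percentage"},
        {"name": "temperature", "type": "float", "doc": "Temperature from the same sensor"},
    ],
}

# Avro schemas are JSON documents, so this string is what a serializer would take.
READING_SCHEMA_JSON = json.dumps(READING_SCHEMA)

# Minimal mapping from the Avro primitive types used above to Python types.
_PY_TYPES = {"int": int, "float": float, "string": str}


def conforms(record, schema=READING_SCHEMA):
    """Illustrative check only: same field names, matching primitive types."""
    fields = schema["fields"]
    if set(record) != {f["name"] for f in fields}:
        return False
    return all(isinstance(record[f["name"]], _PY_TYPES[f["type"]]) for f in fields)


good = conforms({"plant_id": 1, "moisture": 19.0, "temperature": 21.5})
missing_field = conforms({"plant_id": 1, "moisture": 19.0})
```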
So what did I end up using in my case? Well, I own the Raspberry Pi, right? I have access to the script that's collecting the sensor data. So the producer API made sense, right? So what does that look like? First of all, you should know that I'm using the confluent-kafka Python library for my producers. You'll recall that I need to reference each moisture sensor by its unique address. So I started with a hard-coded mapping of moisture sensor to plant id. And then within the script, every 30 seconds I'm looping over this mapping and then I'm capturing the moisture and temperature data from the appropriate sensor. From there I build up a readings object according to the schema that I defined. And then I have a serializing producer that's going to serialize the data and produce it into Kafka. Perfect. If you want to look a little bit at the code here: I'm looping over those plant addresses. I am accessing that sensor using its unique address, capturing the moisture. I'm doing a little bit of massaging of this data to convert it into a percentage, grabbing the temperature, and packaging it and sending it off to Kafka. I will link to the source code later on so you'll see something a little more detailed than a code snippet. All right, so using this I can capture these readings from my Raspberry Pi and start writing them into Kafka. That's great. But like I said, those readings, that data, that information isn't actually enough for me to understand and act on the watering needs of my plants. Right. I need a little bit more information, some metadata, to actually do something with the information. Okay. So again, after thinking about it for a bit, I created an Avro schema for my plant metadata, right. The biggest thing that this data set contains is the individual plants and their desired moisture levels. All right?
I also included their names, like the scientific name, their given name, their common name, just for fun and to make the information a little bit easier for me to read later on. And I also included that plant id in there as well for joining and processing later. Again, how do we get this data into Kafka? We have the producer API or Kafka Connect. What should we use? So this is relatively slow-changing data, right? And Kafka Connect is a fantastic candidate for data that's at rest or data in a database, something that's not changing often. And metadata or reference data sets like this should probably live in a database. But I'm going to do some hand-waving here. I still ended up using the producer API in this case. So even though my plant metadata is slow changing, it's data at rest, remember, I only had four plants that I could monitor at a given time due to that hardware limitation. Right? So it was a bit of overkill to set up a database to maintain such a small data set, right? So for now, I put together a separate Python script to serialize my plant metadata messages according to the schema that I defined using Avro. And I assigned each plant an id and produced it into Kafka. Great. This stage of the project was actually really interesting for me just as a plant parent. In order to produce the plant metadata, I needed to do a lot of research to understand my individual plants' needs. So there was a lot of guessing and checking involved, like letting the plant get dry and checking what that moisture level was. And it's still not perfect. I still sometimes update the metadata based on what I learned about my plants over time. So it's actually a really cool learning curve for me. So the script that I wrote to produce data during this stage was pretty similar to what I showed earlier. Again, I'll have that full code linked at the end of this talk if you want to check it out. So I was able to get my plant metadata into Kafka.
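The slides with the collection script are not part of this transcript, so here is a hedged sketch of the loop as described: a hard-coded sensor-to-plant mapping, a pass over every sensor every 30 seconds, a conversion of the raw moisture value to a percentage, and a produce call. Everything specific is an assumption: the I2C addresses, the raw calibration range, the field names, and the `read_sensor` and `produce` callables, which are injected so the sketch runs without hardware or a Kafka cluster. In a real script, `produce` would typically wrap the `produce` method of a serializing producer from the confluent-kafka library.

```python
import time

# Hypothetical hard-coded mapping of I2C sensor address -> plant id.
SENSOR_TO_PLANT = {0x36: 1, 0x37: 2, 0x38: 3, 0x39: 4}

# Assumed raw output range of the capacitive sensor (dry .. wet); calibrate per sensor.
RAW_DRY, RAW_WET = 200, 2000


def to_percentage(raw):
    """Clamp the raw reading to the calibrated range and scale it to 0-100."""
    clamped = max(RAW_DRY, min(RAW_WET, raw))
    return round(100.0 * (clamped - RAW_DRY) / (RAW_WET - RAW_DRY), 1)


def collect_once(read_sensor, produce):
    """Read every mapped sensor once and hand each reading to the producer."""
    for address, plant_id in SENSOR_TO_PLANT.items():
        raw_moisture, temperature = read_sensor(address)
        reading = {
            "plant_id": plant_id,
            "moisture": to_percentage(raw_moisture),
            "temperature": temperature,
        }
        produce(plant_id, reading)  # key = plant id, value = the reading


def run_forever(read_sensor, produce, interval_s=30):
    """Collection loop: one pass over all sensors every `interval_s` seconds."""
    while True:
        collect_once(read_sensor, produce)
        time.sleep(interval_s)


# Dry run with a fake sensor and an in-memory "topic".
produced = []
collect_once(
    read_sensor=lambda address: (1100, 21.0),
    produce=lambda key, value: produced.append((key, value)),
)
```

Note that changing which plant a sensor monitors means editing `SENSOR_TO_PLANT` by hand, which is the limitation the talk turns to next.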
I have my readings being generated into Kafka as well. But, all right, is this setup ideal? Right. I have my collection script on my Raspberry Pi and there's a hard-coded mapping of sensors to plants. And I also have to add my plant metadata manually. Right. That's not very event driven. Right. What if I wanted to switch things up a bit and change which plants I'm monitoring at a given time? Right. The first thing I'd have to do is write new plant metadata using that script that I put together. And that's a manual process. Right. I have to input that, execute the script. It's silly, too much. Then I would have to go into the collection script and alter that hard-coded mapping from that sensor id to the new plant id that I'm inputting. And then finally I'd have to move that physical sensor from the old plant to the new plant. And that's a lot of steps for something that should be pretty simple. Right? What can I do to make this a little more seamless for me, the user? So there's a couple of things that we can improve in here and I want to focus first on that first part, making it easier to write the plant metadata into Kafka. All right. Since I was going to be sending alerts to my phone through the Telegram app eventually, I thought it would also be cool and convenient to write data to Kafka also using my phone. So I could actually use Telegram to define a conversation workflow that would allow me to input the plant metadata through a conversation on my phone. And Telegram could write that data directly to Kafka for me, which I thought was pretty cool. So for those of you who aren't familiar, Telegram is a messaging app, you know, like WhatsApp or WeChat. But Telegram offers a very convenient messaging API as well as the ability for users to create and define their own bots. So it's pretty easy. You register for a bot, you receive an API key that you can use to connect to that specific bot, and then from there you can define how users will have conversations with it.
Specifically, there's a pretty nice Telegram Python library, and with it you can write a script that processes and handles any incoming messages with the bot. And then at the end of this we're going to produce that plant metadata into Kafka. So what does that look like? For any Telegram bot that you design, you're going to follow a pretty similar process. You define a high-level conversation handler that lays out how to start the conversation, the different states that the conversation should go through, and then the functions or message handlers that are associated with each conversation state. If you're into this sort of thing, it's basically a finite state machine. You just kind of map out the flow, and it's pretty easy to set up once you get the hang of it. So here you see I'm just laying out the conversation flow for updating the plant metadata. I'm capturing the individual plant, the plant id that I want to update, and then all the details that I need to adhere to that schema that I created. And diving into one of these message handlers a little more closely: here I've already prompted myself, the user, for the low moisture threshold. So I'm capturing that low moisture threshold, storing it temporarily within that conversation state, and passing it to the next stage of the conversation. After I've captured this low moisture threshold, I'm going to prompt myself to fill in the high moisture threshold and then return the new state of the conversation that we should go into, which in turn is that high moisture threshold state. Once I've gone through all the steps of collecting that information, then it's time to produce the data, right? And I've captured all the information according to that schema that I defined. This is a basic Kafka producer. There's nothing really fancy up my sleeves here. I did make my life a little bit simpler within the context of this project, though.
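To make the finite-state-machine idea concrete, here is a minimal sketch in plain Python. It deliberately does not use the Telegram library's API; the state names, handler functions, and dictionary keys are all hypothetical and only illustrate the pattern of "capture a value, store it in temporary conversation state, prompt for the next value, return the next state".

```python
from enum import Enum, auto


class State(Enum):
    """Hypothetical conversation states for updating plant metadata."""
    PLANT_ID = auto()
    LOW_MOISTURE = auto()
    HIGH_MOISTURE = auto()
    DONE = auto()


def handle_plant_id(text, data):
    # Store the plant id, then prompt for the low moisture threshold.
    data["plant_id"] = text.strip()
    return State.LOW_MOISTURE, "What is the low moisture threshold?"


def handle_low_moisture(text, data):
    # Store the low threshold, then prompt for the high threshold.
    data["moisture_low"] = float(text)
    return State.HIGH_MOISTURE, "What is the high moisture threshold?"


def handle_high_moisture(text, data):
    # Last step: everything needed is captured, so ask for confirmation.
    data["moisture_high"] = float(text)
    return State.DONE, "Please confirm before producing to Kafka."


# One message handler per conversation state.
HANDLERS = {
    State.PLANT_ID: handle_plant_id,
    State.LOW_MOISTURE: handle_low_moisture,
    State.HIGH_MOISTURE: handle_high_moisture,
}


def run_conversation(messages):
    """Feed user messages through the state machine and collect the result."""
    state, data, prompts = State.PLANT_ID, {}, []
    for text in messages:
        if state is State.DONE:
            break
        state, prompt = HANDLERS[state](text, data)
        prompts.append(prompt)
    return state, data, prompts
```

In a real bot, each handler would be registered with the library's conversation handler, and the collected dictionary would be handed to the producer once the conversation reaches its final state.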
I defined some helper classes for both the clients and the individual data classes. So you see, I have a means for creating a producer using the houseplant-metadata-specific serializer, as well as a function for converting a dictionary, which is the temporarily stored state, into a houseplant data type. From there, the production process is pretty standard. And once I start running this bot, I can have a conversation with it. I could choose the update plant command, and it's going to prompt me for the information that it needs. I'm going to go through the full flow of it, and at the end it's going to ask me to confirm the information before I produce it into Kafka. That's awesome. Now I don't have to actually go onto my computer to produce new plant metadata. I can just do it from my phone, which is cool, but that's not really enough, right? We've handled one aspect, one problem of this situation, which is getting more metadata into Kafka. But I still had a hard-coded mapping of sensor ids to plants within my Raspberry Pi collection script. Even if I updated the metadata and added new plants, I still needed to alter that script manually, which is not good. So rather than hard-code the sensor-to-plant mapping in my collection script, I figured, okay, maybe I can make the script a little more event driven and get those mappings from Kafka. All right, so let's collect a little more data then. Before we do that though, we need a new schema. I set up a new topic that would only contain the mappings from a sensor id to a plant id. This way I could easily change which plant was using which sensor. So I did exactly the same thing I was doing for the metadata collection within the Telegram bot, and I set up a very similar conversation handler. This time it prompts me to select a sensor id and input the new plant id that it should be mapped to. So now I just had to make my collection script use those new mappings from Kafka.
And similar to producing the data, there are a couple of ways to get data out of Kafka. You can either use the consumer API or Kafka Connect. Many of the same considerations come into play for data consumption as for production. But generally, if you want to react to every Kafka event, you're likely going to use the consumer API. This is pretty flexible. It's available in most languages you want, and you're free to do whatever you want with the data at that point. On the other hand, if you're looking to move the data somewhere like a database or some longer-term data store, then Kafka Connect is going to help you consume the data and move it to that sink. In my case, the consumer API made sense. So the first thing I did was add a Kafka consumer to my collection script to read from the topic and build up a dictionary of those mappings from sensor id to plant id. There are a few extra details to be aware of, though. The first is that the mapping topic was configured to be compacted. This basically makes the topic like a table, where it maintains the latest value per key. And that should make sense, because I don't really care about which plant the sensor was mapped to before. What matters is what that sensor is mapped to now. Also recall that Kafka consumers have a way to keep track of the last message that they consumed and processed, right? It's called an offset. Offsets are really great when you have a way to maintain state in the consuming application. But I didn't want my application, this script, to be stateful. I didn't want to deal with persisted state in my collection script. So to get around this, I needed my consumer to start from the earliest message in the topic every time it started up. To do this, I used a few configuration parameters for my consumer and made my consumer non-committing. Basically, it's never going to keep track of a bookmark, it's never going to commit an offset.
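As a rough illustration of why replaying the mapping topic from the start works, here is a small sketch that rebuilds the sensor-to-plant dictionary from a sequence of keyed records, keeping only the latest value per key. The configuration dictionary shows the kind of consumer settings that give this behaviour; the talk does not list the exact parameters used, so treat those values, the group id, and the sensor ids as assumptions.

```python
# Assumed consumer settings for a non-committing consumer that always
# starts from the beginning of the topic (not confirmed by the talk).
CONSUMER_CONFIG = {
    "group.id": "sensor-mapping-reader",  # hypothetical group name
    "auto.offset.reset": "earliest",      # no committed offset, so start at the beginning
    "enable.auto.commit": False,          # never store a bookmark
}

# Hypothetical sensor ids; the project monitors four sensors at a time.
EXPECTED_SENSORS = {"s1", "s2", "s3", "s4"}


def build_mapping(records):
    """Replay (sensor_id, plant_id) records in order, keeping the latest per key."""
    mapping = {}
    for sensor_id, plant_id in records:
        # A later record for the same sensor overwrites the earlier one,
        # which is the table-like view a compacted topic preserves.
        mapping[sensor_id] = plant_id
    return mapping


def has_full_mapping(mapping, expected=EXPECTED_SENSORS):
    """True once every expected sensor has a plant assigned."""
    return expected <= mapping.keys()
```

Because the script never commits offsets, restarting it simply repeats this replay, so no state has to be persisted on the Raspberry Pi.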
And every time I start up that consumer, it's going to start from the beginning of that topic. So on startup, when I execute this script, my consumer is first going to block the rest of the script. It's going to read from the Kafka topic and keep collecting mappings until it has a full mapping for all four sensors. Once that's done, in between the loops that run every 30 seconds, my consumer is going to try to fetch new mappings, just in case we've added a new one. So if I ever move a sensor around or update anything, that new mapping is going to be propagated within 30 seconds, which is good enough for my use case. At this point I was free to move my plants around. I was free to add new plants from my phone and define new mappings, and I could rest assured that my Raspberry Pi was going to be fetching the data that it should be for the plants that it should, which is great. So the first half of my pipeline is complete. It's fully event driven. I'm capturing this data, and I pretty much don't have to worry about manually inputting anything. But now all the information is in Kafka, and I just need to start making sense of it using stream processing. For stream processing with Kafka, you have options. You have a lot of options. The OG way is to use the producer and consumer APIs directly. These APIs are great because they're available in all your favorite languages and then some. But to use them for stream processing, you'll have to go through the full process of consuming the data from Kafka, then doing any of your transformations in memory. If you're doing stateless transformations, great, move on with your life. Transform them, produce the data back to Kafka, and you're done. If you're doing stateful transformations, though, there's a lot more to consider. You have to handle your state, and you have to decide how to make it fault tolerant. What happens if your application goes down? That's a lot to think about.
For stream processing, this involves a lot more work, especially with the producer and consumer APIs. They're the lowest-level option available to you, but there are some opportunities to be a little more flexible in what you can do with the code at that level. So do what you will. If you're a sane person, though, you'll likely use one of the next two options. Moving up in ease of use is Kafka Streams. Kafka Streams is a Java and Scala library that takes the hassle of managing state off of your hands. It makes stateful processing so much easier for Kafka, and it gives you a lot of stateful transformations out of the box. It's built on top of the consumer and producer APIs, so you get a lot of cool things for free, like the scalability from the consumer group protocol, which allows the consumers to parallelize the processing from a given set of input topics. What that means for you is that you can spin up multiple instances of a Kafka Streams application, and they're going to coordinate and share the input data across the running instances. And if one of those instances goes down for any reason, the remaining instances will rebalance. They're going to take whichever partitions that one instance was processing, redistribute them across the remaining instances, and also bring the state with them, so you never lose that state. It's always persisted to Kafka, and the running instances are just going to be able to find it and keep going. That's way better than the consumer and producer APIs, and you get it for free. There's very little that you have to set up to make that happen. And finally, if Java isn't your thing, the easiest and most convenient way to transform your data from Kafka is using ksqlDB. The really cool thing about this is that it's just SQL syntax, and within that you have access to pretty powerful stream processing that's built on top of Kafka Streams.
So you can filter, you can join, you can aggregate, and a lot more, and you can do this entirely within the Confluent Cloud console, which was another reason that I wanted to use it, because again, I didn't want to deal with any infrastructure, and this allowed me to do it within the web console. So that's what I chose to use. Let's get into it and process this information. When you're working with ksqlDB, the first thing you need to do is get your data into it. And to do so, you need to decide how your data has to be represented. There are only two choices, because there are two main constructs in ksqlDB: you have a table and you have a stream. Streams represent an unbounded, ongoing series of events. Tables, on the other hand, show the current state for a particular key. So let's look at my houseplant metadata topic first. The metadata should probably be represented as a table. Tables keep track of the most recent value per key, as I said. So if I ever wanted to update a value for a given plant id, say tweak the low moisture threshold, I would want my processing application to leverage that most recent value. I don't care about the old ones. So the first thing I'm going to do is point to the Kafka topic where that houseplant metadata lives and specify the value format. How am I viewing this data? It's Avro. We have an Avro schema for it. You'll also note that this is a pretty SQL-esque statement, a create table statement. The biggest part of the statement is, like I said, pointing to that Kafka topic and defining that value format. And the cool thing about taking the time to define an Avro schema in the beginning is that I don't have to specify all the fields and their types, because ksqlDB, as a consumer, can access the Schema Registry where that topic's schema is stored and use it to parse that field and type information for me.
You'll recall that there are about ten or so fields in that schema, and I only had to type one. So no typos here, which is great. The only field that I had to specify here explicitly is the key, because a table needs to know which key to determine the most recent value for. In this case, I just want to use the plant id. So I can take that create table statement, run it in the Confluent Cloud ksqlDB editor, and start up my streaming application behind the scenes, and it's going to bring this data in and start building up a table for it. For my houseplant readings data, every message is relevant in that ongoing series of events, so it should be a stream. In this case we're going to use a statement very similar to the create table one, but with create stream. Again, we're pointing to the Kafka topic where those readings reside, and we're pointing to that value format. And since we used Avro, there are some perks: I don't have to explicitly write out any of the fields and their types. For consistency, I'm just specifying that we're bringing in that plant id as the key. So cool. I have my metadata as input in my ksqlDB application, and I have my houseplant readings being captured and reflected as a stream within this application as well. Now we need to enrich the data sets with one another. Remember, the readings aren't good enough on their own. We need the metadata in order to make sense of them. So I needed to join these two data sets together. This is going to help me with the processing that I need to do later on. This is a pretty hefty statement, so let's focus on a couple of components individually. The first is that select statement. This is effectively SQL. I'm joining two data sets, so I'm going to do a join. I'm going to first select the fields from each data set that I want to be contained in the output.
And I'm using something called a stream-to-table join, which should feel very similar to a regular SQL join. I'm just doing an inner join to make sure that every output row has all of the information that I need from each data set, so that I have everything that I need for the processing in the next step. If I just run this SQL query, it's going to give me what's currently in the application. It takes the current state of the application, all the data that's currently there, executes this join, and spits it out. But what I really want is for the result of this statement to be persisted somewhere. And I'm going to persist it to a stream, because everything that's output from this is going to be relevant. I want to process it, I want to look at it. So I'm going to use a create stream statement. I'm going to specify the topic where I want to persist this data and specify the value format. And now when I execute that SQL, it will take everything that's currently available in the state of this application, do the join, output it, and persist it to that stream. Great. There is one more detail to consider, though. There's one little line at the bottom of this query, and that is emit changes. Emit changes indicates a push query. What this means is that the result set is going to be an open-ended stream that's pushed to me as output. So when I run this SQL statement, not only am I going to get the current state of the full application, everything that's currently in those Kafka topics, but every time a new reading flows in through the houseplant readings topic, we're going to execute this query. That result row is going to be output, and it's going to be appended to this stream. So every time new information comes in, we're going to enrich it, and I'm going to get that information out.
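The semantics of that stream-to-table inner join can be sketched in plain Python. This is only an illustration of the behaviour, not how ksqlDB implements it: the table is modelled as a dictionary keyed by plant id, each reading is looked up as it arrives, and readings without matching metadata are dropped. The field names are hypothetical.

```python
def enrich(readings, metadata):
    """Inner-join each incoming reading with the latest metadata for its plant."""
    for reading in readings:
        plant = metadata.get(reading["plant_id"])
        if plant is None:
            # Inner join: a reading with no matching metadata row produces no output.
            continue
        # Emit one enriched row per reading, like a push query appending to a stream.
        yield {
            **reading,
            "given_name": plant["given_name"],
            "moisture_low": plant["moisture_low"],
        }
```

Because the function is a generator, rows come out one at a time as readings flow in, which mirrors the open-ended result of a push query rather than the finite snapshot of a one-off query.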
This is contrasted with something called a pull query, where I'm pulling a finite result set that reflects the current state of the underlying streaming application. That's the example that I gave you earlier. It's just going to tell me the current state of the application and execute the query. Done. It's not going to keep running. All right, so now I've enriched these data sets, and I have all the information I need. Every time new data comes in, we're executing that join, and I have that data available to use in the next step. So let's revisit the overall goal here. Whenever a houseplant has enough low moisture readings telling me that it needs to be watered, I want to receive an alert on my phone. Now, I've changed this value a couple of times since I started this project about a year ago, but right now I am collecting moisture data every 30 seconds or so. With that in mind, I decided that if a plant needs to be watered, I don't want to receive an alert every 30 seconds. That's entirely too many alerts on my phone. So in the event that I'm out of the house or I'm busy, I figured that receiving an alert every 6 hours would give me enough time to act on it and water my plant, or at the very least come home or tell my partner to go water the plant, whatever I need to do. That gives me enough time. I also noticed that these sensors weren't perfect. They were a pretty good value, a couple of dollars apiece, but what that meant is that sometimes I can get false low or false high readings. The general trend over time is downward, though. The plant is drying out, so the water percentage is going to decrease. But within a given six hour period, it's not going to be a perfect decrease in moisture over that entire period. It's not going to be monotonically decreasing. So it would be good enough to send an alert whenever I received at least one hour's worth of low readings within that six hour period.
So with the readings being taken every 30 seconds, that means I should receive at least 120 low readings before I trigger an alert. That might sound like a lot, but this is the query that I wrote to achieve what I just said. Let's break this down and focus on a few key details. The first thing is that I wanted to receive an alert at most every 6 hours. So this is the most important part: I'm setting up the query to aggregate over non-overlapping six hour windows. When an event flows in, the query is first going to bucket it into the appropriate six hour window. Great. Within that window, I want to count the events per plant where the moisture reading was lower than the low moisture threshold as determined by my plant metadata. So when I get at least 120 of those readings within that six hour window, we're going to output a result. And note that I'm grouping by plant id. Also, within that select statement, I am grabbing all the details that I might need to alert on, and I'm building up an alert message here saying this particular plant is dry, so that's going to be part of the output. I don't want to oversimplify this, because it's really easy to look at this and forget that you're building a streaming application. And for better or for worse, with a stateful streaming application, there is state. You have to keep that in mind, you have to think about it. So let's review this again with that in mind. First of all, when you're windowing and conducting an aggregate in ksqlDB, the output is going to be a table where each row is computed per key per window. So per plant, per six hour period. This should make sense, because a table is going to provide the latest value per key in the end. So that should feel right. So as each input record is processed, what happens first?
Well, we're first bucketing it into that appropriate six hour window, and we're looking at the key, the individual plant id. If the record makes it through the filter, we're updating the underlying state for that window for that plant id. So every reading that breaches that moisture threshold is going to be counted toward the 120 readings that we need in order to output a result. Every time something flows in and breaches a threshold, we're updating the state, we're incrementing that counter, and then once we reach 120, we output that result. Great. You might be asking yourself what happens after we reach that 120th low moisture reading, because we're just saying having count over 120. Will the events continuously be output? Am I going to receive an alert every 30 seconds after that point? No, because I've included an additional line at the end: emit final. I love this feature. It saves me a lot of time. As opposed to emit changes, emit final says wait until the window has closed before we output any result. So we're going to wait for that six hour window to fully close, and then we're going to assess, per plant id, whether that plant has at least 120 low readings. If it does, we output a row, and that table is going to contain a message. And that's great. Now I had a Kafka topic containing messages for every time a plant needed to be watered, at most once per six hour period. All I had to do now was get that information out of Kafka. So let's see how to do this with Telegram. We already saw how Telegram could be used to define a conversation flow and write data to Kafka. But this time I just wanted to push the data to the conversation I already had going with my Telegram bot. The Telegram API allows me to look into the conversations that are currently going on with my bot and see which ones are there using a unique chat id.
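Returning to the windowed aggregation for a moment, its logic can be approximated in plain Python: bucket each enriched reading into a non-overlapping six hour window, count the low readings per plant per window, and only report a plant once its window has closed and the count has reached the threshold. This is a sketch of the semantics rather than the ksqlDB query itself; the field names and the use of epoch seconds for timestamps are assumptions.

```python
from collections import Counter

WINDOW_SECONDS = 6 * 60 * 60    # non-overlapping six hour windows
READING_INTERVAL_SECONDS = 30   # one reading roughly every 30 seconds
# One hour's worth of readings: 3600 / 30 = 120.
MIN_LOW_READINGS = (60 * 60) // READING_INTERVAL_SECONDS


def window_start(timestamp):
    """Start of the six hour window that a timestamp (epoch seconds) falls into."""
    return timestamp - timestamp % WINDOW_SECONDS


def alerts_for_closed_windows(enriched_rows, now):
    """Return (plant_id, window_start) pairs that warrant an alert.

    Mimics 'emit final': a window only produces output once it has closed.
    """
    counts = Counter()
    for row in enriched_rows:
        # Filter: only readings below the plant's low moisture threshold count.
        if row["moisture"] < row["moisture_low"]:
            counts[(row["plant_id"], window_start(row["timestamp"]))] += 1
    return sorted(
        (plant_id, start)
        for (plant_id, start), n in counts.items()
        if n >= MIN_LOW_READINGS and start + WINDOW_SECONDS <= now
    )
```

Each plant can appear at most once per window in the result, which is what limits alerts to at most one per plant every six hours.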
So I had a chat id defining that conversation between me and my bot. Using the bot API key that I had for my bot as well as the chat id, I could define a unique endpoint that I could use to send data directly to that conversation. Conveniently, there's a Kafka Connect HTTP Sink connector that only requires an HTTP endpoint to send data to. As a bonus, this connector is offered as a fully managed component in Confluent Cloud, meaning that I didn't need to run it on my own machines. All I had to do was configure it through Confluent Cloud, input that HTTP endpoint, and tell it how to extract the information from the Kafka message that's driving and triggering those alerts. Once I start that connector, it's going to consume from that alert topic. It's going to be triggered by the messages on that topic. It's going to use a regex to extract the alerting message, that field that I defined, and it's going to send that alert directly to the conversation I had started with my bot. And this is what it looks like. This is exactly the message that I receive on my phone every time a plant needs to be watered. So I'm pretty proud of the fact that I have a fully functioning, event-driven pipeline for a very real use case that I care about. This saved me so much time and really solved a real problem that I had. I hope you enjoyed it. But what do you get out of this? You're probably asking yourself that. You followed along, you saw a cool thing that I built, but I want you to look a little bit deeper. You actually learned quite a bit about streaming data pipelines and what you can do with them. You saw the different ways that you can get data into Kafka, either using the producer API or maybe Kafka Connect if you have the right data source.
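Returning to the alerting step: the endpoint built from the bot API key and the chat id can be sketched as below. The Telegram Bot API exposes a `sendMessage` method under `https://api.telegram.org/bot<token>/`; how the connector in the talk was configured around it is not shown, so the helper name and the placeholder token and chat id here are assumptions. The sketch only builds the URL and does not make a network call.

```python
from urllib.parse import urlencode


def send_message_url(bot_token, chat_id, text):
    """Build a Telegram Bot API sendMessage URL for one conversation."""
    # chat_id pins the message to a single conversation with the bot;
    # text is the alert message extracted from the Kafka record.
    query = urlencode({"chat_id": chat_id, "text": text})
    return f"https://api.telegram.org/bot{bot_token}/sendMessage?{query}"
```

For example, `send_message_url("TOKEN", 42, "Fern is dry")` yields a URL ending in `sendMessage?chat_id=42&text=Fern+is+dry`, with the message text URL-encoded.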
I introduced you to stream processing and how to do it in different ways, either using the consumer and producer APIs, maybe Kafka Streams, and specifically ksqlDB in my situation. And finally, you saw how to get data out of Kafka with Kafka Connect, and also how to use the consumer API to make my collection script a little more event driven. Every streaming data pipeline needs some combination of these three things. So now you have the tools to get started on your own, and I hope you will. I hope that I have planted the seeds in your mind, that you're now curious, and that you'll want to try out Kafka, maybe for something in your own home. I have a link here to my Linktree, which in turn has many more links to my source code: the repository outlining the collection scripts, the metadata creation script, and the Telegram bot, as well as the SQL that I use to actually do the data processing. I've also included some additional resources there that link to Confluent Developer, our developer portal, which has many tutorials, how-to guides, and language guides if you want to learn a bit more about Kafka. So I hope you'll check it out. And if you have any questions at all, feel free to reach out to me. I am very much open to chatting about the system and Kafka at any time. Until then, thank you for attending my talk. I really appreciate it.
...

Danica Fine

Senior Developer Advocate @ Confluent



