Transcript
Hi, everyone. You should know by now that my name is Danica Fine,
and I am a developer advocate for Confluent.
Beyond that very minimal piece of information, the only
other thing that you really need to know about me is that I like house
plants. And if you needed a little bit of proof,
here it is. These are all houseplants that I own,
but it's actually only a subset. There are
about three or four dozen of them in my home. And if any of you
own house plants, you probably know how much work can be
involved, right? Especially if you have a lot of them like I do.
For me, at the peak of the pandemic, when I had so much free time,
and so many more plants, there was really a lot of
work involved, right? I needed to go around every morning and do the rounds,
right? Check to see which plants needed to be watered, if any of them needed
to be rotated to have better sunlight, or maybe be fertilized.
There was a lot to do, right? And this
was really, really great over the pandemic, because I got so much out of it,
right? I loved having these plants. They made me so happy. But as
I started getting back into the world, right, going back into
the office, going to conferences, just living my life,
I realized that maybe this wasn't the best thing
for me, right? I started to realize that I was just being
a bad plant mom, right? So this is a very, very wilted plant.
Poor guy. Totally my fault. I forgot to water him one morning before
I went to the office. And then when I got back later in the day,
this is what it looked like. It's completely fine now. Don't worry about
it. But I still felt bad, right? Because I wanted to be able to care
for my plants. I wanted to be a better plant parent.
So something had to be done, right? This wasn't ideal.
So I asked myself, was there a better way for me to be monitoring my
house plants? Of course there's a better way, right?
You can go to the store right now. I could go and buy a premade
solution to help me monitor my house plants. But that's not very
fun, right? That doesn't make a good story. So maybe that's not the
right question to be asking, right? We are engineers. Or at the very least,
you're a curious or creative person. You're attending this session. You want to see
if there is a more interesting way, right? That's a better question.
Just to give you a little bit of background on myself, I spent
a handful of years as a software engineer building out streaming data pipelines
with Apache Kafka. When I became a developer advocate, I knew
that I wanted to build out projects with Kafka that
were actually useful to me, that I could have in my home
that served a real purpose for me. And on
top of that, I always really wanted to build out a hardware project using
a Raspberry Pi. So this was my opportunity. This was my chance, right?
I could use Apache Kafka in my own home. I could build a practical,
event driven pipeline for myself. And I could also use a
Raspberry Pi. So this was going to be great. I could incorporate all
the things that I've been wanting to incorporate. So let's get into it.
Let's talk design. So the bottom
line is that I needed something to help me monitor my house plants,
and at the very least, just let me know when to monitor them. That was
good enough for me, right? So the system that I envisioned
was pretty simple. So I'd have some soil moisture sensors in my
plants. I would capture moisture readings regularly and
toss them into Kafka, and I would do this as those events happen.
That information probably wouldn't be enough on its own, though. So I
would also need to enrich those moisture readings with some extra details,
some metadata, and that would help me to decide whether or not
the plants actually needed to be watered in that moment. And then I would
combine that data and use it to compute outlier readings.
So I would do some stream processing, and at the end, I wanted to receive
some sort of alert, ideally on my phone. I mean, we're attached to our
phones, right? So that made sense to me. And I'm pretty fond
of the Telegram messaging app. We'll get into that a
little bit more later. But I thought it would be a really convenient way to
receive that information on my phone. So that sounds good. That's a high
level look at what I wanted to achieve. So quick
aside, though, at this point, I've mentioned Apache Kafka a number
of times. I don't know if you were counting, but since it's such a
big part of my project, I really want to take a quick step back and
make sure that everybody is on the same page, that we all know what we're
talking about here. So what is Kafka? The most concise
definition that I could come up with is this. It's a distributed
event streaming platform, and there are really only three words in
there that matter. Very, very concise, boiled down,
but there's a lot to unpack in those three words. So I just want to
do that now. The first is that Kafka is
a streaming platform. And I don't mean streaming like Netflix, although they
do use Kafka. So if you're curious, you can check out a number
of talks from them. But really, I want to focus on how we're
processing data. You may have noticed that we've been undergoing a paradigm
shift in how we do this, how we work with our information.
We're moving from slower, batch-focused processing
to stream processing. Instead of waiting
minutes, hours, or days to group information in batches
and then process over that batch, we're reacting to smaller amounts of data
and producing actionable results more quickly, usually in real time.
So moving to stream processing, there's a ton of different ways you can
do it. A lot of different technologies that you can use,
but generally it's going to have a lot of benefits, right? It's going to help
you increase the accuracy of your data and your results. You're going to be building
out more reactive systems, and depending on which
technology you use for stream processing, you can
increase the resiliency of your overall system as well. So Kafka
is a streaming platform. It also serves as a messaging system,
a persistent storage layer, and a processing layer,
as we'll see in a little bit. But it's not just a streaming platform.
It is an event platform. And this is one of those big
takeaways. I want to make sure that everybody keeps this piece of information with them.
In order to use Kafka successfully, you really need to start thinking
in events. And I want to preface that by saying that that's not
a huge ask. Okay, I promise I'm not asking you to rewire your brain,
because as humans, as programmers, as users,
we already think in and process in events. It's a very natural way for
us to do things. We submit web forms, updating information
online. We look at application logs when we're debugging projects.
We react to notifications on our phones.
All of those things are events. All I'm asking you to do is be a
little more conscious of that fact. So those were
examples of events. But what's an event, really? It's a piece of information,
ideally the smallest amount without too much excess, that fully
describes something that has happened. So what do you need to know?
To know something's happened, you need to know when it happened. Right. You need a
timestamp. And you should also know who or what was
involved. Right. The subject of that. And then any sort of supporting
details you might need. So making this a little more tangible, going back to the
system I wanted to build, this monitoring system, I would be capturing
moisture readings from my plants. So say that yesterday at
2:30 p.m. this plant here, my umbrella plant, had a
moisture level reading of 19%. So we have the when:
yesterday at 2:30 p.m., that timestamp. We have the subject,
the who or what, which is this umbrella plant here. And then that supporting
detail, which is that it had a moisture reading level of 19%.
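If it helps to see that as data, here's a minimal sketch of such an event as a Python dict; the field names and values are just illustrative, not the exact format from the talk.

```python
# A sketch of that single moisture reading as an event (illustrative field names).
reading_event = {
    "timestamp": "2023-05-01T14:30:00Z",  # the when: yesterday at 2:30 p.m.
    "plant_id": 1,                        # the who/what: the umbrella plant
    "moisture": 19.0,                     # the supporting detail: moisture percentage
}
print(reading_event)
```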
Great. We checked all the boxes. That is an event. But another
key component of events that you really, really need to
keep near and dear to your heart and remember is that they're meant to be
immutable. And that's just an unfortunate fact of
life. It's due to the fact that they've described things that have
already happened. Okay, I don't have a time machine. Neither do
you. I don't think so, at least. But going back to
this example here, I was a little sad yesterday at 2:30
p.m. because this plant was clearly dry, right. It had wilted.
So I can't undo that. Right. Yesterday at 2:30 p.m.,
that plant was dry, was wilted. What I could do
now is I could water the plant and raise its moisture level to,
say, 70%. But doing that and watering
that plant doesn't erase the fact that yesterday it had a low moisture reading.
Right. All I've done is I've generated a new event,
and I've added it to the stream of immutable events that describe the
moisture level of that plant. It's sort of a timeline, right.
That describes that plant over time. So based
on that, based on what you just learned, we saw how Kafka is a streaming
platform. It facilitates the movement of data in real time across your
system. But it's also an eventing system.
Kafka allows you to communicate immutable facts,
immutable events that have occurred throughout your system, and then gives you
the power to process and move and react to those events in real time,
which is pretty wild. There's a lot you can do with that.
But how does Kafka do this? Let's look a little bit closer at the architecture
of Kafka. And it is a distributed platform.
That's a really important part. So when you work with Kafka,
the first thing you need to know is that the primary unit of storage is
a Kafka topic. These topics typically represent a
single data set consisting of events.
To get data into Kafka, you write events as
key-value pairs using separate clients called producers.
We can have as many Kafka producers as we want writing to as many Kafka
topics as we'd want. From there, once the data is in Kafka,
we can read it back out of a Kafka topic using another separate client called
a Kafka consumer. And again, we can have as many of these consumers as we
want reading from as many Kafka topics as we'd want.
A couple cool things about consumers that you should remember is that as
they consume and process events, consumers have a way to keep track
of the last one, the last event that they saw and processed.
And they do this using a bookmark called an offset.
Another cool thing about consumers is that they have the ability to work together
and share the work of reading data from a single Kafka
topic or a set of topics and potentially parallelizing
the processing of that data. Or they're free to consume the
data independently from the beginning of the topic. And that's a
really, really important thing that you should remember.
It's easy when you're working with Kafka for the first time to compare it
to other, maybe similar technologies. And the first one that usually comes
to mind is a queue, a messaging queue. But a Kafka topic
is not a messaging queue. When data is consumed
from a Kafka topic, it doesn't disappear.
Instead, you should really be thinking of a Kafka topic as a log.
And similar to application logs that you use to debug,
right? Think about the last application you tried to debug. You open up
the application log file and you read line by line through that
log file. And I just want to go on a
quick aside here. As you read through those lines,
every line in a log file is itself an event,
right? It has a timestamp. And every
line in that log file describes something specific that has happened to
a component across your system. It gives you a timestamp, it gives you supporting
details. It tells you where it happened, right? But as you
read line by line through that log file, nothing happens to those events,
right? They're still there. They're durably stored in that
log file until maybe the log file rolls at some
point, but it's there until that point, right?
And during that time, you can invite as many colleagues as you want
to read that same information in that log file
at the same time. All right? So what that means is that each of you
are free to read line by line through that log file, build up the same
state of that application in your mind, and potentially help debug,
right? It's the same thing with Kafka consumers, all right?
As they read the data from a Kafka topic, nothing happens to
those events. They don't disappear. Another thing
to keep in mind about Kafka topics and consumers and producers
is that they are fully decoupled from one another. Producers don't
care who's going to read that data eventually. Consumers don't care
who wrote the data, right? The topics sort of sit in between; Kafka
is that layer that sits in between the consumers and producers at any given time.
Kafka topics are actually further broken down into smaller components
called partitions. And these partitions themselves are actually
the immutable append only logs where the individual events are stored.
All right, so you see here an example of a Kafka topic with three partitions.
We have 15 events stored throughout it.
You can see them, they're numbered, and you see that those individual events
are distributed across those partitions. All right,
so Kafka is a distributed system. So these partitions are actually stored
across different nodes in the cluster, ideally.
And this is to provide better resilience and also facilitate the replication
of your data. So just to
give you an example of this, if you're a visual learner,
the nodes of a Kafka cluster are called brokers. And these brokers
can be running anywhere. They can be bare metal, VMs, containers in the
cloud, pretty much wherever you want them to run. So this is
a simple cluster setup: three nodes, and we have three topics
with some number of partitions in them, and your
results may vary, but this might be how these partitions are distributed
across the cluster. But why bother?
Why bother putting the partitions on different nodes? The biggest
reason is, when it comes to the processing of the data later on, the consuming
of that data. As I mentioned before, consumers have the ability to work together
to process data, and the parallelization of that work happens
at the partition level. So what this means is that if we had,
say, topic b, we have three partitions there. If we wanted
to optimize or maximize the processing of this data, we'd have three
consumers, each consuming from different partitions. And the
fact that those partitions are on three different brokers means that those brokers aren't trying
to serve up data for all three partitions
at once. Each broker can handle serving up that data to the individual
consumer at the same time. So it just helps to make that a little
more efficient. So what this means is that you'll want your data
to be spread as evenly across your partitions and your cluster as
possible, right? And that's great. That should make sense.
But in any distributed system, you want to consider what happens when a node
goes down, right? This situation clearly is not ideal.
Broker zero went down. We've lost some data. We didn't lose it
all, but we lost some data. But thankfully, Kafka takes
this a step further with something called replication. And replication
is a configurable parameter that determines how many copies of a
given partition will exist across your Kafka cluster.
Okay, so let's look at a three node cluster
here from the perspective of a single topic with three partitions,
right? This time, I'm going to use a replication factor of
three. So with replication enabled, when data is produced to a partition,
say we're writing data to partition zero, it's going to
be written to what's known as the lead partition. This is the partition that
consumers and producers will usually interact with. All right,
so we're going to write that data to broker zero first, partition zero.
And then at the same time, data is synchronously going to be copied over to
the configured number of follower partitions. So not only is
the data first written to broker zero, it's also going to be written to
brokers one and two to those follower partitions
on those brokers. All right, so now
if a broker goes down, right, we've lost our lead partition zero.
We've lost some additional copies, but that's not too important.
But what we can do is we can have one of our follower replicas be
promoted to leader, and we can quickly resume
our normal activities. Right? Our consumers and producers can do what they were doing before.
I know that was a bit of an aside, but all that to say is
that Kafka is pretty good at storing your data, your immutable events, your immutable facts,
and it stores them in a reliable, persistent and distributed way
for fast consumption, for moving those events quickly and efficiently,
and also offering some cool data processing capabilities on top.
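To make partitions and replication a little more concrete: they're just settings you choose when a topic is created. Here's a minimal sketch using the confluent-kafka Python AdminClient; the topic name, counts, and bootstrap address are placeholders, not anything from the talk.

```python
from confluent_kafka.admin import AdminClient, NewTopic

# Create a topic with three partitions, each replicated to three brokers.
admin = AdminClient({"bootstrap.servers": "BOOTSTRAP-SERVERS"})
futures = admin.create_topics([
    NewTopic("houseplant-readings", num_partitions=3, replication_factor=3),
])
for topic, future in futures.items():
    future.result()  # raises an exception if the topic could not be created
    print(f"Created topic {topic} with 3 partitions and replication factor 3")
```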
All right, we all know what Kafka is. Now let's
get back to the actual project and what I needed to do to make it
come together. This is the high-level view that I gave you: we want
to produce this information into Kafka, do some stream processing, and then
get it out, right? So what do we have to do first?
Well, we probably need a Kafka cluster,
right? So a main goal of mine
for this project, besides wanting to use Kafka and doing a
hardware project, right. The other goal was to make it as simple as possible
and to manage as little infrastructure as I possibly could.
This was my first hardware project, so I really wanted to focus on actually
building out the physical system and not maintain any other
software infrastructure if I could. So for this component,
for my Kafka cluster, I decided to use Confluent. It offers
Kafka fully managed in the cloud and it's perfect for a
lot of projects. But I think that it's really, really good for a project like
this where I wanted to use Kafka, but I didn't want to deal with
the infrastructure at all. So I was able to spin up a cloud
based Kafka cluster and then I also got a couple additional
auxiliary tools for free, right? And so there were some
that would help me integrate external sources and sinks using Kafka Connect.
I also had stream processing available in my GUI,
and we'll see a little bit more of this later on, but I'm going to
take advantage of all of these in Confluent Cloud. All right, so I set up
a cluster. If you want to follow along and
build something similar, just know that you need Kafka running somewhere.
All right. I don't care where you're running it, just get a Kafka cluster.
All right, next up we have a cluster.
So let's build the physical system. Let's focus on the Raspberry Pi and the
sensors. I know that this isn't really meant
to be a hardware talk, so I'm not going to focus too much on it,
but I really want to give everybody the tools that they need to build some
system like this if you want, right? So here
are the main things that I used to build this out.
I'm going to gloss over it a bit, but I do want to touch on
the sensors that I chose. So I ended up choosing these I2C
capacitive moisture sensors. They seemed like pretty good bang for your buck.
They were relatively high quality, they seemed pretty consistent.
They weren't going to rust over
time. They were going to work really well for this. But they're I2C,
and I2C components communicate over a
unique address which is set at the component level,
right, per sensor. So all of these sensors,
what I do is I wire them up to the breadboard shown here,
and then I have a single set of wires from the breadboard into
the Raspberry Pi, so that all of these sensors are actually communicating over
the same input channel into the Raspberry Pi.
And then when I want to fetch a moisture reading from an individual sensor,
I need to call that sensor by name, by its address,
right? So, unfortunately, the sensors that I chose
had a physical limitation in that this I2C
address could only be set to one of four values.
I did not read the fine print ahead of time. So what this meant is
that for this particular system, this first iteration of it,
I could only monitor four plants at a given time.
There are some ways around this if you're more adept at
hardware projects than I am, but they're definitely outside of the scope of this talk.
But we'll find a way around it in a little bit. So, I know that's
a little bit of hand-waving, but I built the system, I hooked up the sensors
to the Raspberry Pi, and it was ready to collect data from my plants.
All I had to do now was get this information into Kafka.
But again, before we get a little too ahead of ourselves,
I want to take, again, a quick aside and think about the data that we're
writing, right. And what I really want to do is be mindful
of it and craft a schema. Okay, I hope
none of you are groaning, because schemas are a really good
best practice to adhere to. Seriously, I really recommend it
for any project that you do. Schemas are a really great
way to make sure that downstream systems are receiving the data that they expect
to receive. Schemas also help you reason about your
overall data processing as well, before you get into the weeds with
it. Okay, so I think it's really nice to take a step back and understand
all the fields that you might need for a given project. And so
I did that. I defined an Avro schema for the
readings that I would be capturing from the sensors. This one was pretty short,
pretty simple. I had my percentage moisture that I'd be fetching.
I also got temperature for free on the sensors as well. So I figured I
would throw that in there just in case. And beyond that,
I added another field for the plant id that would help me keep
track of individual plants and, you know, do the data processing
later. Yeah, perfect. So I knew how this data
should look. But how do I get it into Kafka now?
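Before answering that, here's a rough sketch of what that readings schema might look like in Avro, expressed as the JSON string you'd register with Schema Registry. The record and field names are a guess at the shape described, not the exact schema from the talk's repo.

```python
READINGS_SCHEMA = """
{
  "type": "record",
  "name": "HouseplantReading",
  "fields": [
    {"name": "plant_id",    "type": "int"},
    {"name": "moisture",    "type": "double"},
    {"name": "temperature", "type": "double"}
  ]
}
"""
```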
And if you're new to Kafka, there are two main ways to go
about getting your data into Kafka. You can use the producer API or
Kafka Connect. So the
producer API is your low level, sort of vanilla option for writing data
into Kafka. And it's really great because you have the ability
to write a producer client in pretty much any language you want.
You'll really want to use the producer API when you either
own or have access to the application that's generating
the data that you're dealing with. So for example,
if I wanted to capture log events from an application and push
them into Kafka, well, I would probably just add a couple of lines of
code: add a producer that writes that data to Kafka as
that message happens, as we hit that log line.
The other option is Kafka Connect. And as the name implies, Kafka Connect is
a pretty good way to connect external data sources and also
sinks to Kafka. It's really great
because you don't actually need to write any code to make it happen. To get
up and running, you write a quick configuration file, you point to
your data store and bam, all right, you are capturing data in
real time. You're converting it into events, you're pushing it into Kafka.
It's a really, really good option to consider if you're looking to integrate data
from outside of the Kafka ecosystem, right? Data that's at
rest, that's within a database, that's at the other end of an API call,
maybe. And you're just going to make that data a
little more real time, a little more event driven.
So what did I end up using in my case? Well, I own the Raspberry
Pi, right? I have access to the script that's collecting the sensor data. So the
producer API made sense, right? So what does that look like?
First of all, you should know that I'm using the Confluent Kafka Python library
for my producers. You'll recall that I need to reference
each moisture sensor by its unique address. So I
started with a hard coded mapping of moisture sensor to plant
id. And then within the script, every 30 seconds
I'm looping over this mapping and then I'm capturing the moisture
and temperature data from the appropriate sensor. From there I build
up a readings object according to the schema that I defined.
And then I have a serializing producer that's going to serialize
the data and produce it into Kafka. Perfect. If you want to look
a little bit at the code here. So I'm looping over those plant addresses.
I am accessing that sensor using
its unique address, capturing the moisture. I'm doing a little bit of
massaging of this data to convert it into a percentage,
grabbing the temperature and packaging it and sending it off to Kafka.
I will link to the source code later on so you'll
see a little bit more detail, more than just a code snippet.
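In the meantime, here's a hedged sketch of what such a collection loop can look like with the confluent-kafka Python client. The read_sensor() helper, the topic name, the addresses, and the percentage conversion are all illustrative stand-ins rather than the talk's actual code.

```python
import time
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Hard-coded mapping of I2C sensor address -> plant id (illustrative values).
PLANTS = {0x36: 1, 0x37: 2, 0x38: 3, 0x39: 4}

def read_sensor(address):
    """Hypothetical stand-in for the I2C moisture/temperature read."""
    raise NotImplementedError

READINGS_SCHEMA = open("houseplant_reading.avsc").read()  # an Avro schema like the sketch above
schema_registry = SchemaRegistryClient({"url": "SCHEMA-REGISTRY-URL"})  # auth config omitted

producer = SerializingProducer({
    "bootstrap.servers": "BOOTSTRAP-SERVERS",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": AvroSerializer(schema_registry, READINGS_SCHEMA),
})

while True:
    for address, plant_id in PLANTS.items():
        raw_moisture, temperature = read_sensor(address)
        reading = {
            "plant_id": plant_id,
            "moisture": raw_moisture / 10.0,  # massage the raw value into a percentage (illustrative)
            "temperature": temperature,
        }
        producer.produce(topic="houseplant-readings", key=str(plant_id), value=reading)
    producer.flush()
    time.sleep(30)  # capture readings roughly every 30 seconds
```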
All right, so using this I can capture these readings from
my Raspberry Pi and start writing them into Kafka. That's great.
But like I said, that readings data on its own
isn't actually enough for me to understand and act on the
watering needs of my plants. Right. I need a little bit more information,
some metadata to actually do something with the information.
Okay. So again, after thinking about
it for a bit, I created an Avro schema for my plant metadata,
right. The biggest thing that this data set contains
is the individual plants and their desired moisture
levels. All right? I also included their names, like the scientific name, their given
name, their common name, just for fun and to make the information a little bit easier
for me to read later on. And I also included that plant id
in there as well for joining and processing later.
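As before, here's a hedged sketch of what that metadata schema might look like in Avro; the talk's actual schema has more descriptive fields than shown here, and the names are illustrative.

```python
METADATA_SCHEMA = """
{
  "type": "record",
  "name": "HouseplantMetadata",
  "fields": [
    {"name": "plant_id",                "type": "int"},
    {"name": "scientific_name",         "type": "string"},
    {"name": "common_name",             "type": "string"},
    {"name": "given_name",              "type": "string"},
    {"name": "low_moisture_threshold",  "type": "double"},
    {"name": "high_moisture_threshold", "type": "double"}
  ]
}
"""
```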
Again, how do we get this data into Kafka? We have the producer
API or Kafka Connect. What should we use?
So this is relatively slow changing data,
right? And Kafka Connect is a fantastic candidate for data
that's at rest or data in a database, something that's not changing
often. And reference data sets, metadata like this, should probably
live in a database. But I'm going to do some hand waving
here. I still ended up using the producer API in this case.
So even though my plant metadata is slow changing and is data at
rest, remember, I only had four plants that I could monitor
at a given time due to that hardware limitation.
Right? So it was a bit of overkill to set up a
database to maintain such a small data set, right?
So for now, I put together a separate Python script to serialize
my plant metadata messages according to the schema that I defined using
Avro. And I assigned each plant an id and produced it into Kafka.
Great. This stage of the project was actually really interesting for me
just as a plant parent. In order to produce the plant
metadata, I needed to do a lot of research to understand my individual
plants' needs. So there was a lot of guessing and checking involved,
like letting the plant get dry and checking what that moisture level was.
And it's still not perfect. I still sometimes update the metadata based on
what I learned about my plants over time. So it's actually a really cool learning
curve for me. So the script that I wrote to produce data
during this stage was pretty similar to what I showed earlier.
Again, I'll have that full code linked at the end of this talk if you
want to check it out. So I was able to get my
plant metadata into Kafka. I have my readings being
generated into Kafka as well,
but, all right, is this setup ideal? Right. I have
my collection script on my Raspberry Pi and there's a hard coded
mapping of sensors to plants. And I
also have to add my plant metadata manually. Right.
That's not very event driven. Right. What if I wanted to switch things up
a bit and change which plants I'm monitoring at a given time? Right.
The first thing I'd have to do is write new plant metadata using
that script that I put together. And that's a manual process. Right. I have to
input that, execute the script.
It's silly, it's too much. Then I would have to go
into the collection script and alter that hard coded mapping from
that sensor id to the new plant id that I'm inputting.
And then finally to have to move that physical sensor from the old plant to
the new plant. And that's a lot of steps for something that should
be pretty simple. Right? What can I do to make this a little more seamless
for me, the user?
So there's a couple of things that we can improve in here and I want
to focus first on that first part, making it easier to write the
plant metadata into Kafka. All right.
Since I was going to be sending alerts to my phone through the
Telegram app eventually, I thought it would also be cool and convenient
to write data to Kafka using my phone as well.
So I could actually use Telegram to define a conversation workflow
that would allow me to input the plant metadata through a conversation
on my phone. And Telegram could write that data directly to Kafka for
me, which I thought was pretty cool. So for
those of you who aren't familiar, Telegram is a messaging app, you know,
like WhatsApp or WeChat. But Telegram offers a very convenient
messaging API as well as the ability for users to
create and define their own bots. So it's pretty easy.
You register for a bot, you receive an API key that you can use to
connect to that specific bot, and then from there you can define
how users will have conversations with it.
So specifically there's a pretty nice Telegram Python
library, and with it you can write a script that processes and handles
any incoming messages with the bot. And then at
the end of this we're going to produce that plant metadata into Kafka. So what
does that look like? For any Telegram bot
that you design, you're going to follow a pretty similar process, right?
You define a high level conversation handler that lays out
how to start the conversation, the different states that that conversation
should go through, and then the functions or message handlers
that are associated with each conversation state. So if you're into this sort of
thing, it's basically a finite state machine. You just kind of map out the flow
and it's pretty easy to set up once you get the hang of it.
So here you see I'm just laying out the conversation flow
for updating the plant metadata. So I'm capturing the individual
plant, the plant id that I want to update and then all the details that
I need to adhere to that schema that I created. Right?
And diving into one of these message handlers a little more closely.
So here I've already prompted myself, the user
for the low moisture threshold. And so I'm capturing that low moisture threshold,
storing it temporarily within that conversation state,
and passing it to the next stage of the conversation. So after
I've captured this low moisture threshold, I'm going to prompt myself to fill in the
high moisture threshold and then return the new state of the conversation that we
should go into, which in turn is that high moisture threshold state.
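If you want a feel for what that looks like in code, here's a minimal sketch using the python-telegram-bot library's v20+ async API (the bot in the talk may well use an older version); the command name, states, prompts, and the final produce step are all illustrative.

```python
from telegram import Update
from telegram.ext import (ApplicationBuilder, CommandHandler, ContextTypes,
                          ConversationHandler, MessageHandler, filters)

# Conversation states for the /updateplant flow (names are illustrative).
PLANT_ID, LOW_MOISTURE, HIGH_MOISTURE = range(3)

async def start_update(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    await update.message.reply_text("Which plant id do you want to update?")
    return PLANT_ID

async def capture_plant_id(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    context.user_data["plant_id"] = int(update.message.text)  # stash state for later
    await update.message.reply_text("What's the low moisture threshold (%)?")
    return LOW_MOISTURE

async def capture_low_moisture(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    context.user_data["low_moisture"] = float(update.message.text)
    await update.message.reply_text("And the high moisture threshold (%)?")
    return HIGH_MOISTURE

async def capture_high_moisture(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    context.user_data["high_moisture"] = float(update.message.text)
    # ...confirm the collected details, then hand context.user_data to a Kafka producer...
    await update.message.reply_text("Got it! Producing the new metadata to Kafka.")
    return ConversationHandler.END

async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    await update.message.reply_text("Cancelled.")
    return ConversationHandler.END

app = ApplicationBuilder().token("TELEGRAM-BOT-API-KEY").build()
app.add_handler(ConversationHandler(
    entry_points=[CommandHandler("updateplant", start_update)],
    states={
        PLANT_ID:      [MessageHandler(filters.TEXT & ~filters.COMMAND, capture_plant_id)],
        LOW_MOISTURE:  [MessageHandler(filters.TEXT & ~filters.COMMAND, capture_low_moisture)],
        HIGH_MOISTURE: [MessageHandler(filters.TEXT & ~filters.COMMAND, capture_high_moisture)],
    },
    fallbacks=[CommandHandler("cancel", cancel)],
))
app.run_polling()
```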
Once I've gone through all the steps of collecting that information, then it's
time to produce the data, right? And I've captured all
the information according to that schema that I defined.
This is a basic Kafka producer. There's nothing really fancy up
my sleeves here. I did make my life a little bit simpler within the
context of this project, though. I defined some helper classes for
both the clients and the individual data classes. So you
see, I have a means for creating a producer using the houseplant metadata specific serializer
as well as a function for converting a dictionary, which is
the temporary stored state, into a houseplant data type.
So from there, the production process is pretty standard.
And once I start running this bot, I can have a conversation
with it. I could choose the update plant command and
it's going to prompt me for the information that it needs. I'm going to go
through the full flow of it and at the end it's going to ask me
to confirm the information before I produce it into Kafka.
That's awesome. Now I don't have to actually go onto my computer
to produce new plant metadata, right? I can just do it from
my phone which is cool, but that's not really enough, right?
We've handled one aspect, one problem of this situation,
which is getting more metadata into Kafka.
But I still had a hard coded mapping of sensor ids to plants
within my Raspberry Pi collection script, right? Even if I updated the
metadata and added new plants, I still needed to alter that script manually,
which is not good. So rather than hard code the sensor
to plant mapping in my collection script, I figured, okay, maybe I
can make the script a little more event driven and get those mappings from Kafka.
All right, so let's collect a little more data
then. Before we do
that though, we need a new schema. All right, I set up a new topic
that would only contain the mappings from a sensor id to a plant
id. This way I could easily change which plant was
using which sensor, right? So I did exactly the same thing
I was doing for the metadata collection within the telegram bot.
And so I set up a very similar conversation handler.
This time it prompts me to select a sensor id and input the new plant
id that it should be mapped to.
So now I just had to make my collection script use those
new mappings from Kafka. And similar
to producing the data, there are a couple of ways to get data out
of Kafka. You can either use the consumer API or Kafka Connect as well.
Much of the same considerations come into play for data consumption as they do for
production. But generally, if you want to react to every Kafka event,
you're likely going to use the consumer API. This is pretty
flexible. It's available in most languages you want, and you're free to do whatever you
want with the data at that point. On the other hand, if you're looking to
move the data somewhere like a database or to some longer term
data store, then Kafka Connect is going to help you consume the data and move
it to that sink. In my case,
the consumer API made sense. So the first thing I did was add
a Kafka consumer to my collection script to read from
the topic and build up a dictionary of those mappings from sensor
id to plant id. There are a few extra details to be aware of,
though. The first is that the mapping topic was configured to be compacted.
This basically makes the topic like a table
where it maintains the latest value per key. And that should
make sense because I don't really care about which plant the sensor was mapped to
before. What matters is what that sensor is mapped to
now. Also recall that Kafka consumers have
a way to keep track of the last message that they consumed and processed,
right? It's called an offset. So offsets are
really great when you have a way to maintain state in the consuming application.
But I didn't want my application, this script to be stateful,
right. I didn't want to deal with persisted state in my collection script.
So to get around this, I needed my consumer to start from the
earliest message in the topic every time it started up.
So to do this, I used a few configuration parameters
for my consumer and made my consumer non
committing. Basically, it's never going to keep track of a bookmark,
it's never going to commit an offset. And every time I start up that
consumer, it's going to start from the beginning of that topic, every time.
So on startup, when I execute this script, my consumer
is first going to block the rest of the script. It's going to read from
the Kafka topic and keep collecting readings until it has a full
mapping for all four sensors.
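A hedged sketch of what that non-committing, start-from-the-beginning consumer might look like with the confluent-kafka Python client; the config values, topic name, and deserialization details are illustrative.

```python
from confluent_kafka import Consumer, OFFSET_BEGINNING

consumer = Consumer({
    "bootstrap.servers": "BOOTSTRAP-SERVERS",
    "group.id": "houseplant-mappings-reader",  # illustrative
    "enable.auto.commit": False,               # never commit an offset (no bookmark)...
    "auto.offset.reset": "earliest",           # ...and start from the earliest message
})

def on_assign(consumer, partitions):
    # Force every assigned partition back to the beginning on each startup.
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

consumer.subscribe(["houseplant-mappings"], on_assign=on_assign)

mappings = {}  # sensor id -> plant id
while len(mappings) < 4:  # block until we have a mapping for all four sensors
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # Compacted topic: the latest value per key wins (deserialization omitted here).
    mappings[msg.key()] = msg.value()
```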
Once that's done, in between the looping of every 30 seconds,
my consumer is going to try to fetch new mappings just in
case we've added a new one. So if I ever move a sensor around
or update anything, that new mapping is going to be propagated within
30 seconds, which is good enough for my use case. So at
this point I was free to move my plants around, right? I was free to
add new plants from my phone, define new mappings,
and at that point I could rest assured that my raspberry PI was going
to be fetching the data that it should be for the plants that it should,
which is great. So the first half of
my pipeline is complete, right. It's fully event driven. I'm capturing
this data. I don't have to worry about manually inputting anything pretty much,
but now all the information is in Kafka. Now I just need to start
making sense of it using stream processing. For
stream processing with Kafka, you have options. You have a lot of
directly. And these APIs are great because they're available in all your favorite
languages and then some. But to use them for stream processing,
you'll have to go through the full process of consuming the data from Kafka,
then doing any of your transformations in memory.
If you're doing stateless transformations,
great, move on with your life. Transform them, produce the data
back to Kafka and you're done. If you're doing stateful transformations though,
there's a lot more to consider, right? You have to handle your state,
you have to decide how to make it fault tolerant. What happens if your application
goes down? And that's a lot to think of for stream
processing. This involves a lot more work,
especially with the producer and consumer API. They're the lowest level option
available to you, but there are some opportunities to be a
little more flexible in what you can do with the code at that level.
So do what you will. If you're a
sane person, though, you'll likely use one of the next two options. So moving
up in ease of use is Kafka Streams. And Kafka
Streams is a Java and Scala library that takes the hassle of managing state
off of your hands. So out of the box already, it makes stateful processing
so much easier for Kafka. It gives
you a lot of stateful transformations out of the box available.
It's built on top of the consumer producer API, so you get
a lot of cool things for free, like the scalability from the
consumer group protocol that allows the consumers to parallelize the processing
from a given set of input topics. And what that means for you
is that you can spin up multiple instances of a Kafka streams application and
they're going to coordinate and share the input data across the
running instances. And if one of those instances goes down
for any reason, the remaining instances will
rebalance. All right, they're going to take whichever partitions that
one instance was processing, redistribute them
across the other remaining instances and also bring the state with
it, right, so you never lose that state. It's always persisted to
Kafka. And the running instances are just going to be able to find that and
keep going way better than the consumer producer API. And you
get that for free, right? There's very little that you have to set up to
make that happen. And finally,
if Java isn't your thing, the easiest and most convenient way to transform
your data from Kafka is using ksqlDB.
The really cool thing about this is that it's just SQL syntax,
and within that you have access to pretty powerful stream processing that's built
on top of Kafka Streams. So you can filter, you can
join, you can aggregate, and a lot more, and you can do this
entirely within the Confluent Cloud console,
which was another reason that I wanted to do it because again, I didn't want
to deal with any infrastructure and this allowed me to do
it within the web console. So that's what I chose to use. Let's get
into it. Let's process this information. When you're working with ksqlDB,
the first thing you need to do is get your data into it, right?
And to do so, you need to decide how your data has to
be represented. There are only two choices,
right? There are two main constructs in ksqlDB: you have a table and you have a stream.
Streams represent unbounded, ongoing series of events.
Tables, on the other hand, show the current state for
a particular key. So let's look at my houseplant metadata
topic first. The metadata should
probably be represented as a table. Tables keep track of the
most recent value per key, as I said. So if I ever wanted
to update a value for a given plant id, say tweak the low
moisture threshold, I would probably want my processing application to leverage
that most recent value first. I don't care about the old ones.
So the first thing I'm going to do is point to the Kafka topic where
that houseplant metadata lives and specify
the value format, right? How am I viewing this data?
And it's Avro. We have an Avro schema for it.
You'll also note that this is a pretty SQL esque statement,
right? A create table statement. The biggest part
of the statement is, like I said, pointing to that Kafka topic, defining that value
format. And the cool thing about defining an Avro schema,
taking the time to do that in the
beginning, is that I don't have to specify all the fields and
their types, because ksqlDB, as a consumer, can
access the Schema Registry where that topic's schema
is stored, access that schema and use
it to parse that field and type information for me. So you'll recall
that there are about ten or so fields in that
schema and I only had to type one, right? So no
typos here, which is great. The only field that I had to specify
here explicitly is the key, because a table needs
to know which key to determine the most recent value for,
right? So in this case, I just want to use the plant id. So
I can take that SQL, that create table statement, I can run it in the
Confluent Cloud ksqlDB editor, and I can start up my streaming
application behind the scenes, and it's going to bring this data in and
start building up a table for it.
For my houseplant readings data, every message is relevant
in that ongoing series of events, right? So it should be a stream,
right? And in this case we're going to use a very very similar statement
to the create table one, but in this case we're using create stream.
Again, very similar. We're pointing to the Kafka topic where
those readings reside. We're pointing to that value format.
And since we used Avro, there are some perks. I don't have
to explicitly write out any of the fields and their types.
For consistency, I'm just specifying we're bringing in that
plant id as the key. So cool.
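Since the statements themselves aren't reproduced in the transcript, here's a rough sketch of what they might look like. Object, column, and topic names are illustrative; in practice these were run in the Confluent Cloud ksqlDB editor, though the same strings could also be submitted through ksqlDB's REST API.

```python
# Sketches of the two ksqlDB statements, held as strings. With VALUE_FORMAT='AVRO',
# ksqlDB pulls the value columns from Schema Registry, so only the key is declared.
CREATE_METADATA_TABLE = """
CREATE TABLE houseplant_metadata (plant_id INT PRIMARY KEY)
  WITH (KAFKA_TOPIC='houseplant-metadata', VALUE_FORMAT='AVRO');
"""

CREATE_READINGS_STREAM = """
CREATE STREAM houseplant_readings (plant_id INT KEY)
  WITH (KAFKA_TOPIC='houseplant-readings', VALUE_FORMAT='AVRO');
"""
```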
I have my metadata as input in my ksqlDB application.
I have my houseplant readings being captured
and reflected as a stream within this application as well.
Now we need to enrich the data sets
with one another. Remember, the readings aren't good enough
on their own. We need the metadata in order to make sense of them.
So I needed to join these two data sets together. This is going
to help me with that processing that I need to do later on. This is
a pretty hefty statement. So let's focus on a couple of components individually.
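Here's a hedged reconstruction of roughly what that statement looks like (names are illustrative, not the exact ones from the talk); the pieces it contains are what gets broken down next.

```python
CREATE_ENRICHED_STREAM = """
CREATE STREAM houseplant_readings_enriched
  WITH (KAFKA_TOPIC='houseplant-readings-enriched', VALUE_FORMAT='AVRO') AS
  SELECT r.plant_id,
         r.moisture,
         r.temperature,
         m.common_name,
         m.low_moisture_threshold
  FROM houseplant_readings r
  INNER JOIN houseplant_metadata m
    ON r.plant_id = m.plant_id
  EMIT CHANGES;
"""
```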
The first is that select statement. This is
effectively SQL, right? I'm joining two data sets, so I'm going to do a
join. I'm going to first select the fields from each data
set that I want to be contained in the output. And I'm using
something called a stream to table join that should feel very similar to
a regular SQL join. I'm just doing an inner join to make sure that
every output row has all of the information that I need from each data
set and so that I have everything that I need for that processing
in the next step. So if I just run this SQL query,
it's going to just give me what's currently in the application.
Right now, the current state of the application, all the data that's currently there,
it will execute this join and spit it out.
But what I really want is for this statement, the result of this statement,
to be persisted somewhere. And I'm
going to persist it to a stream, right? Because everything
that's output from this is going to be relevant. I want to process it,
I want to look at it. Okay, so I'm going to use a
create stream statement. I'm going to specify the topic where I want to
persist this data to, and specify the value format.
And so now when I execute that SQL, it will give me
everything that's currently available in the state of this application,
do the join, output it and persist to that stream.
Great. There is one more detail to consider though. Okay.
And there's one little line at the bottom of this query and that
is emit changes. So emit changes indicates
a push query. So what this means is that the result set is
going to be an open ended stream that's pushed to me as output.
So when I run this SQL statement,
not only am I going to get the current state of the full application,
everything that's currently in those Kafka topics. Now,
every time a new reading flows in through the houseplant readings topic,
we're going to execute this query. That result row is going to
be output and it's going to be appended to this stream. So every time new
information comes in, we're going to enrich it, and I'm going to get that
information out. This is contrasted with
something called a pull query, where I'm pulling a finite result set
that reflects the current state of the underlying streaming application.
Right. That's the example that I gave you earlier.
It's just going to tell me the current state of the application, execute the query.
Done. It's not going to keep running. All right,
so now I've enriched these data sets, I have all the information I need.
Every time new data comes in, we're executing that join and I have
that data available to use in the next step. So let's
revisit the overall goal here.
Whenever a house plant has enough low moisture readings
telling me that it needs to be watered, right, I want to receive an alert
on my phone. Now, I've changed this value
a couple of times since I've started this project about a year ago,
but right now I am collecting moisture data every 30
seconds or so. So with that in mind, I decided that
if a plant needs to be watered, I don't want to receive an alert every
30 seconds. Right. That's entirely too many alerts on my
phone. So in the event that I'm out of the house or I'm busy,
I figured that receiving an alert every 6 hours would
give me enough time to act on and water my plant, or at the very
least come home, tell my partner to go water the plant, whatever I
need to do. That gives me enough time. I also noticed that
these sensors weren't perfect. Right. They were a pretty good value. They were a couple
of dollars apiece. But what that meant is
that sometimes I could get false low or false high readings.
But the general trend over time is downward, right. The plant
is drying out, the water percentage is going to decrease.
So within a given six hour period, it's not going to be
a perfect decrease in moisture over that entire period.
It's not going to be monotonically decreasing. So it would be good
enough to send an alert whenever I received at least
one hour's worth of low readings within that six hour period.
Right? So with the readings being taken every 30 seconds,
that means I should at least receive 120 low readings before
I trigger an alert.
That might sound like a lot, but this is the query that
I wrote to achieve what I just said.
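The slide itself isn't in the transcript, so here's a hedged reconstruction of roughly what that query looks like; the object names, column names, and message text are illustrative.

```python
CREATE_ALERTS_TABLE = """
CREATE TABLE houseplant_watering_alerts
  WITH (KAFKA_TOPIC='houseplant-watering-alerts', VALUE_FORMAT='AVRO') AS
  SELECT plant_id,
         COUNT(*) AS low_reading_count,
         CONCAT(LATEST_BY_OFFSET(common_name), ' is dry -- time to water it!') AS alert_message
  FROM houseplant_readings_enriched
  WINDOW TUMBLING (SIZE 6 HOURS)
  WHERE moisture < low_moisture_threshold
  GROUP BY plant_id
  HAVING COUNT(*) >= 120
  EMIT FINAL;
"""
```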
Let's break this down, focus on a few key details.
The first thing is that I wanted to receive an alert at most
every 6 hours, right? So this is the most important part.
I'm setting up the query to aggregate over non overlapping
six hour windows. So already when an event flows in,
the query is first going to bucket it into the appropriate
six hour window. Great.
Within that window, I want to count the events per plant
where the moisture reading was lower than the low moisture threshold
as determined by my plant metadata.
So when I get at least 120 of those readings within that six hour
window, we're going to output a result. And note that
I'm grouping by plant id. Also, within that select
statement, I am grabbing all the details that I might
need to alert on and I'm building up an alert
message here saying this particular plant is dry,
so that's going to be involved as part of the output. I don't
want to oversimplify this because it's really easy to look at this and forget that
you're building a streaming application. And for
better or for worse, with all the things that are involved with a stateful streaming
application, there is state. You have to keep
that in mind, you have to think about it. So let's review this again with
that in mind. So first of all, another thing to
keep in mind is that when you're windowing and conducting an aggregation in ksqlDB,
the output is going to be a table where each row is computed
per key per window. So per plant per six hour
period. This should make sense because a table is going to
provide the latest value per key in the end. So that
should feel right. So as each input
record is processed, what happens first?
Well, we're first bucketing it into that appropriate six hour window and we're looking
at the key, right? We're looking at the individual plant id.
If the record makes it through the filter, we're updating
the underlying state for that window per
that plant id. So every reading
that breaches that moisture threshold is going to be counted toward that 120 readings
that we need in order to output a result. So every time
something flows in and breaches a threshold. We're updating the state, we're incrementing that counter,
and then once we reach 120, we output that result.
Great. You might be asking yourself what happens after we
reach that 120th low moisture reading, right, because we're just saying having
count of at least 120. Well, will the events
continuously be output? Am I going to receive an event, an alert every
30 seconds after that point? No, because I've included
an additional line at the end. Emit final.
I love this feature. It saves me a lot of time as opposed
to emit changes. Emit final says wait until the
window has closed before we output any result.
Okay, so we're going to wait for that six hour window to fully close
and then we're going to assess per plant id,
does that plant have at least 120 low readings? If it does,
output a row, and that table is going to contain a message. All right,
and that's great. Now, I had a Kafka topic containing messages
for every time a plant needed to be watered. At most once
per six hour period. All I had to do now was get that information out
of Kafka. So let's see how to do this
with Telegram. We already saw how Telegram could be used to define
a conversation flow and write data to Kafka. But this time I just
wanted to push the data to the conversation I already had going with my Telegram
bot. So the Telegram API allows me to look
into the conversations that were currently going on with my bot and
see which ones were there using a unique
chat id. So I had a chat id defining that
conversation between me and my bot.
Using the bot API key that I had for my bot
as well as the conversation id, I could define a unique endpoint
that I could use to send data directly to that conversation.
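For context, Telegram's Bot API exposes that as a simple HTTP endpoint. A quick sketch (the token, chat id, and message text below are placeholders):

```python
import requests

BOT_TOKEN = "TELEGRAM-BOT-API-KEY"  # the bot's API key
CHAT_ID = "123456789"               # the unique id of my conversation with the bot

url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
requests.post(url, json={"chat_id": CHAT_ID, "text": "One of your plants is dry!"})
```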
So conveniently, there's a Kafka Connect HTTP
Sink connector that only requires an HTTP endpoint to send data
to. As a bonus, this connector is offered as a fully managed component
in Confluent Cloud, meaning that I didn't need to run it on my own machines.
All I had to do was configure it through Confluent Cloud,
input that HTTP endpoint,
and tell it how to extract
the information from the Kafka message that's driving and triggering those alerts.
And then once I start that connector, it's going to consume from that alert topic.
It's going to be triggered by the messages on that topic. It's going to
use a regex to extract the alerting message, that field
that I defined, and it's going to send that alert directly
to the conversation I had started with my bot.
And then this is what it looks like. This is exactly
the message that I receive on my phone every time a plant
needs to be watered.
So I'm pretty proud of the fact that I have a fully functioning, event driven
pipeline for a very real use case that I care about.
All right, this saved me so much time and really solved a
real problem that I had. I hope you enjoyed it.
But what do you get out of this, right? You're probably asking yourself,
you followed along, you saw a cool thing that I built,
but I want you to look a little bit deeper, okay? You actually learned
quite a bit about streaming data pipelines and what you can do with them.
So you saw the different ways that you can get data into Kafka, right?
Either using the producer API or maybe Kafka Connect if you have
the right data source. I introduced you to stream processing and
how to do it in different ways, either using the consumer and producer API,
maybe Kafka Streams, and specifically ksqlDB in
my situation. And finally you saw how to get data out
of Kafka with Kafka Connect, and how also to use the consumer
API to make my collection script a little more event driven.
So every streaming data pipeline needs some combination
of these three things. All right, so now you have the
tools to get started on your own, and I hope you will.
I hope that I have planted the seeds in your mind that you're now curious
and you would want to try out Kafka maybe for something
in your own home. So I have a link here to my Linktree,
which in turn has many more links to my
source code, the repository outlining the collection scripts,
the metadata creation script, and also the
Telegram bot as well as the SQL that I used to
actually do the data processing. And I've also included some additional
resources there that link to Confluent Developer, our dedicated developer
portal that has many tutorials, how-to guides,
language guides if you want to learn a bit more about Kafka.
So I hope you'll check it out. And if
you have any questions at all, feel free to reach out to me. I am
very much open to chat about the system and Kafka at
any time. Until then, thank you for attending my talk. I really
appreciate it.