Transcript
Hello and welcome to my talk on how to build machine learning enhanced event streaming
applications with Java. My name is David Kjerrumgaard and I'm
a developer advocate at StreamNative. I have over two decades of experience in
software development, big data and event streaming. I'm also a committer
on the Apache Pulsar project and a published author. As you can see,
I've worked across a diverse set of companies, everywhere from Amazon and
FedEx to Zappos. As I mentioned in the previous slide,
I'm also a published author. I'm the author of Pulsar in Action from Manning
Publications, which is available for free download at the link shown here at the bottom
of the slide, and also a co-author of Practical Hive. So let's talk
about the agenda for today. We'll start with why Apache
Pulsar is a great fit for event driven microservices,
and how to build event driven microservice applications
with Pulsar functions. Once we've shown you how to develop a
basic Pulsar function, we'll enhance that event driven microservice
to include a machine learning element. So let's begin with
event driven microservices and event driven architecture.
Event driven architecture is a completely decoupled
system in which microservices, as independent systems, communicate
with one another asynchronously by exchanging events.
Rather than making point to point calls, they use a message broker of some
sort as an intermediary to store these events and then deliver
them to all the registered consumers who are interested in them. So an event
represents something like a change in state to the system,
such as an item being placed into a shopping cart,
an ecommerce order being checked out, or an item
being added to a shipment and sent on its way for delivery to
your house. Now, event driven architectures are loosely
coupled, and again, they communicate asynchronously, typically via a pub sub mechanism.
Event driven microservices then, are microservices
that are designed to communicate with one another over these message buses, right?
So we talked about event driven architecture, but not necessarily microservices.
So you can couple the microservices philosophy of
decomposing applications into very small pieces based on a line of business,
and have them communicate using that same pattern.
Again, communicating over an event bus of some sort. In short,
event driven microservices need an event bus to interact with
one another. That's the critical component that all event driven microservices
share in common, regardless of what programming language they're written in,
such as Java or Python. That allows them to be written in any
language of your choice and still communicate with one another seamlessly.
So let's pivot a little bit and talk about Apache Pulsar and why it
is a good fit for event driven microservices.
Apache Pulsar was originally developed at Yahoo in
2012, and it was designed to be a cloud native messaging and
event streaming platform. So it includes all the capabilities of
your traditional pub/sub messaging mechanisms, like you would see
in a traditional messaging system such as RabbitMQ or IBM
MQ, and it also includes a more modern
event streaming or event consumption messaging model like the one that comes with
Apache Kafka or, under the covers, Amazon Kinesis.
So it is really designed to support both messaging
semantics natively, out of the box. By default it has a
pub sub model so the producers and consumers don't interact
with one another. A producer connects to a pulsar broker and publishes
messages. These messages are stored on a topic, which is
just a named channel between producers and consumers.
Consumers can then, at their leisure, attach
to the broker and consume messages from the topic in an asynchronous manner.
So again, they are completely decoupled from one another, they communicate asynchronously,
and they have a single logical endpoint, the Pulsar broker,
to communicate through. At the bottom of all Pulsar
communication is what's called a topic, like in any other
messaging system. Just wanted to note here that unlike other
messaging systems, Pulsar actually has a three part
URI, a unique address, for each
topic. So a topic exists within a tenant,
and Pulsar is a multi tenant system natively, out of the box, which is a
differentiating feature compared to other messaging systems. So you can have different
independent departments within your organization, sharing the same
logical cluster and being logically isolated from one another.
Next, under tenants exists the concept of namespaces, which are a logical grouping
of topics that have similar policy requirements around security
or data retention, tiered storage, offloading,
things like that. And then last but not least, there are topics sitting at the
bottom of those, and these are where the messages ultimately end up. And so when
producers publish to topics, they address them that way. And likewise,
from a consumer perspective, they give the tenant, namespace, and
topic to uniquely identify the particular topics they
want to consume messages from.
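To make that concrete, the fully qualified form of a topic name is persistent://tenant/namespace/topic. As an illustration (these names are my own, not from the talk), a hypothetical ecommerce tenant might address an orders topic as:

    persistent://ecommerce/orders/checkout-events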
I'll touch briefly on the physical architecture of a Pulsar cluster. Again, as I mentioned
earlier, it's cloud native, and what this means is it's architected in such a way
that it's designed to run in cloud environments, particularly containerized environments.
It can take advantage of features like StatefulSets
to automatically restore state if one of the instances
goes down, or to scale up dynamically. The way we architected
Pulsar to make that possible was with a decoupled
architecture, in that the serving of the messages, by the Pulsar brokers shown there
at the top, is completely stateless. No data is stored on
the Pulsar brokers; it's kept completely separate in
a storage layer based on Apache BookKeeper, which is another open
source project purpose built for storage. So when you publish data to a Pulsar topic,
it is persisted to BookKeeper. This gives a lot of advantages, again for resiliency,
because if any broker dies, another one can take over
serving that topic, since the data doesn't go with it. That is completely different
from most other messaging systems out there. This really sets it apart and allows
it to scale dynamically. Now tying the two together is the metadata storage,
right. We're putting this data on a different storage layer. So how do
the brokers know where to go in BookKeeper to get it? Well, that's where the
metadata storage comes in. That's where we keep all the
records, policies, and what we call the managed ledger,
which tracks where each topic's components are spread out across Apache
BookKeeper. It's a pluggable architecture, so you have a lot of choices
on what you want to use. Historically it's been based on Apache
ZooKeeper, which is known to have scalability issues at a certain point.
So if you're running in a containerized environment, you might want to consider etcd.
There is also a new option released by StreamNative called Oxia,
which is a highly scalable system that solves a lot of the problems that ZooKeeper
had. So if you're interested in going with Pulsar, I highly recommend looking at Oxia,
which is indicated by that little pinwheel logo there
on the left inside the metadata storage box. Let's pivot a little bit
and talk about pulsar functions. So now you know what Pulsar is. It's your event
messaging bus. It provides pub sub messaging. That's great David. We need that
for event driven architecture, event driven microservices.
So what are these Pulsar functions? Well, within Apache Pulsar we decided to create
a serverless computing framework that runs inside Pulsar itself. You can think of
them like AWS Lambdas, and they enable very lightweight,
function level programming: a very simple
API that wires in directly to Apache
Pulsar itself and handles, behind the scenes,
all the details of setting up a producer to
write data to Pulsar and a consumer to consume
data from Pulsar. All you have to do is focus on the business logic
itself. So you say, I want this to be a pulsar function. This is my
input topic, this is my output topic. And all of that happens for you automatically.
The framework takes care of that for you, and they allow you to run individual
units of code that react to the publication of
messages. So they have a programming model, something like this.
A Pulsar function is just a deployable piece of code that you control,
and every time a new message comes in, your function gets triggered automatically. We'll see
what the API is here in a second, but every time a message comes in
off of any of the input topics you're subscribed to, boom, your code
gets triggered. Now, as you process that
data, you may want to generate some output,
do some processing or manipulation, and optionally
publish it to an output topic. Also, for monitoring and tracking progress,
things like that, debugging, there's an optional logging topic as well, which you can
publish messages to. And this is great.
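To make that programming model concrete, here is a minimal sketch of the Functions SDK interface in Java. It is not code from the talk; the class name and the echoed payload are just illustrative, but the Function interface and its process method are the SDK API being described.

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    public class EchoFunction implements Function<String, String> {
        @Override
        public String process(String input, Context context) throws Exception {
            // Called once per message consumed from the configured input topic(s).
            // Whatever we return is published to the configured output topic.
            return input;
        }
    }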
So again, it supports multiple programming
languages, and we'll focus on Java today. But if you want to write your microservices
in Python or Go, that is also fully supported.
And the key to this, as we'll see here at the end, is that you
can package third party libraries into the code itself.
So if you want to include a machine learning library,
as we'll see here in the example, you can do that and take
full advantage of that inside your pulsar functions. You don't need to set up
a complex framework to do these sorts of things, you can just easily embed
it and take advantage of that. So why do we think pulsar functions are good
for microservices in general, or just why
Pulsar functions versus alternatives, right? When we designed
the Pulsar messaging system, we realized that a lot of
stream processing functionality is really simple:
you only need to do a few basic operations on the data. And a lot of
people had difficulty setting up yet another data
pipeline tool, something like a stream processing
engine such as Storm or Flink, which is its own distributed system,
just to do some processing of the data. So we
wanted to make it simpler. We also wanted to embrace the serverless
computing model so that they can run in containerized
environments. They can be deployed as Kubernetes pods.
This allows them to be resilient to failures and starts and stops, things like that.
It's also about maximizing developer productivity, right? You don't need to
learn a complex framework
like Flink, or a language like Scala to work with Spark,
or something like that. You can just use a very simple, language
native interface in Java, implement a single method, and
it's easy to get going. Again, they're designed for lightweight stream
processing. They excel at basic use cases that don't
require the complexity of a full stream processing engine, such as event driven
microservices. Again, so an event comes in, I'm notified of it,
I do my processing logic and I optionally publish an event out, or I
manipulate and store some data. That's entirely up to me and it's a
great fit for that. You can also use them for simple things like
message transformations and ETL style processing. Say I want
to do data enrichment: something comes in, I do a lookup in a database,
add that data, clean up some fields, maybe offload some bad records coming in,
things like that, and you can chain them together. That's the beauty of them as
well, because you have an input topic in one and an output topic. That output
topic could then be the input to another one. So it really allows you to
start developing some complex chains using some very simple
tools. So let's talk about developing pulsar functions. So I laid the
groundwork here. Now you have a compute engine to run this
on, so what is required to do it? There are native
Pulsar functions, but I won't get into those today. They just
allow you to do it without the SDK, which is an option. But what I'm
going to demonstrate today uses the Pulsar Functions SDK, a software
development kit that includes an API. Again, it's supported
for all three languages, and it provides a richer set of APIs
for doing interesting things, right. So you can have a single message come in and
publish it out to multiple output topics if you want. We also
support the retention of state within pulsar functions
themselves. So you can, for example, compute an aggregate,
store it as state, and then come back later on,
pull that data back out, and use that stored value as part of your
computation. Again, that is great for something like event driven
microservices. Maybe you want to keep a record of what you've
seen of these messages, so that you can use that as your
data store. You can also produce to many different topics,
consume data, do content based routing. All these different features are available with
the functions SDK.
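As a quick illustration of that state API, here is a sketch of a counter style aggregate. The class name and keys are my own, not from the talk, but incrCounter is part of the Functions SDK Context (note that state storage has to be enabled on the cluster for this to work).

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    public class WordCountFunction implements Function<String, Void> {
        @Override
        public Void process(String input, Context context) throws Exception {
            // Keep a running count per word in the function's built-in state store.
            for (String word : input.split("\\s+")) {
                context.incrCounter(word, 1);
            }
            return null;   // no output topic needed for this example
        }
    }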
Last but not least, before I get into the demonstration: obviously we're going to package a
lot of third party libraries together, machine learning libraries specifically for this use
case. And obviously, when you deploy them up to this runtime environment,
it's critical that you bundle all these dependencies with them. They're not going to be
available inside Pulsar itself, so they have to
all be within a single deployable artifact.
For Java based functions you can use either an uber
JAR or a NAR file. I prefer a NAR file
for a multitude of reasons, and that's what I'm going to demonstrate today. For Python
you have different options as well, and for Go based functions
you can deploy them as a compiled Go binary.
So let's explore the code from the repository.
There are two Maven modules within this repo, the first being
one called sentiment analysis function. This represents
a Java based microservice that will be using the
machine learning model to perform a sentiment analysis on
raw tweet data. As you can see here, it contains
a Java class called SentimentAnalysisFunction that implements the
Function interface defined in the Apache Pulsar
Functions API. It is a typed
interface: it specifies the input type coming in from
the source topic as String, and we will be returning
a type of AnalyzedTweet, which is a user defined
class that will contain the tweet text itself along
with the calculated sentiment for that tweet.
The only method defined in this interface is the one called process
found here, and we mark it with the @Override annotation.
Note that we're going to return a type of AnalyzedTweet,
which is what is specified as the output type of the function,
and these must match. Similarly, the input type
was specified as String, and the first input parameter
is going to be of that same type, allowing you to have strongly typed
data coming in so you can manipulate it in the manner you see fit.
We have added a single guard and an initialization method to make
sure that we initialize this natural language processing
library just once, rather than doing so
multiple times. We can see that this library is
an open source, third party library available here,
and we include it in the module as a dependency
here. So we include it as a normal dependency, like you would any other
Java library, and it's then freely available inside the Pulsar function
itself, which is really powerful.
Not a lot is going on inside this particular method, but we just want to
demonstrate the capabilities, right? So a tweet comes in, the text comes in,
and then we leverage this third party library, this machine learning model,
to perform sentiment analysis on it. It calculates a
sentiment for us, and as I mentioned before, we return a value that
contains the original tweet text along with the calculated sentiment. A rough sketch of that method is shown below.
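This is not the literal code from the repository; the SentimentAnalyzer wrapper and its categorize method are placeholders for whatever NLP library the project actually bundles, but the shape of the function matches what is described above.

    import org.apache.pulsar.functions.api.Context;
    import org.apache.pulsar.functions.api.Function;

    public class SentimentAnalysisFunction implements Function<String, AnalyzedTweet> {

        private SentimentAnalyzer analyzer;   // placeholder for the bundled NLP library

        @Override
        public AnalyzedTweet process(String tweetText, Context context) throws Exception {
            if (analyzer == null) {
                // Guard: load the NLP model only once per function instance.
                analyzer = new SentimentAnalyzer();
            }
            String sentiment = analyzer.categorize(tweetText);   // e.g. POSITIVE, NEUTRAL, NEGATIVE
            return new AnalyzedTweet(tweetText, sentiment);
        }
    }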
And this is really it. This is the core of it, but it gives you
an idea of what actually can be done. Now, in addition to including
the third party libraries in the base POM, you can see that we've defined
a build plugin here, which is the NAR plugin.
This bundles everything together into the single deployable
module I discussed in the previous section, which makes deployment easy.
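For reference, the plugin declaration in a POM looks roughly like this. The exact plugin and version used in the repo may differ, but NAR packaging for Pulsar functions and connectors is typically done with the NiFi NAR Maven plugin.

    <packaging>nar</packaging>

    <build>
      <plugins>
        <!-- Packages the function plus all of its dependencies into a single .nar archive -->
        <plugin>
          <groupId>org.apache.nifi</groupId>
          <artifactId>nifi-nar-maven-plugin</artifactId>
          <version>1.3.1</version>
          <extensions>true</extensions>
        </plugin>
      </plugins>
    </build>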
The second module is called the tweet simulator,
and this is going to publish data out for us.
It is also based on the Pulsar IO
and Functions framework, but it implements a different interface here
called Source, which is part of the Apache Pulsar IO
core framework. It allows you to integrate with external systems and
publish data in and out. In this case, we're using it to simulate tweet
data, primarily because the Twitter API is now locked
down. And after a thousand tweets, you're sort of out
of luck for an entire month for testing. We want
many more tweets than that. So I wrote this little class
instead to generate data into that particular topic to feed our
sentiment analysis function.
This tweet source here reads data from a
local file I created, with some random text, some sentences,
some random data that will be passed through, and then we'll perform
the sentiment analysis on that. A condensed sketch of that Source interface in use is shown below.
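Again, this is a simplified sketch rather than the repo's exact class; the file name and class name are illustrative, but Source, SourceContext, and Record are the real Pulsar IO API types.

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Iterator;
    import java.util.Map;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.Source;
    import org.apache.pulsar.io.core.SourceContext;

    public class TweetSource implements Source<String> {

        private Iterator<String> lines;

        @Override
        public void open(Map<String, Object> config, SourceContext context) throws Exception {
            // Load the canned tweet text once when the source starts.
            lines = Files.readAllLines(Paths.get("sample-tweets.txt")).iterator();
        }

        @Override
        public Record<String> read() throws Exception {
            // Each call hands one simulated tweet to the framework,
            // which publishes it to the configured destination topic.
            String tweet = lines.hasNext() ? lines.next() : "no more sample data";
            return new Record<String>() {
                @Override
                public String getValue() {
                    return tweet;
                }
            };
        }

        @Override
        public void close() throws Exception {
            // nothing to clean up in this sketch
        }
    }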
So that's really it. That's where the code is. And this can easily be built.
If you go to your terminal and you run mvn clean package,
it will generate NAR files for both the tweet source and
the sentiment analysis function, which we will use for deployment in a
minute. I also want to note that as
part of this particular project, we have a Pulsar
cluster that you can run locally, started with
the start brokers command, which is what we will use down here.
We'll notice that we're already in that particular bin folder, so I'll go ahead
and start the brokers now. So we have a pulsar cluster to
test against locally, which makes it very useful. It's self contained,
and I want to note here that I have specifically used version 2.9.3
of Pulsar. And I've used a little bit of a trick to
map these internal volumes to where the NARs are being built,
the target directories you can see here. I'm mapping them to the file
system on the Pulsar cluster,
which is going to make it easier to deploy these using the command line tool
in a second. So I did it once for the source and again
for the analysis function as well. Also note
that you want to make sure that you enable the function worker.
It is often disabled in Pulsar standalone setups.
So check this particular setting and make
sure that you do not have the -nfw switch.
A lot of times that flag is there, and it means "no functions worker," which
disables it. So make sure that it is removed. Putting the version, volume mappings, and that flag together, the startup amounts to something like the sketch below.
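This is only an approximation of what the repo's start brokers script is doing; the container name, ports, and host paths here are illustrative, not taken from the talk.

    docker run -d --name pulsar \
      -p 6650:6650 -p 8080:8080 \
      -v "$(pwd)/sentiment-analysis-function/target:/pulsar/deploy/function" \
      -v "$(pwd)/tweet-simulator/target:/pulsar/deploy/source" \
      apachepulsar/pulsar:2.9.3 \
      bin/pulsar standalone   # note: no -nfw flag, so the functions worker stays enabled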
Once you've done that, you should have a Pulsar cluster up
and running. Let's verify. Let's look at the logs with
docker logs -f pulsar,
and we can see it started, it's up and running. Everything is
great. We're going to use this to monitor the progress
of our system. So now let's pivot and
start generating the source data. So as you can
see here, I've already copied and pasted in the command to do it. There is
a subcommand of the pulsar-admin tool called sources,
and it has lots of different methods, the first of which is create. Then
you specify the name you want to give that particular source,
you specify the archive file, the artifact
that contains the source code; in this case, it's the NAR file that we generated.
And you specify a destination topic: where do I want this data to be published to? The command looks something like this.
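(The source name, NAR path, and topic name below are illustrative placeholders, not copied from the demo.)

    bin/pulsar-admin sources create \
      --name tweet-source \
      --archive /pulsar/deploy/source/tweet-simulator.nar \
      --destination-topic-name persistent://public/default/tweets_in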
So we'll go ahead and execute that and
look at the log here. And we should see it coming in, so we can
see that it got received. The tweet simulator NAR came
in, it's being unpacked, and the metadata
is coming out. It has some information on how it was deployed: it was
a source, and the NAR file was deployed. So if you want
to get down into the details, you can check these things. If there are any issues,
check your log on
that source. We can verify that it was deployed as we wanted.
We can see the configuration values here. Everything looks
correct. We can also check the
status. Right? So let's check the status of
our source just to make sure that we're generating some data.
And it's been executed 17 times already.
So we can come over here and we
can consume from the tweets in topic. So this particular source is
publishing again, as we saw,
to the tweets in topic. So let's consume.
This is another command we can run in a separate window: create a Pulsar client
to consume. The -n 0 means consume indefinitely.
Give it a unique subscription name and the topic name, which is going
to match where we're publishing the data; something like the command below.
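(The subscription and topic names here are illustrative.)

    bin/pulsar-client consume \
      --subscription-name tweet-viewer \
      --num-messages 0 \
      persistent://public/default/tweets_in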
Let's verify that the data is coming in. We're going to see some startup information
here about the kind of connection, but then shortly we'll start
seeing the tweets coming in one by one, data coming from that raw
text file one at a time.
So the source is working.
Now let's pivot and start creating the
pulsar sentiment analysis function.
So I have left in the code itself a
command that encapsulates all this. We don't want to type it
from memory, or at least I don't, so we'll go ahead and use this to
recreate it. I want to break this down a little bit, though. So again,
we're executing a command inside Pulsar itself,
and inside Pulsar there's an admin tool with
a functions subcommand, and we tell it to create a new Pulsar
function. With --jar we specify where
the entire artifact to deploy is, which is the sentiment analysis NAR file
we just created. Then we give the class name of the sentiment analysis
function itself, so again, it's the fully qualified package and class name.
This is the class we want it to run. The input
parameter is going to be this tweets input topic here that the data is coming into,
and for our output we're going to publish to a different topic,
the public/default tweets out topic. And last but not least, if you want
to, you can specify a log topic as well for informational data,
but that's really all we need. Put together, it looks something like the command below.
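(The function name, package name, NAR path, and topic names below are placeholders, not the repo's exact values.)

    bin/pulsar-admin functions create \
      --name sentiment-analysis \
      --jar /pulsar/deploy/function/sentiment-analysis-function.nar \
      --classname com.example.SentimentAnalysisFunction \
      --inputs persistent://public/default/tweets_in \
      --output persistent://public/default/tweets_out \
      --log-topic persistent://public/default/tweets_log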
This should work. If everything goes correctly, you'll see
a "Created successfully" message. Again, you'll see things in the log file
coming in, letting you know that
it was successfully received, how it was configured, the tenant,
namespace, name, and class. Everything else should match what you want. You can
quickly verify this as well; everything's up
and running. If you have any sort of errors, they would be first noted
here. If you want to
also check on the status and get
the configuration details at any point in the future,
we can copy this real quick.
Run the command here and it should spit
back out the configuration details that we sent in,
again, verifying it was deployed as we see fit. If you want to update
these, if you want to change the output topic,
change the parallelism, things like that, you can change the behavior later,
but it looks like everything is up and running.
So now let's go and see what the output looks
like. So again, as we saw earlier,
it's writing its output to the tweets out topic, public/default tweets out.
So we are going to consume from that same topic over here
in a separate consumer and watch this; again, the tweets are still coming in.
So let's see how our machine learning model is working. Is it
getting the information? Is it processing it correctly?
It will start up here shortly. Again, you see some connection details
coming in, and sure enough, you're starting to see: here was the original
tweet, and it was assigned a sentiment of neutral.
The next one comes in, "everyone was curious,"
positive, et cetera.
So it starts coming in, it matches. We can get the most recent data and
we're doing some analysis on it. So it's event driven in that regard. As soon
as something comes in, an analysis is done on it
and it's published out. And so you can
see that this is moving, and it's going to keep working in tandem for a while,
and that's not a problem.
If you want to check the performance and how long the
processing of each one of these tweets takes, you can use the stats
command, which is going to give you some average processing
latency, which is obviously the average processing time
of each tweet. It's going to tell you how many tweets have been
processed, how many over the lifetime of the function
itself, and things like that. So lots
of information is available. As you can see here,
everything is working just fine as expected.
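For reference, that stats command is roughly the following (the tenant, namespace, and function name are the illustrative ones from the earlier create command):

    bin/pulsar-admin functions stats \
      --tenant public \
      --namespace default \
      --name sentiment-analysis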
So in summary, event driven microservices use a message bus to
communicate between themselves, and Pulsar is a cloud native distributed
messaging and event streaming platform that provides the pub/sub semantics
that an event driven architecture needs. Furthermore,
Pulsar includes a native lightweight compute capability
called Pulsar functions that allows you to build microservices with
just a few lines of code. And as I've shown you,
you can easily add third party machine learning libraries into your pulsar functions
to enhance your microservices with machine learning capabilities.
Thank you for attending my talk. Let's keep in touch.
Scan the QR code for my personal page,
follow me on Twitter, reach out to me on LinkedIn and go get the
source code from my GitHub repo. Again, thank you very much.