Transcript
This transcript was autogenerated. To make changes, submit a PR.
Welcome to Cloud Native Apache Pulsar Development 101 with Python. So this is Pulsar for Python people. But what is Apache Pulsar? How do I code against it in Python? How do I consume messages? How do I produce them? How do I interact via other protocols like MQTT, WebSockets, and Kafka? How do I use Python for writing functions? And what are schemas? How do I use them and why do I care?
I'm Tim Spann. I'm a developer advocate at StreamNative. As mentioned before, I'm interested in the FLiPN stack (Flink, Pulsar, NiFi) working together in an ecosystem of lots of cool open source projects. I've been doing this for a number of years in the big data and cloud space. I work at StreamNative, founded by the original developers of Pulsar, and we're out there to help people with Pulsar and get you into a cloud environment that scales and is managed for you. I run the FLiP Stack Weekly newsletter, which has a lot of cool content, videos, articles, and code. Sign up and get it once a week. Free training is also available. If you're really interested in Pulsar, we give you an environment you can code with and some really good training materials that walk you through with some quizzes along the way. A pretty cool way to learn how to do Pulsar.
But what is Pulsar? It is a unified messaging platform. Why? Well, there are a lot of different messaging systems out there; different types do different things. Yahoo saw that and couldn't find one that met all their needs. They did not want to run multiple messaging systems, because they wanted them to interoperate. They wanted just one scalable architecture to do that. Pulsar unifies multiple messaging protocols and messaging styles, and does this in a pretty unique way. As expected with messaging, all your messages are guaranteed; it's a very resilient system, and you'll see why. With the underlying architecture, you can scale out as big as you need to. There are a lot of large companies scaling out to petabytes and thousands of machines, as big as you need to go. Or, if you're just starting out, a one-Docker-container cluster is pretty straightforward.
What's nice is the separation of components. The separation of compute and storage makes it very easy to get up on Kubernetes; we've got a Helm chart and operators for you, all in the open source. You don't need to use any commercial product there. What's nice is that the different layers are isolated, and with that we've isolated the metadata store out. The metadata store is used for metadata that both layers need and for discovery of services. This is now independent of a particular implementation. As of right now we have ZooKeeper, RocksDB, and etcd coded, and because it's open source I'm sure people will be adding more.
There are a lot of developers out there. The Pulsar brokers handle all the messages, making sure things are routed and connected, with some caching in there. They automatically load balance and handle segmenting the topics. These are very easy to scale out. They don't need much disk; these are very inexpensive, smaller brokers. You can scale them up and down very easily and not have to worry about the storage. Storage is in Apache BookKeeper. These bookies have the messages and the cursors pointing to everything, and they segment them up to optimize storage. That makes it very easy: you don't have to have thousands of giant SSD nodes. And what's cool as well is you can design your topics to automatically tier out to storage such as S3, other object stores, or HDFS. That makes it easy for you to store as much data as you want and not have to worry about it when you need to consume messages. Even if you're going back through years of data, petabytes, it doesn't matter.
Publish and subscribe is pretty standard in messaging. Producers send messages to ordered, named channels called topics. Messages have a payload, whatever that may be, along with some additional metadata and properties you put on there. Brokers make sure things are routed properly and everything's connected. You create a subscription to topics to get whatever kind of data you want. Subscriptions are the magic way you decide how you want this system to run. Do I want Kafka-style streaming? Do I want to fan out? There are lots of options there. Your consumers get these messages.
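In Python, that basic flow looks roughly like the sketch below; the service URL, topic, and subscription names are placeholders for a local standalone cluster, not something from the talk.

```python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Producer side: publish a payload to a named topic.
producer = client.create_producer('persistent://public/default/demo')
producer.send('hello pulsar'.encode('utf-8'))

# Consumer side: attach a named subscription to the same topic.
consumer = client.subscribe('persistent://public/default/demo', 'my-subscription')
msg = consumer.receive()
print(msg.data(), msg.message_id())
consumer.acknowledge(msg)

client.close()
```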
These subscription modes are important because they give you some cool different styles of messaging. With exclusive and failover you get guaranteed order. You get a single consumer, and with failover, if that consumer can't function, the backup takes over. This is how you do Kafka-style streaming. This is for when you need things exactly once and in order; your traditional Flink-style jobs work with that type of streaming. That's a great way to do things. But remember, it's the consumer that decides that. It doesn't matter how those messages got in there; if the consumer decides it wants the streaming style, it gets it. If you decide you want a work queue instead, that's shared: I'm going to have a thousand little apps running there, and as soon as they can get something to run, they run it, all of them active. They don't care about order, they just process the stuff as fast as possible; shared is great for that. Key_Shared splits the difference. You can have multiple active consumers, but messages are ordered per key, so this is good if, say, you're doing CDC. Each message belongs to a table; make that the key, and have each table's consumer get its own table. All the rows in that table will be in order. There are lots of other examples for that.
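Picking a mode from the Python client is just the consumer_type argument. Here's a sketch with placeholder topic and subscription names:

```python
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Exclusive/Failover: a single active consumer with guaranteed order (Kafka-style streaming).
ordered = client.subscribe('persistent://public/default/events', 'ordered-sub',
                           consumer_type=pulsar.ConsumerType.Failover)

# Shared: a work queue -- any number of active consumers, no ordering guarantee.
worker = client.subscribe('persistent://public/default/events', 'work-queue-sub',
                          consumer_type=pulsar.ConsumerType.Shared)

# KeyShared: many active consumers, but each key (say, a CDC table name) stays
# in order on a single consumer.
keyed = client.subscribe('persistent://public/default/events', 'key-shared-sub',
                         consumer_type=pulsar.ConsumerType.KeyShared)

client.close()
```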
What's a message? A message has data; obviously this could be raw bytes, but you can also make sure it conforms to different types of schemas. A key is highly recommended. It's helpful for reducing storage, for partitioning, and for finding data randomly later; having a key is important. If you can't think of a key, I like to just auto-generate one, just to have one. You can add any kind of key/value map of properties. These can be metadata that you need along with the message, without messing with the payload. We like to name things when they get produced so you know who sent it; good for debugging, auditing, and other purposes. There's a sequence ID so you know how it's ordered, obviously something you need if you're going to do streaming. There are lots of things you could do; those are really the basic pieces.
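Those pieces map straight onto the send call in the Python client; the values below are made up for illustration.

```python
import time
import uuid
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('persistent://public/default/demo',
                                  producer_name='weather-producer-1')   # name it for auditing

producer.send(
    b'{"temperature": 82.0}',                     # payload (raw bytes here, or use a schema)
    partition_key=str(uuid.uuid4()),              # auto-generate a key if you can't think of one
    properties={'source': 'garage-sensor'},       # key/value metadata outside the payload
    event_timestamp=int(time.time() * 1000))      # event time in milliseconds

client.close()
```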
But I want to get data in and out, especially with Python. So we support functions. These let me run event-driven code: basically, in Amazon you have Lambda, or you could think of database triggers. They get triggered by events that occur in a topic or multiple topics. Pretty straightforward there. A similar concept is connectors; these are sources and sinks. You set some configuration and data automatically goes there. That's a great feature. The protocol handler support is important because it lets us go beyond just the Pulsar libraries. I can use existing libraries for ActiveMQ, for RabbitMQ, for Kafka, for MQTT. This makes it very easy to not have to change code, and it lets you support unified messaging. You want to process data? I'll show you some examples today: Flink, Spark, Presto. It makes it easy for you to do that. We mentioned that tiered storage is important once you get a lot of data and you don't want to store it all on expensive SSD drives; put it on that cheap S3 storage. So a function takes some messages in, can log things to a special log topic, can send results to one or more output topics, or can have no output topic at all; you could update a database or do something special in there. You get support for Java, Python, and Go, and you can use any kind of libraries in there, including machine learning.
Pretty straightforward. I have a function that takes in a chat message, which is just JSON, but I take it in as plain text. If I use our special library to build my function, I get access to a context. It gives me logging and gives me the message ID; it's a nice feature. And then when I do a return from that, it goes to the output topic that I specified in my DevOps setup.
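In Python, such a function is a class that implements process(). Here's a minimal sketch, with the field handling and transformation as assumptions for illustration:

```python
import json
from pulsar import Function

class ChatFunction(Function):
    def process(self, input, context):
        # The context gives us a logger and message metadata.
        logger = context.get_logger()
        logger.info('received message id %s', context.get_message_id())

        record = json.loads(input)        # the chat message arrives as a JSON string
        record['processed'] = True        # hypothetical transformation

        # Whatever we return is published to the output topic named at deploy time
        # (for example via the --output option of pulsar-admin functions create).
        return json.dumps(record)
```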
If I want a more advanced way to run these functions: by default we just run them as a process or thread within the broker. If you want a more powerful mechanism, we can run them in Kubernetes with Function Mesh. This makes it very easy to have a scalable architecture for running all these different functions. There you set up a couple of Kubernetes files, kubectl deploys them, it uses all the standard APIs, and it sets up a namespace that easily connects with the Pulsar cluster it's associated with.
For those different protocols, MQTT is an example. If you look here, it's not a simple little thin layer or some kind of interpreter that just converts A to B. We have a full implementation of the MQTT protocol, so these protocol handlers work as if you now have an MQTT broker. And what's cool is it doesn't matter how your data gets into or out of Pulsar; you pick the library you're comfortable with. I send Kafka in, and I want to get it out as Pulsar, or I want to get it out as MQTT. It doesn't matter; it's the same data.
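As a sketch of that, here's the same payload sent through the MQTT and Kafka protocol handlers (MoP and KoP) with stock clients. It assumes both handlers are enabled on the broker and listening on their usual ports (1883 and 9092), and the topic name is a placeholder that the handlers map to a Pulsar topic in the default tenant and namespace.

```python
# Publish through the MQTT protocol handler (MoP) with a standard MQTT client.
import paho.mqtt.client as mqtt          # pip install paho-mqtt (1.x-style constructor below)

mqtt_client = mqtt.Client()
mqtt_client.connect('localhost', 1883)   # assumes MoP is listening on 1883
mqtt_client.publish('weather-events', b'{"temperature": 82.0}')
mqtt_client.disconnect()

# The same data through the Kafka protocol handler (KoP) with kafka-python.
from kafka import KafkaProducer          # pip install kafka-python

kafka_producer = KafkaProducer(bootstrap_servers='localhost:9092')   # assumes KoP on 9092
kafka_producer.send('weather-events', b'{"temperature": 82.0}')
kafka_producer.flush()
```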
An easy way to access your data is Pulsar SQL, which under the covers is Presto (Trino). It lets you access the data even if it's already in that tiered storage, so you can run a quick SQL query and see what's in your topics. If you want to do micro-batch coding or maybe some ETL, Spark is great for that, and we've got a great Spark connector. Connect right to your topics, do SQL on it, and then you can figure out where you want to write the stream. It could just be the console if you're monitoring something, or you could dump it to CSV, JSON, Avro, Parquet, or some other file, maybe combine it with other things, maybe do some machine learning. Pretty easy to do.
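As a sketch of that Spark path (not the exact code from the repo): the format name and option keys here ('pulsar', 'service.url', 'admin.url', 'topic') follow the StreamNative pulsar-spark connector as I recall it, so double-check them against the connector version you use.

```python
from pyspark.sql import SparkSession

# Assumes the pulsar-spark connector package is on the Spark classpath.
spark = SparkSession.builder.appName('pulsar-micro-batch-etl').getOrCreate()

weather_stream = (spark.readStream
                  .format('pulsar')
                  .option('service.url', 'pulsar://localhost:6650')
                  .option('admin.url', 'http://localhost:8080')
                  .option('topic', 'persistent://public/default/weather')
                  .load())

# Write to the console while monitoring; swap in csv/json/parquet sinks as needed.
query = weather_stream.writeStream.format('console').start()
query.awaitTermination()
```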
You could do some real-time ETL and continuous SQL with Flink. This is a nice way to process your data at scale, an event at a time. What's nice here is this can be done through either a plain SQL interface or one of Flink's languages like Java, Scala, or Python. The queries are pretty straightforward: select columns from a table, that table being a topic. You can do ORDER BY and GROUP BY, MAX, all those kinds of fun things you expect in SQL.
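Here's a rough PyFlink sketch of that continuous SQL flow. It assumes the pulsar-flink connector jar is on Flink's classpath; the catalog property keys ('type', 'service-url', 'admin-url') and the column names are my assumptions and differ between connector releases, so check the docs for the one you use.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Register Pulsar as a Flink catalog so every topic shows up as a table.
t_env.execute_sql("""
    CREATE CATALOG pulsar WITH (
        'type' = 'pulsar',
        'service-url' = 'pulsar://localhost:6650',
        'admin-url' = 'http://localhost:8080'
    )
""")
t_env.execute_sql("USE CATALOG pulsar")

# A continuous aggregation over a topic; results keep updating as events arrive.
t_env.execute_sql(
    "SELECT station, MAX(temperature) AS max_temp FROM weather GROUP BY station"
).print()
```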
In the overview of this, you can think of it as a universal gateway. It can be used as a buffer while you're doing processing in different places. It's very easy to get in and out of a lot of different sources, and you can do a lot of processing in here, whether it's filtering, aggregating, enriching, or deduping data. And it's nice that now all these systems are decoupled. You don't have to know whether someone wrote something in Java or Scala, or whether it's coming from this database or that database. All I've got to know is one of these protocols to get in and out of Pulsar.
It's pretty straightforward to make things have a contract: it's great to put a schema in there, and it's really easy to do that in Python. If I have a schema, whether it's Avro, Protobuf, or JSON, it makes it very easy for me to know what the fields are and what the data should look like. It also makes it easy to format the tables we might want in Spark, Flink, or Presto. Here's an example of an architecture; we touched on those topics already.
I want to show you some code if you want to do this at home. I've put all the directions in the slides so you don't have to write them all down. I'm not going to walk through them all because we don't have a super long session; I'll come back to a couple that are really critical to highlight some things. But here you can download it if you want to try it standalone on a small node, like maybe an EC2 instance. There are instructions for doing it with StreamNative Cloud, which has a free tier, and some other options there. You could also do it in Docker, you could use the Kubernetes Helm chart, or go to our academy and get a cluster for a couple of days connected to a Visual Studio Code instance to do some development.
But let's look at some real code. Let's look at some demos. I have a bunch of open source examples here, fully documented. This one is for weather, and it gives you a little idea of what's going on. I show you different runs and how we interact with things, just to make it easy. All the source code is here, which makes it very easy for you to run these things. Let's take a look at some of the source. This is simple Python. The things that you need here: you need the pulsar library, which you just install with pip, along with the one for schemas. Now, it depends on your infrastructure; lots of different infrastructures are supported. If yours isn't, you have to compile it from the C++ client, but we've got the full build for that if you need it, and it'll even run on a Raspberry Pi.
If I want a schema, which makes sense if your data is consistent and has fields, let's do that. Let's create a schema. This is an example; I want one for my weather fields. And this is as simple as it is. That's all I have to do; I don't have to know some kind of special schema language. Create a class, put the fields in it, and we're ready to go.
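Here's a sketch of such a class for weather readings; the exact field names in the repo may differ.

```python
from pulsar.schema import Record, String, Float, Integer

class Weather(Record):
    uuid = String()
    ipaddress = String()
    temperature = Float()
    humidity = Float()
    pressure = Float()
    ts = Integer()
```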
So here I'm connecting and grabbing some data from a sensor. The important part here is connecting to my Pulsar cluster. If I have authentication, I've got examples for that in there; we support OAuth, tokens, and a bunch of other ways. But just to keep it simple, I've got my local hosted environment here. I've got my topic, and if you look at the topic, it's a little unique. I didn't go into this since we're trying to move pretty quickly, but Pulsar is multi-tenant, so I could have a lot of different people using this, and I can have hundreds of thousands of topics. The way to keep that clean and secure is I have a tenant; say this one's for everybody, but I could have one specific to your group or your company. Then underneath that I make any namespaces I want, maybe per app or per line of business, and under there are my topic names. It keeps everything nice and isolated, easy to work through. Here is where I define the schema: I just say JSON schema with the class I set up above for weather. If I wanted Avro, I would just say Avro schema, and I've got an example of that documented there as well. I set a couple of properties, and that's really all you have to do to set it up. Then I have a loop that's just going through the sensor: whenever it gets new data, I set up a new weather record, set all the fields, and then send it along with that key we mentioned before. I'm just creating one from a builder, and then boom, it's sent.
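Putting that together, here's a sketch of the producer loop using the Weather class sketched above; the service URL, topic name, and the read_sensor() helper are placeholders, not the repo's exact code.

```python
import uuid
import pulsar
from pulsar.schema import JsonSchema   # swap in AvroSchema(Weather) for the Avro variant

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    topic='persistent://public/default/weather',
    schema=JsonSchema(Weather),
    properties={'producer': 'pi-weather'})

while True:
    reading = read_sensor()            # hypothetical helper that polls the device
    record = Weather(uuid=str(uuid.uuid4()),
                     temperature=reading['temperature'],
                     humidity=reading['humidity'])
    producer.send(record, partition_key=record.uuid)
```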
Let's actually go to that device, and I'm going to run that code here. The first thing it does is connect to the cluster and build that producer, and now we're sending data. I'm printing it out just so I can see that it's working; it's for debugging. Obviously in your own code you're probably not going to want that, and there are lots of different ways you might want to run your Python: it could be in a notebook, it could be somewhere else. This is good to see that the data is coming in; I know it's happening. That's great. Now, if we were going to do the Avro version, we could take a look: the only difference in this whole thing is that here I say the schema is Avro. That's it to make it different. Pretty straightforward.
Now I can connect to Presto from the command line, or I could use some graphical tools. I don't know how well you can see this, it's pretty dark, but I just have a SELECT * here with just a couple of fields I want. I can just browse all the topics from my tenant and namespace; Presto sets that up for me. And I get access to all those metadata fields too, like the event time, the message ID, and the key, and I can even use those in my queries, like this message ID, if I want. Pretty straightforward.
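If you'd rather run those same queries from Python than from the CLI, a sketch with the Trino client might look like this; it assumes Pulsar SQL is on its default port 8081, that the topic lives in public/default, and the data column names are placeholders.

```python
import trino   # pip install trino

conn = trino.dbapi.connect(host='localhost', port=8081, user='demo',
                           catalog='pulsar', schema='public/default')
cur = conn.cursor()

# Pulsar SQL exposes metadata columns such as __key__ and __message_id__ alongside the data.
cur.execute('SELECT __key__, temperature, humidity FROM weather LIMIT 10')
for row in cur.fetchall():
    print(row)
```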
Presto is pretty quick. I could change this query and do something like a count. Let's see how many rows we have there: 81, then 83. We're adding more as we're running this, which is a good thing to see; nothing crashed. Here I'm doing Flink, and that's a similar way to do this, but it does it continuously. The thing to show you there is that there are a couple of different settings you need for that. If you're interested in other examples, I have a lot of them, especially for Python and Java, as those are pretty common, and they're a nice way to go through these things without too much work.
So I've got some examples here and I've got some functions. Maybe we should go into a function. I've got one in Visual Studio Code here. This one does sentiment analysis, and it's pretty straightforward. I'm using the Function interface here so I get that logger, so I can log the message as it comes in. I just parse the JSON there and return the sentiment. Pretty straightforward.
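Here's a sketch of what a function like that can look like. The VADER analyzer and the 'comment' field name are my assumptions for illustration, not necessarily what the repo uses.

```python
import json
from pulsar import Function
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer   # pip install vaderSentiment

class Sentiment(Function):
    def __init__(self):
        self.analyzer = SentimentIntensityAnalyzer()

    def process(self, input, context):
        context.get_logger().info('scoring: %s', input)
        text = json.loads(input).get('comment', '')          # assumed field name
        score = self.analyzer.polarity_scores(text)['compound']
        label = 'Positive' if score >= 0 else 'Negative'
        # The return value lands on the function's configured output topic.
        return json.dumps({'comment': text, 'sentiment': label})
```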
And then here's how I'm running it. I have an example; let me reload this, it's been sitting here too long. Hopefully I didn't crash my system. Okay, so I'm just going to put a question in here: Is today the best day for demos ever? We'll see if everything's working, and it is, so that's cool. And we can look at the publish time for all the new messages coming through. That's great. Is Apache Pulsar cool? Great. How about "great"? And I'll change my name here, and it's giving me back a positive. So what you're seeing here is an HTML form that's just doing a WebSockets call to Pulsar, and that's going into a topic. Down here is a separate consumer that's getting the results of that function. I know it looks like it's instantly showing up here: it's making that WebSocket call to Pulsar, Pulsar is triggering that function that does sentiment analysis, it sends it to another topic, and that comes out here in my HTML page with live WebSockets. All the source code for that is out there. Pretty easy to do.
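The same flow works from Python instead of an HTML form, using Pulsar's built-in WebSocket API. In this sketch the host, topic names, and subscription are placeholders.

```python
import base64
import json
from websocket import create_connection   # pip install websocket-client

# Produce a question into the input topic over WebSockets.
ws = create_connection('ws://localhost:8080/ws/v2/producer/persistent/public/default/chat')
ws.send(json.dumps({
    'payload': base64.b64encode(b'Is Apache Pulsar cool?').decode('utf-8'),
    'properties': {'user': 'tim'}}))
print(ws.recv())    # the broker replies with an acknowledgement containing the message id
ws.close()

# Consume the scored result from the function's output topic.
ws = create_connection(
    'ws://localhost:8080/ws/v2/consumer/persistent/public/default/chat-result/my-sub')
msg = json.loads(ws.recv())
print(base64.b64decode(msg['payload']))
ws.send(json.dumps({'messageId': msg['messageId']}))   # acknowledge it
ws.close()
```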
We are running Presto, and Presto has a nice interface to show you what's going on. You can see how long things took and how much data was accessed; pretty cool way to do that. Same thing with Flink: as those jobs are running, we can see all of that happening. I'm just trying to give you a good example of Flink here; we've got most of another example documented, and it's pretty straightforward to do these things. But say we wanted to do some Flink. If you don't know how to work with Flink and Pulsar, just take a look at my code; I've got a lot of examples out there. Like this one here, which shows you how to build a catalog. So if I'm logged into Flink, I create a catalog, assuming I pasted it correctly; it's good to copy and paste properly. So let's go to this one. This air quality one is another example, pretty straightforward, but I'll create a catalog for Flink. Flink can connect to lots of different data stores, which is nice, so if I wanted to join, say, Pulsar topics with a table from something else, I can do that. That's pretty cool. Let's see all the topics
we have. Let's describe one: piweather is the one coming off that device there. So I could just pick a couple of fields here. If they have reserved names, you'll probably want to make sure they're quoted so you don't lose them. So this is piweather; when in doubt, use the quotes.
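Reusing the TableEnvironment from the earlier PyFlink sketch, the escaped select might look like this; note that in Flink SQL the escape character is the backtick, and these column names are guesses at the piweather schema.

```python
# Backticks escape reserved or unusual column names in Flink SQL.
t_env.execute_sql(
    'SELECT `timestamp`, temperature, humidity FROM piweather'
).print()
```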
This is going to launch a Flink job that could be spread across a giant Flink cluster, and again, you could wrap this in some Python, Java, or Scala code. The job has shown up and is starting to process. You can see it's a table source scan because I don't have any joins with other topics, but I could. So this is running, and as long as I'm still sending data here, it should start showing up in Flink. Lots of different systems can show you this, but as those records come in, they show up over here, and it's continuous. I'm not rerunning the query; as an event comes in, it comes into my results. I could have written this as an INSERT INTO and put the results into another topic, or inserted them into another catalog, which could be a data store. And that's as easy as the code gets. It makes it pretty straightforward to run these types of applications, and even if you're doing security, nothing's too hard here. Do your processing; that's your hardest part. Connecting into and out of Pulsar is easy.
Whether you want to use MQTT, WebSockets from Python, or the Kafka protocol, it's pretty straightforward. Data comes in, maybe it goes into a function or to a sink, output comes out if you have one there, and anybody can consume it. I could have thousands of consumers coming off the same thing, the same data; just decide on your own which kind of subscription you want. Pretty straightforward. I have a lot of examples here that we showed you so you can get started pretty easily. Please join the community if you're interested or have some more questions. I am available all the time, which means maybe I shouldn't tell you that; I'm too busy, don't contact me. No, if you have a question, please reach out. I'm always looking to put out some cool new demos, help people with problems, or bring suggestions back to the community.
Thank you for joining me. I'm going to have some meetups soon and some other talks. If you want to see some different queries, please feel free to let me know; I'm always looking to do some cool new stuff. So here, this is SQL. If I want to do something like get a max, let's do a max on, say, temperature. Is that a float? Okay, I could cast it to a number if I had defined it as a string; sometimes you define things as a string just because you're not sure. So far, 82 degrees. I may need to turn my air conditioner on; it's a little loud, and I wish I had central air here, but what are you going to do? So that's not getting any higher; it's pretty consistent if we look here, it hasn't gone up. We could have done the minimum or picked another field. There are lots of options there, depending on what you want to do.
So thanks for attending my talk. I hope you learned a little bit about Pulsar and why you should start coding today with Pulsar and Python. Again, if you want to do it in Go or Java or Scala or Kotlin or C or C# or Node.js, or with any library that can speak WebSockets or MQTT or Kafka, please start coding. Put your messages in. Pulsar makes everything easy, regardless of what type of streaming or messaging you intend to do.