Conf42 Python 2023 - Online

Build ML Enhanced Event Streaming Apps with Python Microservices

Video size:

Abstract

The easy way to build and scale machine learning apps.

Summary

  • Kim: I'm a principal developer advocate for all the major streaming technologies. When you're building these type of real time applications, you need a team. So I thought I would pull in one of the top experts in Apache Pulsar to cover that section.
  • Over 1000 organizations worldwide are using Apache Pulsar to solve their event streaming and messaging use cases. Why is Apache Pulse growing in popularity? It really comes down to the feature set that it offers and some of its architectural differences.
  • Pulsar is the first and only messaging system that supports multiple messaging protocols natively. This allows you to natively take your existing Kafka applications using a Kafka library and switch it to Pulsar. And last but not least, tiered storage offload.
  • Pulsar functions have a very simplistic programming model. There's three languages supported, Java, Python and go. You can still leverage the full power of any third party libraries you want to use inside this code itself. We're going to leverage these and Python library to do some cool machine learning models with Python.
  • You import pulsar, create a client, connect to your cluster on your port, then create a producer for that particular topic. Most topics persist the data for as long as it needs to be. Once you're in production, you probably want someone to control who creates those schemas and topics.
  • Pulsar is a good way to have different protocols as part of your ML stream. Standard websockets and base 64 encoding. Great way to get your data into pulsar. Also a great way to communicate with say a front end client which could be pure javascript.
  • Now we could use a lot of different ways to deploy things. One way is command line and that could be interactive. This could also be done through rest endpoints or through different cloud uis. Great way to communicate between systems. Do that asynchronously and apply ML in different pieces.
  • Tim Spann: Nifi is a great way to get data in, apply some machine learning, get data between different systems. New features coming out in the next release that lets you integrate Python and extend Nifi using Python apps.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi Kim fan here. Welcome to ML enhanced event streaming apps with Python microservices because I couldn't fit any more words in that title. I'm a principal developer advocate for all the major streaming technologies. Flink, Kafka, Pulsar, Nifi. When you use a lot of these together, I like to give them a catchy little name either. Flip, flippin flank, all kinds of cool stuff. I've been doing lots of different streaming stuff for a number of years, trying to grow the open source streaming and doing that for a lot of different companies. Applications all over in the community put out a newsletter every week called the Flipstack Weekly. Covers all the cool tech out there. Scan it, check it out. It is easy to check out. If you don't want to subscribe, just look at GitHub. I put all the previous 71 episodes there. Easy to read, get through quickly. Great stuff. So when you're building these type of real time applications, you need a team for your stream. And now that team is a bunch of open source tools like Nifi, Flank, Pulsar, Kafka, Spark, Python, of course, Trino, and also other people you work with. So I thought I would pull in one of the top experts in Apache Pulsar to cover that section. So make it a little more understandable. I'm going to bring in my friend David, who literally wrote the book on Pulsar to give us a little background on that. Figured might as well go to the source and not have me go through that secondary. And while that's loading, we'll get I'm a committer on the Apache Pulsar project and also author of the book Pulsar in action by Manning Press. I formerly worked at Splunk as a principal software engineer with the day to day operations of the Pulsar as a service platform within that organization and served in several director and leadership roles at startups including Streamlio and Hortonworks. So I want to talk to you today about Apache Pulsar and why it has a growing and vibrant community. As you can see from these statistics here, we have a large number of contributors. We're up over 600 contributors now to the Pulsar project, 7000 plus Slack members in our open source Apache Slack channel, where our day to day Q and A questions are answered vibrantly throughout the community themselves, 10,000 plus individual commits and back into the project and growing. And over 1000 organizations worldwide are using Apache Pulsar to solve their event streaming and messaging use cases. So why is Apache Pulsar growing in such popularity and gaining traction out there. It really comes down to the feature set that Apache Pulsar offers and some of its architectural differences that make it stand out from the other messaging solutions in the market today. Chief among them is the ability to scale elastically both horizontally and vertically to meet your peak demand, and then scale back down to minimize your infrastructure spend after your peak demand workload has been processed. And this is only capable due to the fact that we've architected Apache Pulsar in a cloud native way from day one. And what I mean by that is that we've completely decoupled the storage and compute layers in such a way that they can scale up independently from one another, so that if you have a burst of demand, brokers can be scaled up to serve the incoming request and then they can be scaled back down and the data retains there in the storage layer. This allows for seamless and instant partitioning and rebalancing of partition topics within pulsar without stop the world event. That's common in Apache Pulsar, where the entire topic is down until the data can be repartitioned and a new partition added as data is shuffled across the different brokers. That is not the case for pulsar, but it's more than just that. If it was just that, that wouldn't be really interesting. It's also built in georeplication and geographic redundancy and continuous availability features built into the Apache Pulsar framework that are second to none. That was a primary use case for Apache Pulsar back in 2012 when it was developed internally inside of Yahoo was for full mesh georeplication across eight data eight data centers that are geographically distributed across the globe. This capability is second to none in the messaging system today and allows you to replicate data very easily from point to point and is built in natively to the system, and it goes beyond just the ability to copy the data from one cluster to another. We also have what we call replicated subscriptions, so you keep track of where you are in that processing model as well as you transition clients over. And last but not least, we have failure aware clients built into the Apache Pulsar library where you can specify this is my preferred active primary cluster, and if you lose connectivity to that cluster, you can immediately switch those clients over to what you configure to be a secondary standby cluster so they can continuously operate without any need to doing the DNS entry shuffling or things like that. The clients themselves take care of it and then they switch back when they detect that primaries come back online. Last but not least is we support a flexible subscription model. So Hatchie Pulsar is defined as the cloud native messaging and event streaming platform, meaning they can support both your traditional messaging use cases like pub sub work queue sort of semantics with these different subscription models. And we also support the more modern use cases of event streaming, where processing of the data in order and retaining it in order is critical. Both of those are available with Apache Pulsar, and it's built in natively through what we call subscription models where you specify, this is how I want to subscribe to the data, and both those messaging and event streaming semantics are supported. Another key feature of Apache Pulsar is that we integrate the schema registry natively into the platform itself. Unlike other messaging systems which acted as an add on after the fact, you have to pay additional monies to get these capabilities. That's not the case with Apache Pulsar. Just like other messaging systems, we support schema aware producers and consumers so that when they connect, they can verify that the schema they're publishing with conforms to the expected schema of the downstream consumer, so that there's a data integrity between the two. Also support schema enforcement and auto updating. So if you publish by default a schema of the producer, let's say you connect a producer with a schema that adds a field and you're looking for a backwards compatible schema strategy, then that will be enforced because the consumers aren't aware of that, weren't aware of that field to begin with. They can continue processing using the old Schema version two while you're publishing messages with schema version three, and it interacts seamlessly and this is all built in out of the box and it's stored internally inside of Pulsar and you get these capabilities for free. Another really key feature that I'm high on among so many, that's what I want to call out, is what we call Kafka on Pulsar. Now, Pulsar is the first and only messaging system that supports multiple messaging protocols natively. And what I mean by that is other messaging systems that have been developed thus far usually have a low level wire transfer protocol that's unique to them. For example, RabbitMQ uses an AMQP protocol spec that sends commands back and forth to publish and retrieve data. Kafka similarly has a similar protocol for publishing and subscribing data. Pulsar has what's called a protocol handler framework shown here in this yellow box that would automatically take the commands from one protocol, in this case Kafka, and translate those automatically into the commands that Pulsar needs to handle in order to store data and retrieve data. Whatever the case may be. This allows you to natively take your existing Kafka applications using a Kafka library that you already tested and vetted out, and switch it to. Point to Apache Pulsar and you can operate seamlessly. This is a great use case. So if you've invested a significant amount of time into one of these messaging technologies, or maybe many of them, you can simply change a few configurations and reuse all that code and not have to rip it out and replace it because you want to use a new messaging system. So the barrier to adoption is much lower with this, as we mentioned before, the key to having a really good, strong streaming foundation is a teamwork, and in patchy pulsar that's reflected as a strong and thriving ecosystem across different components, as shown here. So as I mentioned before, the protocol handlers in addition to Kafka we spoke about in the previous slide, we have one for MQTT, we have one for AMQP as well. So you can speak all these different messaging protocols, a single system to speak them all. Multiple client libraries this is just a sampling of some of the ones that are supported. As you can see, all the more popular ones are supported. Java Python for those of you using Python, it's a very big deal. Python is a first order client library and has unique features that we will talk about connectors and sync so you can move your data into and out of these other systems. Process data inside using different stream processing engines shown there down below like flink, do some analytics on it and then push it to your delta lake. That's all very easy to do natively with these connectors. You also have pulsar functions, which is a lightweight stream processing framework. I'll talk about in the next slide that supports Python as a development language for one of these particular features, and we'll get into that in a little bit and how you can leverage those for machine learning applications out at the edge. And last but not least, tiered storage offload. So as I mentioned before, Pulsar is architected in such a way that your storage and compute layers are separated. You can take full advantage of that by moving older data, retaining it beyond the traditional seven days you do in something like a Kafka, 30 days, 90 days, however long you want, by moving it to more cost effective storage. Cloud storage this blob, this object store cloud providers like S three or Google Cloud storage, you can have it internally. If you have an existing hdfs cluster sitting around, you have nat network attached storage sitting around. You can offload that data and use this lower cost storage to put the data there. And Pulsar can read it natively. It doesn't really need to reconstitute it or spin up some clusters to put it back in first. Like other messaging systems, you can offload it to s three, but step one is to create a new broker and load it first. You don't have to do that since we're just reading it from a pointer, whether it's on our local storage, in our bookie layer, what we call bookkeeper, or if it's on s three, it doesn't matter, it's transparent to the end user and all these connectors and documentation and the source code and the download is available at hub streamnative IO here shown on the left. So if you want to learn more about them, get some more information, download them, try them out, it's all available for you to use. I mentioned pulsar functions in the previous slide and it has a very simplistic programming model. So for those of you familiar with AWS, lambdas or maybe Azure functions, it's very simple. You give a single, you provide a user provided function, a single method implemented, and this, that individual piece of code gets executed every time a new event arrives on one or more input topics that you configure the function to listen to. So you can say whatever these topics come in. When an event comes in, I want to execute this logic and boom, it happens. And again, there's three languages supported, Java, Python and go. And more importantly, even though they're simple single method pieces of code, so they're very easy to write, very easy to learn. You can still leverage the full power of any third party libraries you want to use inside this code itself. So we support the importing of these different third party machine learning execution models, for example. So you can do more complex stream processing, machine learning on these individual streams, data streams as they come in very easily on that. So it's a very nice framework for doing that. Here's an example of a pulsar function written in Python. As you can see here we have a couple of different import statements. We brought in a sentiment analysis tool in this regard, and as you can see we've imported the function definition and have a class that implements it here chat. The method there, called process, is the code that actually gets executed every time a new method arrives on one of the input topics, and the input, as you can expect, is that second parameter there called input, and you can manipulate that data as you see fit in this particular code example here, we parse it out as JSON. We know it's JSON. We pull out some individual fields that we know are there, we run some analysis on it, we determine if the sentiment is positive or negative, and then as a result of that we can publish those results to an output topic. And that's where this return statement comes in. So when you run the return statement, that's where the outputting of the data to a downstream topic has occurred. You don't have to do a return every time, but if you do, that constitutes the publishing of a new message at that point in time. So it's a really great feature. Again, this is for Python, and Tim will talk more about it in the presentation about how we're going to leverage these and Python library to do some cool machine learning models and machine learning processing with Python. So thanks David. Let's get to the next slide here. Always fun. Let me get out of full screen mode here, and we'll get to some of the other libraries here. So at a minimum you'll have your Python three environment on whatever kind of machine it is. Mac, windows, raspberry, PI, ubuntu, whatever it is, you do a pip install. 211 is the latest. Now if you're in an exotic platform, weird hardware, you might have to do a build. You can get the source code, do a c plus plus build, and then from there build the python one. Pretty straightforward, but for most platforms you won't need to do that, including Mac, M, one, silicon or intel doesn't matter. The code itself, really simple. You import pulsar, create a client, connect to your cluster on your port, then create a producer for that particular topic. Now the format for that is persistent. Most topics persist the data for as long as it needs to be. You can make it non persistent, not very common, but if you don't care about the messages, you just want fast throughput. Maybe you've got something that's always on, just as on some kind of IoT device. Maybe you don't need to persist it, send it, encode it, close the client, done. That's the simplest way to send messages with Python. That's probably not your production use case. You probably have to log in. So say I wanted to log into a secure cluster running in the cloud. I could use SSL, pick my port there again, connect to that topic. If I'm going to use Oauth, put out the issuer URL for that OAuth provider, point to my private key, set up what my audience is for that cluster when that data comes in. Same idea here, but I'm also importing authentication. It's going to authenticate against that. Got a number of examples on how to do that. If you're trying that out for yourself, you can get a free pulsar cluster at stream native to try this. It's a great way to learn it without having to install stuff or set up your own docker. There's also a cluster available if you do the free training at stream native. Now, if I want to use one of those schemas that I talked about before, Avro is one type, I'll import schema, I'll create a schema. It's a very simple class. Give it a name record which comes from the pulsar schema, and put all my field names and what type they are. You'd also specify if they're nullable or not. Connect to the client again and you could do oauth if you want. And then for thermal, I'll create the schema based on this over here and send that. When I create my producer, I'll specify the schema that way, put in a couple of extra parameters so people know who I am, and then create my record by setting a couple of fields and then send that here. I'm also setting a key. It's good so I can uniquely identify my records, and then bang, it's ready to go. If you've never sent a schema to that topic before, it'll create the first version of it. If this is an update, it'll add a new version. If you haven't created that topic before, if you have permissions, it'll create that topic under that tenant and namespace specified there, and set up that schema for you. It's really easy, especially for development. Once you're in production, you probably want someone to control who creates those schemas and topics and when and who has permissions. All that fun security stuff that people love out there. Now, if I want to do the same thing with Jason, it is not drastically different. I recommend you keep these topics separate, because I can have millions of topics, no reason to get in trouble and trying to put two different types of schema on the same topic. Create a new topic for everything you're doing. So if we want to do JSON schema almost identical, we just use JSON again. I could use that same class, put that in a separate file, and create one topic that has the JSON schema. One has Avro, one has protobuff. Whatever. Your downstream system is happier. Know Nifi really likes the JSON or Avro one. Flink might be better with know you might have a tool that works better with JSOn. Find that out, send that data that way. Very easy to do that. If you want to get data back, you subscribe to it, give it a subscription name. There's different types of subscriptions, it'll default to different types. And I could just go in a loop here, receive it, display it, and very important here acknowledge once I acknowledge a message for my subscription, mine for this topic, under that tenant and namespace, that message has been received and I acknowledged I received it. That means if there's no new messages out there, that data could be expired away if you want, or sent to long term storage, depending on your purpose out there. As long as no one else has a subscription that has unacknowledged messages. This is nice. I can acknowledge messages one at a time. It's not a build like in Kafka. This comes out very handy. If maybe you want to skip a message because it looks a little odd, maybe have someone else look at those later. Lots of reasons why you might want to not acknowledge messages, and you can unacknowledge them, and that way they'll live as long as your system allows them to live, which is configurable. Now, we talked about those other protocols, very important with ML. You may be grabbing stuff off devices, you may want to send things to a device. Maybe it only supports MQTT, use that protocol. And I could still point to pulsar, use those native python libraries, doesn't matter, point to those same topics. It is a very good way to have different protocols as part of your ML stream. Very common for IoT sensor use cases. This does come up a lot. This is a good protocol for that. It is perfect if you're sending a lot of data. It's not my favorite because it can be a little lossy. So be warned. If you're sending data with MQTT, keep an eye on it. But it is a nice way to do that. Now, another great thing, it's very easy to do websockets in Python, so we could use the websocket client to connect to pulsar as well. And we point to our topic. This will go over, rest, encode the data and send it. Very easy to do. We'll get a response back saying that it was received. Great way to get your data into pulsar. Also a great way to communicate with say a front end client which could be pure javascript. Nothing fancy there and we've got some examples later. It's a good way to communicate between apps written and say node js and Python. That can happen a lot in different ways that you want to display machine learning applications. Something to think about. I can get events and switch them over to Websocket style communications without having to add other servers, fancy libraries, or any kind of weirdness straight out of the box. Standard websockets and base 64 encoding. Boom. We can have as much security as you want as well. And including encryption. Of course those things get a little more complex now Kafka is huge and has a lot of great use cases. If I have an existing Kafka app in python, could use that same library port, those things get my data, use the standard Kafka producer. But I'm going to point now to pulsar which will act as those kafka bootstrap ones. Very straightforward, nothing to worry about there. And then do a flush. So real easy to do Kafka and Pulsar and I link to some resources there, show you how you do it again, the same code could just point to a Kafka cluster as well, so no reason to write any custom code. Point it to whatever Kafka compliant broker that you have out there. Really nice. Now I want to show a couple simple DevOps here just to show you that this is a modern system. Now we could use a lot of different ways to deploy things. One way is command line and that could be interactive. It could be command at a time. Obviously this could be automated a ton of different ways. Whatever automation DevOps tool you like, this can also be done through rest endpoints or through different cloud uis. There's a really nice one in the open source for doing pulsar administration. You could look at that one. This is as hard as it is to deploy a function and do a create set auto acknowledge. So when event comes into a function we acknowledge it right away. Point to my python script, give it the class name that I'm using in that application. Point to as many topics as I want to come in here for this one. It's just one. I could change that on the fly later. Also through DevOps add more easily. I had a log topic, so anywhere I use the context to log it will go to a topic. So you don't have to search through logs or write a third party logging system. These will be just data in a topic which I could have a function read or just any app, doesn't matter, including a Kafka app. Give it a name so I know what my functions is and put it an output now inside the code. I could send it to any other topic if I wanted to, including create topics on the fly. Source code for this one is down the bottom. This one's a cool little ML app. What happens is I connect a front end web page and it sends to a kafka topic which you can see here is chat. That triggers an event into this sentiment. Python app which reads that looks at your text there and applies the sentiment, puts the results of that and some other fields in an output topic which it also displays in that front end. Again, a great way to communicate between Python and JavaScript, even if that's in a phone app or a web app, or in a notebook, wherever it happens to be. Great way to communicate between systems. Do that asynchronously and apply ML in different pieces depending on where it makes sense. I don't know what that one was. We're going to walk through some code here. We saw this before. If I'm going to set up a schema, give it a name. Record comes from the schema library. That's important. We set our different types here. These are the names we're using. Very straightforward, but that is my schema. I don't have to know how to build an Avro schema or how to build JSon or protobuff. It'll be build from this, which is very nice. You just write some Python code, create connection here to that server. We'll build a JSON schema off of that, give it a name. You could put some other fields in here for tracking that doesn't affect the data here I like to create a unique key, or relatively unique doing a build in standard Python library format. The time put that together as a big key that's reasonably unique. Create a new instance of my stock, set some values. These are getting pulled off of a rest endpoint. I'm reading format these in the right format, clean them up, add my uuId. I'm also sending that as the partition key. If I got my stock data, send it, and then this could be sent to another processor to read this. And then the end result of this can be displayed in a front end. Because we could read websockets, you could also read rest. Reading websockets from Javascript is really easy. And I apply that with a table library. Boom. You could get live data. So as events come in, the websocket is subscribed and it'll just get more and more data and it'll come into this table and you could sort it. All that kind of fun stuff. Search it. Nice way to do that. When you're setting up these web pages, it could not be easier. You point to a style sheet out there. You could either download it locally or point to the CDN. I'm using jquery to do my formatting. Pretty straightforward. The data table library I'm using is also using jquery. Pretty simple here, and I point to it down the bottom. This also has my function that's doing some processing of some data that's coming from one of our friends in the streaming library. That's Apache nifi. Gets that data into the funnel for me. Very easy to format my table. Set up a head and a foot with some ths in the field names. Make sure I give it an id. I need that later so I can get access to that data table. Connect to Pulsar over that websocket. Set up a subscription name. I also say I'm a consumer for the persistent tenant namespace and topic here, and I give it the subscription name of TC reader and say it shares. If I want multiple people reading these at once, I could do that. Connect open my websocket. If there's an error, print out the errors here. When a message comes in, I parse that JSON data. If everything looks good, I'm going to take the payload that comes from that request, parse that as JSON, and get all these fields out of it. That's how we sent our message. Add a row to the screen, boom, it's displayed. Article shows you some details on that. Very cool way to display your data. Now to get data in, apply some machine learning, get data between different systems, maybe move it from pulsar to data store or say Apache iceberg. Data flows are a great way to do that. We ingest the data, move it around, routing all of it. Visual, very easy to do. We guarantee the data is delivered. We have buffering and backpress is supported. You could prioritize the queuing of data. You pick how much latency or throughput you need. We get data provenance lineage on everything. So I could track everything that happened. Someone tells me the data didn't show up downstream. I could look in the data provenance, which can also be programmatically looked at very easily. Data can be pushed or pulled. Hundreds of different ways to process your data, lots of different sources with version control, clustering, extensibility expands out to support millions of events a second. What's nice with Nifi is I could also move binary data, unstructured data, image data, PDF word documents, as well as data we traditionally think of as tabular data, table data, stuff that you might want to be working with usually. But this is a nice way to move some of that data you might need for machine learning and deep learning. Because I can move images, I could pull stuff off web cameras, you could pull stuff off street cameras very easily. Do some enrichment. I'll do it all visually, do some simple event processing, get that data into some central messaging system, lots of protocols supported Kafka, Pulsar, MQTT, Rabbit, all those sort of things. TCP, IP logs, all those sort of things. Simple architecture, but expands out to as many nodes as I need and could do all that with kubernetes or get some commercial hosting out there very easily in all the big clouds with no headaches. I have been Tim Spann, got all my contact information here. I do meetups like once a month. Definitely would like to see people. We do Python, we do nifi, Spark, Pulsar, Kafka, Iceberg, all the cool tech here. This is Nifi. It is a great way to integrate with Python and there's some new features coming out soon which will be out in the next release that lets you integrate Python and extend Nifi using Python apps. You just drop it in a directory and it'll automatically show up in this list. Or you could just use something like this thing here and this will execute your Python script or other languages. But face it, most people use Python. It's probably the best language for integrating anything and doing DevOps. But I just wanted to show you that. Give you an idea what you could do with something like Nifi and all the different things you can connect to kafka, pulsar, know Amazon, azure, google, lots of cool stuff there. I have example codes out there in GitHub. So if you go to my GitHub t span, hw, I've got a ton of stuff out there. One of them is for reading weather sensor data. Again, using pulsar part is easy. Connect into various libraries, create my schema from the fields, connect to all kinds of stuff there, build up my connection and my producer get my sensor data, get some other data like temperatures and other stuff on that device, format them all, send a record, print them very easily. Could also do things like do SQl against these live events with something like flink. Pretty easy to set up. We've got examples with the full docker here. Download those, get going. I've got other ones for doing different things like grabbing live thermal images, sending them up to imager, sending them slack discord email, and sending all the metadata to Kafka or something that looks like Kafka. And we could process with Spark or Flink or whatever. You could also send other data into Kafka. Pretty easy. Got the breakdown of the hardware here. You want to do that on your own. You could find yourself a raspberry PI somewhere. Example apps tend to be Nifi produces into Pulsar. And then maybe I'll do SQL with Flink. Maybe I'll be producing data with Python. Lots of different options here. Lots of examples out there. Thanks for joining me. If you have any questions, drop them in the system there or check me online. You can get me on Twitter, GitHub, any of my blogs out there. If you search Tim span and anything streaming you will find me. Thanks a lot.
...

Tim Spann

Principal Developer Advocate @ StreamNative

Tim Spann's LinkedIn account Tim Spann's twitter account

David Kjerrumgaard

Developer Advocate @ StreamNative

David Kjerrumgaard's LinkedIn account



Join the community!

Learn for free, join the best tech learning community for a price of a pumpkin latte.

Annual
Monthly
Newsletter
$ 0 /mo

Event notifications, weekly newsletter

Delayed access to all content

Immediate access to Keynotes & Panels

Community
$ 8.34 /mo

Immediate access to all content

Courses, quizes & certificates

Community chats

Join the community (7 day free trial)