Conf42 DevOps 2023 - Online

Building Modern Data Streaming Apps with Python

Abstract

In my session, I will show you some best practices I have discovered over the last seven years in building data streaming applications, including IoT, CDC, Logs, and more. In my modern approach, we utilize several open-source frameworks to maximize the best features of all. We often start with Apache NiFi as the orchestrator of streams flowing into Apache Pulsar. From there, we build streaming ETL with Apache Spark and enhance events with Pulsar Functions for ML and enrichment. We build continuous queries against our topics with Flink SQL. We will stream data into various data stores. We use Python for microservices as Functions and stand-alone apps. We use the best streaming tools for the current applications with FLiPN. https://www.flipn.app/

Summary

  • Tim Spann talks about building modern data streaming applications with Python. Pulsar in the middle is a hub to distribute your data wherever it needs to go. You can use a lot of different data processing and data query tools. If the application is small enough, you can do this by yourself.
  • Pulsar is a streaming tool that makes writing modern applications, and using DevOps to put them into whatever environment you are using, very easy. There are over 560 contributors to the project and over 7,000 people in the Slack.
  • Pulsar is a great way to have your Python app talk to other apps, different systems, or tools. It has support for different protocols. Recently there's been the addition of a Pulsar shell, which allows DevOps automation on the fly.
  • Pulsar is multi-tenant, so Spann creates a tenant for this application, then creates topics. You could have millions of topics if you want, and list them. Applications can be really simple; the full app is available at the link below.
  • There is a free book out there for learning that covers everything in Pulsar. Scan the code to get the full PDF, or contact Spann through LinkedIn or Twitter.
  • A Python app consumes data from a Raspberry Pi and sends it to Pulsar. Pretty straightforward, but it makes for a nice way to do applications. Ask any questions through the different channels and Spann will be glad to get back to you.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi, Tim Spann here, and welcome to my talk on building modern data streaming applications with Python. We'll cover this in a DevOps manner, which is an important way to get started. I'm Tim Spann, a developer advocate at StreamNative, and you can scan all my info here. I've been building different types of systems for a long time, focusing on streaming applications, Python, and Java at a lot of different big data and streaming companies over the years. Every week I put out a newsletter, the FLiP Stack Weekly, which covers all the big open source tools out there: lots of good videos, podcasts, articles, tools, examples, and demos. Everything I could find that was cool that week, I put it in there. It's archived, and there are lots of different ways to take a look at it, so check it out.

When you're building modern data applications, it's not something you can do with one tool, one system, one application. It usually requires a team of people, but if the application is small enough, you can do this by yourself, if you've got the right tools. Python makes it pretty easy. Pulsar in the middle is a hub to distribute your data wherever it needs to go, and you can use a lot of different data processing and data query tools to make this easier, depending on what you're familiar with and what you like working with. Examples are Apache NiFi, Apache Flink, Apache Spark, Apache Zeppelin, and Trino. Lots of good products out there.

Most of this talk is going to be around Pulsar, which is a streaming tool. It is growing, and a lot of people are using it: over 560 contributors to the project, over 7,000 people in the Slack, tons of work going on, lots of organizations using it, so definitely check it out. Pulsar has some pretty cool features that make writing modern applications, and using DevOps to put them into whatever environment you are using, very easy. It is very easy to manage your clusters from one place, whether through REST endpoints, command line tools, or whatever DevOps tools you have out there, and it's very easy to scale things out and up elastically when you need to, again through those APIs or DevOps tools. It was designed for the cloud, so the compute and the storage happen in two different places. It's very easy to partition and rebalance things without taking anything down; you could spread this around the world, and again do that through automation, and it's very easy to do a lot of different use cases there. And since it has support for a lot of different protocols and libraries, it's very easy for you to take what you have now and move it over. Not a lot of extra work when it comes down to it.

When you're doing messaging and streaming, you've got a unit of data. Some people call it an event, an object, a message; we'll call it a message. There are a couple of components to it. Obviously you've got your data: that's the payload, the value. This is what you care about most. This is your record, wherever it came from; we put that in the value. This is the most important thing to you, because this is your data that you want to distribute and deliver wherever it's going. Now, in Pulsar, that's raw bytes. You can have that message data conform to different data schemas, and since it's raw bytes, I can put anything I want in there. So I could put a PDF, but I could also put JSON, Avro, Protobuf, all the different data types in there.
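Since the payload is just bytes to Pulsar, the minimal producer pattern looks like this. A small sketch, assuming a local broker; the topic name and record fields are illustrative:

```python
import json
import pulsar

# Connect to a local Pulsar broker (assumed URL; adjust for your cluster).
client = pulsar.Client('pulsar://localhost:6650')

# The payload is raw bytes to Pulsar, so any serialization works.
producer = client.create_producer('persistent://public/default/demo-topic')

record = {'device': 'sensor-1', 'temperature': 22.5}
producer.send(json.dumps(record).encode('utf-8'))  # bytes on the wire

client.close()
```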
Really nice. Now, a commonly used thing, and it's important for performance, queries, and a lot of different things, is a key: a key per one of these messages. Please set this. It comes in useful for partitioning, compacting data, making things more performant, and also for your own use cases of finding that message later, maybe for debugging or just being able to do value lookups. This key can also be really important if you want to keep the latest version with some of the newest APIs; the table API lets you keep and access these very quickly. So definitely set a key. It's a good property to have for any use case. Do it. If you want to have more key-value sets, a bunch of properties, you can add as many as you want to a message. Obviously don't put a whole database in there; that's kind of silly. Set a name for who produced this record. It's good for debugging, auditing, all kinds of things. Just do it; otherwise we put a really dumb name in there, so put your own in there and make it smart. Every message is going to have a sequence ID. This is so things are ordered. You don't have to do anything for it, but if you want to query it or access it, it is available, so you know what's going on.

Now, we talked about raw bytes. It might be hard to have a contract between your different systems, different users of data. You probably want a schema for your data if it's structured or semi-structured. Fortunately, Pulsar handles this for you. You don't have to spin up something else or use some third-party tool; the schema registry is built in. You don't even have to think about it. So in Python I can just create a record, put data in it, tell the producer what type of schema it is, and it'll build it for me, ready to go. It'll version it if I change it. It's nice. Now, by default we support Avro, Protobuf, and JSON; those are the most common. This is flexible, so if you need to add more types of schemas, you can do that, but those are the most common. We produce it, the person on the other side reads it, uses that schema, deserializes it, and we're good to go. So we've got data, and we know what it looks like. That's great.

A quick look into some new DevOps stuff: recently there's been the addition, for Pulsar, of a Pulsar shell. This lets you work interactively, or run a set of commands in a list. Again, easy to automate with DevOps. The Pulsar shell is pretty cool: it envelops all the different command line interfaces that you could use, so administration, sending and receiving messages, configuration. Great system to have.

Now, one unique thing with Pulsar is support for different protocols. To show this to you in a way that makes sense, I put in my cat, his name is Splute, and he's showing you the four cool ways to get your streams out there or into the data stream. We could use MoP, which is MQTT, common in IoT. You know, you get all these devices floating around here; pop that data out of there and get it into your stream. AoP is a little less common, but you'll see that in a lot of old enterprises, whether that's AMQP or RabbitMQ; we can take part of those messages. KoP is for Kafka; we can send and receive Kafka messages. And finally, sometimes you want to do WebSockets, which is very easy to do in Python, so you can do that. We support the full Kafka protocol, so you can use all the existing Kafka libraries, including the ones for Python, and Pulsar will just act as a Kafka broker. This is nice if you have some legacy Kafka apps; you can have them come right through. It's very easy to mix and match Pulsar and Kafka.
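To make the key, properties, and producer name concrete, here is a minimal sketch with the pulsar-client Python API; the topic, producer name, and field values are assumptions for illustration:

```python
import json
import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# Give the producer a meaningful name instead of the generated default.
producer = client.create_producer(
    'persistent://public/default/demo-topic',
    producer_name='weather-ingest-1',
)

record = {'station': 'ewr', 'temp_f': 71.2}
producer.send(
    json.dumps(record).encode('utf-8'),
    partition_key='ewr',                       # used for partitioning/compaction/lookups
    properties={'source': 'demo', 'ver': '1'}  # extra key-value metadata on the message
)

client.close()
```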
There's a big ecosystem out there with all the different things that you can use with Pulsar. Again, we mentioned those different protocol handlers, and there are a ton of client libraries. So say you don't want to always do Python: you could do Java, Go, Node.js, C++, C#, Scala, and a ton of other ones, which gives you lots of different options for programming. This is a great way to have your Python app talk to other apps, different systems, or tools. There are lots of connectors. So if you don't want to write code, or maybe don't want to write code for the source, have one of these connectors do it for you; just do a little configuration. Again, it's DevOps-friendly, so you can automate that very easily and have the data come into the stream very easily. You can write some functions, and I'll show you a little about that, if you want to process your data once it's in the system. All the major engines are out there: Trino, Presto, Spark, Flink, Lambda. Very easy. Once your data gets huge, if you don't want to store it all in Pulsar, we can automatically tier that out to the major cloud storage providers or Hadoop, whatever makes sense for your environment. Very easy to do.

Now, one of the cool things you can do with Python and Pulsar is write functions. What's nice here is it's kind of like your own little AWS Lambda, wherever you're running it. You write a function in Python, or another language like Java or Go, you deploy it out there, and very easily it is ready to be distributed, and you can subscribe it to multiple topics. You can change this on the fly with DevOps automation. So I could decide: let me change what topics come in, decide what topic comes out, change what the log topic is. Something comes into your function, which is called every time an event or message hits the topic; it runs your code, and your code does whatever you want it to do. It could call ML models, anything from any Python library. It runs that code, and by default it can just return the result to the output topic that's been applied. Again, that could be changed with automation or within your code: you could have it decide to go to another topic based on the data or based on some circumstance, maybe an error, what have you. Very easy to do. And by default, using configuration, we can have any of the logs from this go to another topic. So you can dogfood it: have the logs go to another function, or to anything that reads Pulsar, to process them. Makes it very easy; you don't have to worry about command line stuff all over the place, or how to get to your logs.

This is a small example of a Python function that I wrote for Pulsar. As you can see, it's very easy to do. I'm just using a standard VADER sentiment library, and this is how it works. You set up a class and give it a name; it's a Function. It has an init, and you can do stuff in there if you need to. Process is what's called per event, and that is the input field you get there. Context gives you access to anything from Pulsar: different topics, logs, buffers, lots of interesting things you might need. It's up to you if you want to use it. By default, the input is just a string; you could change this based on how you want that data coming in. Obviously processing strings of JSON is trivial in Python, so I'm going to run sentiment on one of the fields, create a new output format in JSON by taking the data that was there already and adding the sentiment, and send that with the return. That's going to go to whatever topic was configured to be my output topic.
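The exact code isn't reproduced on this page, but a minimal sketch of a Pulsar Function along those lines might look like this, assuming the vaderSentiment package and an illustrative 'comment' field in the incoming JSON:

```python
import json

from pulsar import Function
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer


class Sentiment(Function):
    def __init__(self):
        # Runs once per function instance; a good place for setup.
        self.analyzer = SentimentIntensityAnalyzer()

    def process(self, input, context):
        # Called once per message; `input` arrives as a string by default.
        record = json.loads(input)

        # Score one of the fields ('comment' is an assumed field name).
        scores = self.analyzer.polarity_scores(record.get('comment', ''))
        record['sentiment'] = scores['compound']

        # The returned value goes to the configured output topic.
        return json.dumps(record)
```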
You can't get much easier than that. And when I want to deploy it, again, like we mentioned, automate that as much as you want. This is the command line tool: create your function and point to wherever that code is. If it's Java, it's a jar; if it's Python, it's just that Python file, or a zip, or a directory. Give it the class name, any inputs you want (this can be as many topics as you want), point it to your output topic, give it a name, and it's deployed. What's nice is there are lots of different ways to deploy this. By default, if you just want to run that in Pulsar and let Pulsar handle it, it can do it. You can deploy it separately to Kubernetes. There's also an open source project called Function Mesh. This is a very cool system that will run all your functions for you: scale them up, monitor them, manage them, all that kind of stuff. Pretty cool open source project.

Now, when I'm going to automate setting up a cluster, these are the command line tools to do that. Pulsar is multi-tenant, so I'm going to create a tenant for this application, create a namespace for this app, and check to make sure they're all there. Then I can create my topics. You see here the topic name is a little long. Persistent tells Pulsar to store that message until I've acknowledged it, told it to delete it, or put an expiration on it. Topics can be non-persistent if you want, but most people make them persistent for preservation of the data. Then I've got the tenant (you can have as many tenants as you want), then the namespace underneath there, then a topic. You could have millions of topics if you want; here I could list them. Obviously this can be done through the REST endpoint, and it can be automated with any tool that you like using.

Now, if I'm going to do anything with Python, I install the Python Pulsar client. We only support Python 3, the latest couple of versions, so use a recent version. This has been built for most platforms out there: Intel and Mac and ARM and all those fun things. But if you're running something unique, maybe your own environment, you can build the Python client by building the C++ client it sits on. That takes a little extra work, but I've done that on my own; I did it before the official client was out. Just as an exercise, if you're into building your own clients, you could do that. I recommend going back a couple of versions depending on where you're going to install this. Right now we're up to 2.11, but 2.9 or 2.10 may make sense depending on what cluster you're running. If you're starting new, start with 2.11; there are a lot of new features in there, so make it happen.

Applications can be really simple: import pulsar, create your client, and point it to your cluster. If you're running in Docker or on your laptop, that's localhost or the name of the machine. Create a producer pointing to that topic, send a text message (we'll encode it UTF-8), close the client, and boom, you sent a message. Pretty easy. Now, unfortunately, the real world's not that easy, right? You have security, you've got all kinds of fun stuff you have to do. Well, I can pass all of those in with parameters and use authentication if you want to; OAuth is the most common. If you want a free cloud version of this and don't want to install your own, StreamNative has a free instance you can use. It is secured with SSL plus OAuth, a secure way to do this. You get a key, boom, point to it, and it'll connect just as fast as the other one.
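As a hedged sketch of that secured connection, here is what an OAuth2 client might look like with the pulsar-client library; the service URL, issuer, audience, key-file path, and topic are placeholders for your own cluster's values:

```python
import pulsar

# OAuth2 parameters are passed as a JSON string; all values below are
# placeholders you would replace with your cluster's settings.
auth = pulsar.AuthenticationOauth2(
    '''{
        "issuer_url": "https://auth.example.com",
        "audience": "urn:sn:pulsar:example:cluster",
        "private_key": "file:///path/to/key-file.json"
    }'''
)

# TLS endpoint (pulsar+ssl) instead of the plain pulsar:// port.
client = pulsar.Client(
    'pulsar+ssl://cluster.example.com:6651',
    authentication=auth,
)

producer = client.create_producer('persistent://my-tenant/my-namespace/my-topic')
producer.send('hello secure world'.encode('utf-8'))
client.close()
```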
We've got some examples out there. Now, I mentioned schemas. It's pretty easy to create a schema: you import this extra library here, set up a class using records from that library, and put a bunch of fields in there with your types. Create your client like before, but now I'm going to create a schema for that class, and when I create a producer, I'm going to pass that schema in. So now that topic uses that schema, and versions of it. If you send something that doesn't match the schema, it won't meet the contract, your data quality is not going to work, and we won't let you send that message. So think about it before you do that; make sure people agree to it. Then I set the record, send it, set my key there, and we're ready to go. Very easy. Full app available at the link below. You can do the same thing with JSON; you see the idea, really simple. You can use those same records. Maybe you want one topic with a JSON schema, one with Avro, depending on your downstream systems. It's up to you.

If you want to produce messages, it's pretty easy. If you want to consume them, you can put it in a loop or have some callbacks. Each consumer gets its own unique subscription name; we get the message, we acknowledge it. Let me point out a couple of things there. Subscriptions: every client has one, or you can share one. What that means is I have a unique pointer stored in the broker that tells me where I am. So if I disconnect and come back later, it's going to put me in the right spot, so I don't lose messages and I don't go out of order. As soon as I acknowledge a message, I move my pointer forward and am not going to reread it again, unless I use another interface and scroll back to an earlier position. This is great. If you are done with the data after acknowledgement, it can be cleaned up and deleted if you want to save space; up to you.

Now, if I want to use a different protocol to send messages to Pulsar, I could use the MQTT libraries out there; Paho has a great one. Same idea: connect to MQTT, give it a client name, send some JSON, point to the server, and use the default MQTT port of 1883. Obviously you can change that to whatever you want, and you can turn on SSL if you like. Point to my topic, send the data, boom. Makes it easy if you don't want to use the Pulsar libraries or you already have an MQTT app. Same idea for WebSockets: use the WebSocket client out there. There is a little unique syntax in here. Make sure you point to your Pulsar server with your REST endpoint, typically port 8080, but you can change it. Create a producer against that tenant, namespace, and topic, and send a message. We have to Base64-encode the payload and put it in a special JSON envelope to get it over there, but then we'll get it back. We've got examples of that. If you want to use a Kafka library, use the kafka-python library that's out there in the open source; nothing specific to Pulsar. Again, send JSON over there and use any of the standard Kafka clients; we're just pointing them at Pulsar now. Same idea.

We talked about deploying functions before. Here's one specific to that little ML function I did. Again, point to that Python script and give it the class name: what's the input, what's the log topic, what's the output. There are a couple of other options you could put in there. I'm setting auto-ack to true, so what that means is when that function runs, just before it finishes, it's going to acknowledge that it consumed that message. So if you need to sweep it up or put it into tiered storage, that'll be done.
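Pulling the schema and subscription pieces together, here is a hedged sketch using the pulsar.schema Record API; the Weather class, field names, subscription name, and topic are illustrative, not the exact code from the talk:

```python
import pulsar
from pulsar.schema import AvroSchema, Record, String, Float

class Weather(Record):
    station = String()
    temp_f = Float()

client = pulsar.Client('pulsar://localhost:6650')

# The topic is now bound to (and versioned with) the Weather schema.
producer = client.create_producer(
    'persistent://public/default/weather',
    schema=AvroSchema(Weather),
)
producer.send(Weather(station='ewr', temp_f=71.2), partition_key='ewr')

# A consumer with its own named subscription; the broker tracks its position.
consumer = client.subscribe(
    'persistent://public/default/weather',
    subscription_name='weather-reader',
    schema=AvroSchema(Weather),
)
msg = consumer.receive()
print(msg.value().station, msg.value().temp_f)
consumer.acknowledge(msg)  # moves the subscription cursor forward

client.close()
```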
Now, if you don't acknowledge messages on a subscription like that, the data is going to wait around until the system is out of space, or give you a warning to say, hey, you've got these things lying around for a while.

We have a free book out there for learning Pulsar, written by my friend David. He's been doing Pulsar for a long time; he's a committer and does some really cool talks, some of them with me recently. Definitely check this book out. Scan that and get the full PDF. It's the complete edition, not an early release: every chapter, all the example code. It is great and covers everything in Pulsar. This is the way to go. Download it; how can you complain about a free book? This is me around Halloween time. Scan my thing if you want more information; I've got a bunch of Python and Pulsar resources. Or contact me through LinkedIn, or through Twitter if it's still out there. Or definitely check out my GitHub; I've got a ton of examples with different languages, different systems, what have you. Make it pretty easy.

So we've covered the slides. You'll have them as part of this talk, so you don't have to worry about doing screen prints and stuff; you'll get everything you need there. But let's look at some demos. This one's pretty simple, but I think it's pretty cool. On a Raspberry Pi, I've got a PiAware device, a USB stick, and a little antenna, so I can look at all the planes flying over my house here in New Jersey, and that'll work anywhere you have planes. I'm near a couple of airports, so I see, you know, Newark, JFK, some other ones, some smaller airports down here, and military ones, so you get to see different planes flying around. Pretty cool. What's nice is I wrote an application here that runs on the Pi, a Python app that is consuming data from that device and pushing it out into Pulsar, and then we can use it for some fun stuff. I have a function out there that takes it and cleans it up, turns it into other records, and then we can do things like consume it from Spark or with Flink. Pretty straightforward, and it's a lot of data.

If we go in here, I can show you how easy that is to run. It is just sending big chunks of data. It is the simplest app you're ever going to see, but I'll show it to you anyway; the source code is available. What we do here is import all those fun things you need. I like to get information on my device as extra fields, so I look at the temperature and all those things on the Pi. Get the arguments for where we're sending this in Pulsar, because you've got a bunch of different options there, and then we connect to Pulsar, with security if we have it. Then I'm going to create a unique ID; I like to have a unique ID for every record I send, again for that key. So I'm calling an API that you saw displayed as a web page on that device, grabbing that raw, ugly JSON, and just sending it as a message. And then I've got a function that cleans it up.

But just to give you an example of what you can do here on this device, I also have some sensors. Again, since there are so few Raspberry Pis left in the world, I'm trying to get as much out of these devices as possible. So I've got some sensors running on there, and again, all the code is out there. This is reading a couple of sensors, formatting the output as JSON, and sending it to Pulsar. If we look at this code here, you'll see another Python app, and I'll show you the code here.
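The repository has the real code; as a hedged sketch of the shape of such an app, with a hypothetical read_sensor() helper standing in for the actual sensor libraries on the Pi:

```python
import json
import time
import uuid
import pulsar

def read_sensor():
    # Hypothetical stand-in for the real sensor libraries on the Pi.
    return {'temp_c': 21.7, 'humidity': 48.2}

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('persistent://public/default/pi-sensors')

# Runs as a long-lived loop; the client stays open for the life of the app.
while True:
    record = read_sensor()
    record['uuid'] = str(uuid.uuid4())   # unique ID per record
    record['ts'] = int(time.time())

    producer.send(
        json.dumps(record).encode('utf-8'),
        partition_key=record['uuid'],    # key for finding the message later
    )
    time.sleep(5)
```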
It's a pretty straightforward bunch of imports and a bunch of fields. For this one I marked the fields required, because I need them downstream when I'm going to drop this into a data lake, and I don't want any empty fields there; it's sort of a requirement for that data sink. So I grab the data from those sensors, format it the way I want, put it into the record in the format that I want, and then send it with a unique key. And then it's just a loop, so you see when we run it, the data is just passing through. Once it's in Pulsar, I can do anything I want with it. Obviously you can add any kind of deployment scripts around this. Pretty straightforward, pretty easy, but it makes for a nice way to do applications.

I also have one that does stocks. Again, it's pretty easy in Pulsar to grab stuff from, in this case, WebSockets: pull that data in and send it where it's going. Same thing for Mastodon: pull that data using the Python API and send it to Pulsar. Again, pretty straightforward: make a record, set a key, send the data. Pretty easy. The schema is accessible from the DevOps tools, so you can see what's going on, change the versions, all that sort of fun stuff.

Hope you liked my talk. Ask any questions through the different channels and I'll be very glad to get back to you and give you a hand. Thanks for attending my talk. If you need any other information, feel free to reach out.

Tim Spann

Developer Advocate @ StreamNative
