Conf42 Python 2024 - Online

Using Apache NiFi, Apache Kafka, RisingWave, and Apache Iceberg with Stock Data and LLM

Video size:

Abstract

In this talk, we’ll discuss how to use Apache NiFi, Apache Kafka, RisingWave, and Apache Iceberg to process and analyze stock data. We demonstrated the ingestion, processing, and analysis of stock data. Additionally, we illustrated how to use an LLM to generate predictions from the analyzed data.

Summary

  • Kareem Wallach has a consulting company called Project Elevate. He does developer relations, community stuff. He will talk about a portion of some of our presentation with Tim.
  • Stream processing is basically a approach dealing with data where you're. ingesting data as it's happening. It allows you to do real time analysis on the data as you're bringing it in. downstream side, stream processing is used to extract valuable insights in real time.
  • There's different kinds of queries you can do inside stream processing. The first one is filtering. This involves a selection of specific data from a stream based on predefined conditions. Pattern recognition is used a lot like with anomaly detection related stuff. Attempt stateful processing. Maintain state information to analyze data over time.
  • Rising wave is a stream processing database. It allows you to pull in data as you're ingesting the stream. The storage and computation side of things are separate. It's easy to use because it's all SQL and postgres.
  • streaming ETL is basically the practice of extracting, transforming, and moving data within different systems. It allows you to ingest the data from a wide range of data sources and transform that data in real time and then sync the process data to downstream systems. Real time analytics can provide business with valuable insights and actionable information real time.
  • Tim Spann: With generative AI, now you need a lot of data. He says tools like Apache, Nifi and Python let you get that data into the pipeline. A lot of it's enabled by Python.
  • Python can be used to process documents and different stock APIs. It's easy to send data from Python to Kafka. Something to think about while you're learning, trying things out.
  • I want to show you a couple of different things in Nifi. One of them is I could read data from Slack. There's a lot of different thing I could do within Nifi with Python. Another talk has more in depth on Nifi and working with the various Python processors.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi, everyone. I'm Kareen. I'm going to do my introduction now just because I'm going to do my portion of the talk now. So nice to meet you guys. Let me share my screen so that way you can actually see a picture of me so you know what I look like. That's me. I'm Kareem Wallach. I have a consulting company called Project Elevate. I do developer relations, community stuff. It's basically all things that work with developer tools and the community. Fun people like you and. Yeah, I'm just going to talk about a portion of some of our presentation with Tim. So just to take it off, I'm just going to give some background about what is stream processing, because I don't know what people's experiences are, if they're aware of stream processing or streaming technologies in general. So I'm just going to give a quick overview. So stream processing is basically a approach dealing with data where you're. Well, there's two parts to it. The first is a streaming ingestion part. So streaming technologies, things like Kafka or red panda, where you are ingesting data as it's happening, instead of like batch or like more like batch kind of processing. Batch ingestion, this is more of like a continuous flow. So as the events happen, you are constantly ingesting the data. And then the processing side is you're doing something with that data as you're ingesting it. So it's kind of as real time as you can get. So this process of stream processing, there's a lot of processes of stream processing and processing. The data process of stream processing. So the process of bringing the data in as it's happening and then streaming kind of allows you to be able to do real time analysis on the data as you're bringing it in, and then transformations and computation before you're actually storing the data. So if you're almost thinking about it kind of in a philosophical way, the data is, as it's being ingested, you're able to make those transformations while it's in flight and then storing it or putting it into another stream processing system or something like that. But then there's a downstream side, too. So basically, stream processing is used to extract valuable insights in real time, things like detecting anomalies and trigger actions based on incoming data. I do go over some brief use cases here. There is a lot you can do with stream processing and real time kind of analytics and capabilities, including enabling your users to do more by providing them actionable insights, which is basically real time insights that then they can take action on. If you think about LinkedIn, for example, some of the real time stuff that they have implemented into their systems is if someone views your profile, a user of LinkedIn can then go in and take action. They can message that person, they can view their profile, whatever. They can take action based on that insight that they now have. There's a lot of cases like this where people are exposing real time insights and real time analytics. The insights are a little bit different. It's more of like a decision making capability. But a lot of organizations are doing this now too. So that's something to keep in mind. So there's different kinds of queries you can do inside stream processing. So we kind of talked about the streaming technologies first, where you're ingesting the data and you're able to do these transformations on it as it's in flight. Then you have the type of queries that you're able to do inside of stream processing as the data is in flight. So just to give a couple of examples, I'm going to give some examples of each. These are all listed here, so you can read along if you'd like. So the first one is filtering. This involves a selection of specific data from a stream based on predefined conditions. So an example of that is like filtering out log entries that contain errors that are needed for immediate attention. That's like a filtering example. So this is an example of queries and stream processing. Aggregation is involving of summarizing or reducing data over a certain time window or key. Some of these might be kind of straightforward. You might be already familiar with these types of queries, but this is all stuff that you can do in stream processing in a very real time kind of capability. This allows you to calculate the average temperature over the last ten minutes in a sensor stream joining streams. This one I think is just really cool. Like you're able to basically ingest streams from multiple sources and bring them together to kind of have a little bit more of a dimensional analyzed. So one of these examples is like combining user click data with product data to analyze user behavior. Windowing, they group data into fixed time intervals or Windows for analysis. It's windowing. I think it's pretty straightforward. So example of that is analyzed the total sales for each hour of the day over a week. So that's what an example of windowing would be. Pattern recognition. This is used a lot like with anomaly detection related stuff. It's like identifying a sequence or pattern of events in a data stream. This is where you can find anomalies and things like that. So example of this is detecting a series of login failures as a security breach. Attempt stateful processing. Maintain state information to analyze data over time, such as tracking user sessions. So example, this is monitoring user behavior and detecting when a user session is idle. So I'm going to talk a little bit about rising wave. This is a stream processing database. Rising wave, it's open source. It's a SQL distributed stream processing database. So essentially what it does, this is like a lot of just jargon, but I'll just explain it to you guys. All right, so basically it allows you to be able to pull in data as you're ingesting the stream. So it works with streaming technologies, which I'll show you, kind of dig a little bit deeper into this in a second, but you're able to pull this data in and then you can perform incremental computations as the data comes in and then update the results dynamically. So ingesting the data and doing transformations and queries on this data as you're bringing it in and then storing it in a downstream kind of capacity. So there's a downstream side. So it does have its own storage. So you can access the data in rising wave. Specifically the storage and the computation side of things are separate, so you can actually scale them out separately if you want. And you can also do downstream in another database or data warehouse or whatever if you want to do that. The benefits of having this downstream thing with rising wave is easily accessing this data that you just perform these transformations on. It is SQL based, which is really nice. If you know SQL, it's pretty easy to pick up. It's postgres compatible, which is also pretty great. It's like very easy. If you've ever dug into stream processing, there's some technologies out there that are pretty big builds. Like the learning curve is pretty challenging. So this kind of serves as really easy learning curve because it's all SQL and postgres. So if you know SQL and you're working with any kind of postgres compatible type of data stores or anything like that, then it's pretty straightforward to use. I mentioned the storage and computation are separate. It has incremental updates on materialized views. I didn't put that last part in there, but I think it's important. It's also built with rust. A lot of people love that because it's very performant. Okay, so the data stream process, this is kind of what I was like making jokes about the stream processing, the data stream process, the process of the data streaming and the data stream process is like this. So as I mentioned, rising wave allows you and this just stream processing systems in general. But I'm specifically speaking about rising wave. In this case, it allows you to accept data from a streaming source. So you do have to have something that allows you to be able to ingest this data in a real time fashion. So as an event happens, it's pulling into the data into rising wave. So you can use any like, so these are a bunch of popular streaming technologies like Kafka and Pulsar. There's also red panda. It's not listed on here, but that's also another really popular one, too, Amazon kinesis. So you can pull these streams and these pieces of data into rising wave, and then inside it has the ability to ingest it. You can do kind of a lot of those different kind of queries that I mentioned in the earlier slides of joining streams, filtering, aggregation transformations, whatever, and then have the downstream side where you can store it to either rising wave or other kind of data warehouses or data lakes or whatever kind of processing you're looking for. So here are some use cases and examples. I'm going to run through these really quickly just because I know there's a lot of material in this talk. But streaming ETL is basically the practice of extracting, transforming, and moving data within different systems. So it allows you to ingest the data from a wide range of data sources and transform that data in real time and then sync the process data to downstream systems. Real time analysis, this is kind of a popular trend now everyone's doing real time analytics. So real time analytics is basically the practice of analyzing data as it's generated. So as soon as something happens, you're analyzing it, you're running some kind of transformation before it's even stored. And that offers you the ability to be as real time as possible because you're literally transforming the data. One thing that I think that's just important to keep in mind is a lot of times, and I know this happened for me, and this is why I kind of said this. I said this before, I'm going to say it again, because when I first kind of learned about real time analytics in general, I think a lot of us just think about the generalized uses of analytics systems where it's like dashboards and things like that. The capabilities of what you can do with real time analytics go way beyond dashboards. And this is kind of what I was talking about those actionable insights before, that's like when you're offering your users insights or real time analytics to be able to make smarter or more effective decisions in real time. So things that have that need to know of what's happening right this very second, you can either use it internally and have a system process it like a machine that's like a llamalay detection related stuff. You can have a human process it or you could give it to your end users. And then people productize these things too, where they literally, people pay for premium services either for an entire software or for just certain real time analytic capabilities and things like that. So that's just something that I think is really important to keep in mind in terms of real time analytics. It provides business with valuable insights and actionable information real time. And it does provide competitive advantages by enabling business to respond quickly and changing market conditions. It also offers competitive advantages because if you use it for the actionable insights that I was kind of mentioning with your users, you can make your users happier and they could have a more positive experience using your technology if they have a more real time, insightful experience. I mean, everybody knows we don't have patients, and as time goes, we have less and less and less patients. Okay. And it could also be used to improve operational efficiency and reduce costs so you can use it internally as well. Another example, event driven applications, enables continuous event ingestion and processing, maintains context across multiple events, and performs sophisticated processing, and then detects complex patterns and correlations within a stream of events, and provides direct integration with a variety of upstream and downstream systems. So really quick rising wave architecture, kind of what it looks like. So first is you have a serving layer. This parses SQL queries and performs the planning and optimizations of the query jobs. Then you have a processing layer which performs all your computation. Then you have, I think I clicked too quickly, but it doesn't matter. Metadata management service. This manages the metadata of different nodes and then coordinates operations amongst these nodes. And then you have the storage layer that stores and retrieves data from object stores like storage like s three. And I think that's it. And then, Tim, I'll let you. Thanks. That was a really good introduction to a lot of complex topics there. And I'll take over from here. So we saw some really important things to see on how you're going to be general topics, how you can query it with something like rising wave. But let's cover a little bit more. I'm Tim Spann. I do a lot of streaming stuff I have some virtual meetups around the world. Put a newsletter out, you'll get all these slides. I don't want to spend too much time giving you 10,000 overview things. I think the new thing to mention though is with generative AI, with these large language models and some of the smaller and mid sized language models, now you need a lot of data. You get that processed things like Apache, Nifi and Python let you get a lot of that data into the pipeline, whether it's going right to a model or using to a vector store. And then once you have it cleaned up, maybe I get the results from stock feeds, maybe I get it back from the model or some things stored in a vector store. I get that into something like Apache Kafka, which we have at Cloudera. And then I could share that with rising wave so they could do some real time analytic decisions on there. And maybe they put it into another Kafka topic which I could read later, run some additional processing. There is multiple levels of all of this, and a lot of it's enabled by Python. Python fits into many layers of this process, whether that's getting data into Kafka, and I'll show you that real quick, or it's being called by Nifi to do different processing for chunking up data, getting it into a vector store, running at the edge, working with different databases, lots out there. I have a number of articles on how to use that. Now we mentioned Apache Iceberg, again getting a lot into this one talk. But Iceberg is a very interesting table format that sits on top of usually apache parquet, could be another file format like Orc, but let's just keep it easy and say parquet. And there's tools built into the cloud to get data into these formats. And what's nice is having things in iceberg tables means everybody in every data store, any kind of query engine can access it. So it's very helpful. And I'll show you how to do that on the simple. So you could do that on your laptop just to try out. How can I get stuff into iceberg tables, into Kafka? Just using Python show you how easy that is. And then when you get into the enterprise level, all the governance and all the tools and encryption that you need in there are available. Don't have to learn anything new. Really easy to do that. Apache Nifi again, you can run it on your laptop or in production. Cloudera from Cloudera makes it easy and scalable and it's been around forever. So what's nice is it just doesn't die. It just keeps getting better and better. I've left servers running for months on an old laptop sitting somewhere. It just keeps running. And I show you a quick bit of that. And with the new version, which I have another talk on, we show you how we've made it so you can expand the tool and add your pipelines with Python. I show you a little bit of that today. Providence is important. So everything I run I know what happens. So if people go, how did I load this data? I could show you has a lot of pre built components to handle, lots of different formats for processing. This comes in really handy when you don't want to write your own boilerplate code to do everything. So do Python for the important stuff, complex processing, using some of the amazing libraries for vector stores and LLM, and all the easy stuff you could do with stocks. I'll show you some example code I have here so you could do that yourself really easy. We mentioned Kafka. Think of it as just an infinitely sized pipeline that I can put any data I want into, usually structured Json, avro protobuff, things like that. Get that in there and distribute to as many people who want it, which is really nice. I just have to get it in there. Then anyone who wants to subscribe to and can get to it and it scales out as big as you need to go. It is a nice buffer and pipeline to get my data to where it needs to go. But let's show you stuff actually running. You don't want to just see awesome looking generated graphics, but I'll show you one where I'm taking stuff from the fin hub stock API with Python. And what's nice there is it grabs websockets, push it to Kafka and do what I need to do with it. I can also process things like documents and different stock APIs, and I've got links to all the code here, so you can try that out on your own. But let's go through a couple of quick passes here and I'll first show you where the source code is. I've got a new one on grabbing stock data from Yahoo. Using the Yahoo finance library for Python. I am reading that and then as you can see here, I'm using Py iceberg to set up an iceberg catalog. Again, this is a small one, runs on your laptop using SQL lite, so you don't need a giant catalog or meta store out there, obviously not for production. And I've got a little mini instance running. So again, I don't have to pay for s three. Something to think about while you're learning, trying things out. Especially if you're coming from the Python world where you may not have access to a large data platform like Cloudera and you might have to just start learning on a laptop or whatever you have. So I'm just going through just calling as many of stock prices as I want, getting them back and sending them to Kafka. That's how easy it is to send from Python to Kafka. You set up a producer pointing to whatever servers you have. Again, for me, that's running in Docker. And then I send the message, give it a key that's gone, that's into Kafka. We'll show you that. And then over here I'm grabbing bunches of 1000 of those records and putting them into an iceberg table, appending that got a short sleep there. And every 1000 we refresh that just so the files aren't too small. They're still pretty small, but it's nice to keep them smaller than zero here. And then this is the data that's getting written to Kafka. It's Json, pretty straightforward and I can put as many as I want here and I could share it with whoever wants it. It could be a python user, a Java user, spark, rising wave, flink, nifi. Almost anybody can read Kafka, which is really nice. So we got that data in and on the other end I've got it into iceberg. This is the data. It's in parquet. The amazing part for iceberg is it creates this metadata that lets me do some amazing things like append, like change data, change field, do time travel. So I could say, what did the data look like two days ago, an hour ago? That is really cool. And that might be a feature you want. I'm going to send some more data into there. Let me show you some of this running. And I'm just running this like you saw in the code, just printing it out just to see that it's running. And then that is sending each record as it comes in into Kafka, which makes sense for Kafka. Kafka likes that sort of thing. As you get more and more data there and every thousand, I'm writing that to Parquet, which we can take a look at. There's the head. Just so we don't need a full query engine. I could just use the parquet tools to make sure those files look okay. And I could look at the header for that. You do whatever I want with those files, get a sample, query them, convert them. The command line tools are pretty good for that. And you can see, make sure the data is coming in and you can see we're pushing data out. We've got a couple of other ones. I mentioned that finance one, that is pretty easy for me to just run that and if I could spell properly and then I'd open websockets. You can see this one is super fast, python to websockets to Kafka, amazingly fast. And we just send that here and you can see that just the amount of rows just keeps piling up, really straightforward. And that one is documented here with the source codes. You can give that a try. And then another one I have is the same as that Iceberg one, but it is just sending it to Kafka, really straightforward there. And you can see that coming in last. I want to show you a couple of different things in Nifi, and one of them is I could read data from Slack. So here I put a slack message asking for the current stock price of IBM, and I've got a listener that's pushing it to Kafka. And then here if it is in the right channel, then I'm going to see if it actually asks about stocks. This one did. I'm going to use Python. I have a Python process to pull out the company name, and this is mentioned in that other talk. And then I'm going to use that to call Alpha Vantage to convert it into a stock title because your company name and your stock symbol usually not the same. And then if we got data back from that, I'm going to use Nifi to pull out the company name. And if there's multiple records, parse them, clean them up. Then I'm going to grab stock from twelve data for that, clean it up, add a timestamp and then push that to Kafka over here. Put that in a cache as well if we want to, and also trying to see if there's any wiki data for it. This is also a Python processor. If there's a wiki page for it, I download that as well. And we could push the results to slack, which again is nice. I can have Python do something, push it to Kafka, and then if I push it to slack, and then if I read it from Slack, maybe push it, execute some python, a cycle of what's going on. And you can see here I got some data back on this, there's some wiki information, plus the stock current value pretty easy to do that. There's a lot of different things I could do within Nifi with Python. Pretty easy. But just wanted to give you a short sample of that with stocks as well as over here, we've got stock news, but I've got a lot of records here. We could be sitting a while as it goes through all the news, but just to give you an example there, thank you for sitting through this talk. I hope you enjoy things. I know there was a lot in there. The source code is there. There's another talk. I have a little more in depth on Nifi and working with the various Python processors, so definitely check that one out. That is in this current list of comp 42 Python talks. So definitely check that out. Thank you for listening to my talk. If you have any questions, I'm on medium. I'm on GitHub, I'm on Twitter. You will find me. I am posting stuff everywhere. But thanks a lot for coming to our talk today. And Karen and me say thank you and we will talk to you again. Thank you.
...

Karin Wolok

Developer Relations, Dev Marketing, and Community Programming @ Project Elevate

Karin Wolok's LinkedIn account Karin Wolok's twitter account

Tim Spann

Principal Developer Advocate @ Cloudera

Tim Spann's LinkedIn account Tim Spann's twitter account



Join the community!

Learn for free, join the best tech learning community for a price of a pumpkin latte.

Annual
Monthly
Newsletter
$ 0 /mo

Event notifications, weekly newsletter

Delayed access to all content

Immediate access to Keynotes & Panels

Community
$ 8.34 /mo

Immediate access to all content

Courses, quizes & certificates

Community chats

Join the community (7 day free trial)