Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hi, Tim Spann here. Today I'm going to be talking about building Apache NiFi 2.0 Python processors. Welcome to my talk here at Conf42 Python. I am a developer advocate covering all types of things around streaming, big data, LLMs, gen AI, all that fun stuff.
And you can get in touch with me through Medium, GitHub, whatever Twitter's called now, my blog, all over the place. Every week I have a newsletter that comes out. You can get it on GitHub, LinkedIn, all over the place: streaming, ML, AI, open source tools, lots of fun stuff. I also do meetups around the New Jersey, New York, and Philly area, plus virtual ones. All the content gets put on GitHub, YouTube, and SlideShare, so you'll be able to get it if you can't make it to an event. And I've got events coming up soon in New York, Princeton, and Boston.

One of the main reasons we needed to support Python in Apache NiFi, the streaming tool, is to be able to interface with the new machine learning, AI, and gen AI libraries. There are a ton of awesome Python libraries there, as you probably already know, and doing that with NiFi before, through Java, was difficult. So with the addition of Python, we can do a lot of cool stuff and integrate with a lot of different libraries. That's pretty awesome.
Generally we use Python to do some of our processing on unstructured file types, while Cloudera DataFlow can ingest structured sources; we handle those with NiFi out of the box. Same with a bunch of enterprise data APIs and application streams: we get all that data in, and then when we want to push it to an AI model or to different vector stores, the best way to do that is with Python processors, and we'll show you some of those features. I'm not going to go deep into generative AI, but generally we work with it around text to text, text to image, and text to speech. Getting that data chunked up, vectorized, and ready to go is usually done with Python, some of it in standard NiFi. But Python is a real game changer for us.
Within Cloudera we're dealing with all the major LLMs out there, whether they're closed like OpenAI's or open models like Mistral's or Meta AI's, plus a bunch more on Hugging Face, as well as some through Google. We work with all the major vector databases; I'll show you a little bit with Pinecone and Chroma, but Milvus and Solr and all the other ones are out there. We do a lot through Hugging Face because their open community and open models really gel well with our open source and open community model. We work with LangChain, which is in some of our pre-built Python processors, and we're doing some things with LlamaIndex. There are lots of cool frameworks out there; every couple of hours I think there's a new one. A new Google model just came out that looks pretty awesome.
That's pretty standard. Up top we have generative AI applications, and we make these prototypes available for you on GitHub so you can try them out. We work with different closed source foundation models, whether that's through OpenAI, Amazon Bedrock, or IBM watsonx; lots of different ways to do that. We have model hubs, we can fine-tune models, and we work with the major open source foundation models like Meta's Llama 2. We can work with the managed vector stores (Pinecone right now is the main one) and private vector stores like Solr or Milvus. Underneath that we provide a massive open data lakehouse that does the real-time ingest and routing of this data, and the data wrangling, which as you can imagine involves a lot of Python: Spark, Flink, NiFi, all those fun tools. We store the data, help you visualize it, handle the training and serving of the models, and we run this wherever you need to be, whether that's public cloud, private cloud, hybrid, or multi-cloud, on AWS, Azure, Google, Red Hat, wherever you need to run. And we work with all the cool, fast hardware out there, whether it's NVIDIA's, Google's, AMD's, IBM's, Intel's, or Dell's; all those are partners and friends, and we're doing really cool stuff with lots of open source and Python everywhere. And obviously model hubs like Hugging Face.
Now, NiFi is a great tool for working with gen AI, and having that support for Python brings us to the next level. We could do a lot through REST and through Java, but being able to add Python as part of our pipelines makes it very easy for us to do a lot of stuff with lots of different data sources. I've got a ton of articles out there working with things like live question and answer, travel advisories, weather, tons of different formats: ingest that real fast, enrich it, do real-time alerts if necessary, push things to vector databases using Python, get things into prompts for LLMs wherever they have to run, push real-time alerts and data into Kafka, and also write into your data warehouse, so you'll have petabytes worth of data to train your models or to act as additions to them.

As we've seen, some of the models out there will not always give you back the perfect answer, and the best way to augment that is with RAG. We can support that by having data in our data warehouse and in our vector database, and we can cache things and pre-compute things. There are a lot of advantages to having a lot of data, all of it connected together in real time. You can build out real-time reports, visualizations, alerts, and aggregations with lots of the latest open source tools that run in this very mixed environment, regardless of what you need there. And if you're working with Cloudera DataFlow in the public cloud, we make it really easy: go to our ReadyFlow gallery, press a couple of buttons, boom, deploy a flow, and get ready to start taking data from a database and pushing it to S3. That might be the first step in your real-time gen AI pipeline.
Again, as we mentioned before, there are lots of ways we work with LLMs: helping serve them, fine-tuning them, and getting data into and querying the vector data stores. But the main place I work is data preparation and data engineering. We make sure all this data gets processed in whatever size cluster it needs to be, all the way out to the edge, wherever it needs to be, whether that's on CPUs or GPUs, all with a lot of open source. We keep that knowledge in a secure repository that can span multiple public and private clouds, and make sure you have everything you expect to have with that data storage.

NiFi 2.0 is
the latest and greatest version of Apache NiFi. It has brought us the magic of really supreme Python integration. Before this we could work with Python, but it was a little kludgy: we'd execute it and it would kind of run on its own, which was not the best way to do that. We've now added support for JDK 21, which is a lot faster, leaner, and makes better use of threads, and we've added some other enhancements along with that. But let's look into some Python processors; it's pretty easy to write your own. Basically, if you have a library you want to wrap, you can see from my examples that that's pretty much what I did, and it's mostly boilerplate.
Here you have a Python class. You import the things you see at the top that you need for building this type of class. We point to the Java interface that connects Java and Python together. We give ourselves processor details: these tell you what version I work with and give a description (you'll see where that pops up when we go into the tool), plus some tags to say what this is related to. And, more importantly, our property descriptors. These are the properties that we want to have inside of our application, and they're important ones, as you can see here: I want the prompt that I'm going to send, in this case to the watsonx.ai API, and I'm going to need the key and the project ID for the particular project I have rights to execute against.
And this is the main body of the Python code. Pretty straightforward; this doesn't change much. You start off with the method definition: transform takes self, context, and flowfile. Those are important. Context gives me access to lots of things in the environment, like the properties you see. The flow file is any file that's coming into the system, in case I want to change it, remove it, or make my own; it's important to get that in. So I get those properties in, set my credentials, and use some libraries from IBM. I make the call, convert the response to JSON, and just return it here with the success relationship. If we had a failure, we could have a relationship of failure; there are a couple of different options on those. Our contents are what we're outputting, which in this case is the output of this call, and we can send any attributes we want; here I just set the MIME type. You could also, as you'll see in some other examples, not change the contents: I give you back whatever flow file you gave me, just pass it along, and give you some attributes. That way you don't have to change whatever data was coming in your pipeline; you're just adding some metadata around it. It's a great way to pass a file along without changing it too substantially, which is fast: just putting things you might need around it, whether that's small data or something that augments it, and you can decide later if you want to rewrite everything.
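To make that concrete, here's a minimal sketch of what one of these classes looks like, based on the NiFi 2.0 Python extension API (nifiapi). The processor name, the property, and the trivial uppercase logic are illustrative placeholders, not one of the real processors from the talk:

```python
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
from nifiapi.properties import PropertyDescriptor, StandardValidators


class ShoutText(FlowFileTransform):
    # Ties this Python class to NiFi's Java side
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    # Version, description, and tags that show up in the NiFi UI
    class ProcessorDetails:
        version = '2.0.0'
        description = 'Illustrative example: uppercases incoming text.'
        tags = ['example', 'text', 'python']

    # A property descriptor: appears on the processor's configuration tab
    PREFIX = PropertyDescriptor(
        name='Prefix',
        description='Text to prepend to the output.',
        required=True,
        default_value='OUT: ',
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    )

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        return [self.PREFIX]

    def transform(self, context, flowfile):
        # context exposes the configured properties; flowfile is the incoming data
        prefix = context.getProperty(self.PREFIX).getValue()
        text = flowfile.getContentsAsBytes().decode('utf-8')
        # Route to 'success' with new contents and a mime.type attribute
        return FlowFileTransformResult(
            relationship='success',
            contents=prefix + text.upper(),
            attributes={'mime.type': 'text/plain'},
        )
```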
So, an example of one that doesn't change what's coming in is a processor I wrote. For this one, and for everything with NiFi 2.0, Python 3.10 is the minimum, which you're probably at, but I know some of the older machines don't have it. We really need you to have Python 3.10 and, again, JDK 21 on your machine. This one uses NLP libraries from Hugging Face, spaCy, and PyTorch. It's based on an example I found on Stack Overflow, and it's pretty cool. What it basically does is: you pass in some text, and you get back one or more companies as a couple of returned attributes. One is a list of all the companies that might be mentioned in the data you passed in, and the first company goes into a separate attribute. Pretty straightforward; the source code is listed at the bottom, and we'll run through some examples so you can see it running. It's a nice little processor to get you company names. I use this in some flows where I want the company name, and then I'm going to call a REST endpoint that gives me the stock symbol based on that company name. And then I can use that stock symbol to get stock values. Cool stuff to have.
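If you want a feel for the core extraction step, here's a rough sketch of the idea using spaCy's named entity recognizer. The real processor also pulls in Hugging Face models, so treat this as the simplest possible version, and it assumes you've downloaded the small English model first:

```python
import spacy

# Small English model; install with: python -m spacy download en_core_web_sm
nlp = spacy.load("en_core_web_sm")

def extract_companies(text: str) -> list[str]:
    """Return ORG entities found in the text, in order of appearance."""
    doc = nlp(text)
    return [ent.text for ent in doc.ents if ent.label_ == "ORG"]

companies = extract_companies("Cloudera and Apple both announced earnings.")
first_company = companies[0] if companies else ""  # lands in its own attribute
print(companies, first_company)
```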
Also, sometimes when you're doing something in Slack and someone gives you a question, maybe it doesn't need to go to a large language model every time. Maybe sometimes it's a lookup from a database, or a call to a REST endpoint to get the current stock price. You've got to be smart about how you use these models; don't use them for things they're not designed for.

Another cool one: if you're doing any kind of transit data, and if you've been watching my blog, you'll see I've been doing a lot of transit data. They have a lot of data, it's always in motion, lots of interesting stuff: real data, real world, and all over the world. Well, some of these feeds return more than one type of data, which I found out, and that can be problematic. So I wrote this compound GTFS data reader. It takes a URL, a header, and a key if you need one, and you tell it what kind of feed you want. There are three types in the GTFS real-time spec (GTFS is a transit spec): trip updates, which say when the bus or subway or whatever it is has an update; vehicle positions, which track the vehicles; and alerts, for when something happens, which happens. Again, the links are down there; you can try it out.
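Under the hood, a reader like this is mostly a thin wrapper around Google's gtfs-realtime-bindings protobuf library. A minimal sketch (the URL is a placeholder; point it at a real GTFS-realtime endpoint):

```python
import requests
from google.transit import gtfs_realtime_pb2  # pip install gtfs-realtime-bindings

def read_feed(url: str, headers: dict | None = None):
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(requests.get(url, headers=headers, timeout=30).content)
    for entity in feed.entity:
        # A compound feed can mix all three GTFS-realtime message types
        if entity.HasField('trip_update'):
            yield 'trip_update', entity.trip_update
        elif entity.HasField('vehicle'):
            yield 'vehicle', entity.vehicle
        elif entity.HasField('alert'):
            yield 'alert', entity.alert

for kind, message in read_feed('https://example.com/gtfsrt/feed.pb'):
    print(kind, message)
```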
Someone gave me a file full of weird-looking data I'd never seen before: WebVTT, the Web Video Text Tracks format. This is for when you're doing training classes, or you have captions that go along with a video. Well, if I want to process this with generative AI or an LLM, I can extract just the text. I don't need the cue numbers or the timestamps when I'm just grabbing the text that I may want to push to a vector data store or use in a prompt, whatever I'm doing there. And I've got the processor for that.
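That extraction boils down to something like this sketch with the webvtt-py library, assuming the incoming flow file has been written to a local path first:

```python
import webvtt  # pip install webvtt-py

def vtt_to_text(path: str) -> str:
    # Keep only the caption text; drop cue numbers and timestamps
    return "\n".join(caption.text for caption in webvtt.read(path))

print(vtt_to_text("training_video.vtt"))
```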
You can see the API I mentioned earlier, and I showed you that code: the watsonx SDK to access IBM's foundation LLMs. This one does the inference; it's secure, and it's the official SDK. There are other ways to do this, and I was doing it through REST, but having this as Python with the standard SDK is nice. It runs fast, it's pretty easy to do, we saw those parameters before, and it fits really nicely in a streaming pipeline.
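The shape of that call looks roughly like this. Note this sketch assumes the ibm-watson-machine-learning flavor of the SDK from around the NiFi 2.0 era; the model ID, parameters, and credentials are placeholders, and newer ibm-watsonx-ai releases name things differently:

```python
# Sketch only -- names may differ in newer ibm-watsonx-ai releases
from ibm_watson_machine_learning.foundation_models import Model

model = Model(
    model_id="ibm/granite-13b-chat-v2",            # example model id
    credentials={"apikey": "YOUR_API_KEY",          # the key property
                 "url": "https://us-south.ml.cloud.ibm.com"},
    project_id="YOUR_PROJECT_ID",                   # the project id property
    params={"decoding_method": "greedy", "max_new_tokens": 200},
)
print(model.generate_text(prompt="Summarize Apache NiFi in one sentence."))
```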
Now, another one that's just wrapping a library is my system process monitoring Python processor. Again, Python 3.10 or newer. This accesses psutil, which is a great Python library, and I'm just grabbing and running as many of its calls as make sense in context on multiple machines. If you want to tweak this, the source code is available at that QR code, and you can see where my GitHub is, so you'll be able to find all this stuff. It outputs all of these different results as attribute values. Again, metadata: not messing up your data. Nothing changes for you, which is great.
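To give a flavor of what it collects, here's a small sketch that gathers a few psutil readings into a flat dict of string values, the way NiFi attributes want them. The attribute names here are my own, not necessarily the ones the processor emits:

```python
import psutil

def system_metrics() -> dict[str, str]:
    # NiFi attributes are strings, so stringify every value
    vm = psutil.virtual_memory()
    return {
        "cpu.percent": str(psutil.cpu_percent(interval=1)),
        "mem.total": str(vm.total),
        "mem.available": str(vm.available),
        "disk.used.percent": str(psutil.disk_usage("/").percent),
        "boot.time": str(psutil.boot_time()),
        "pids.count": str(len(psutil.pids())),
    }

print(system_metrics())
```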
Another one: sometimes you need to test things, sometimes maybe you're doing a demo, or you just want synthetic data. Well, the Faker library is pretty awesome. I have it so you can choose as many of these generators as you want, true or false for each, and there are a ton of them. Right now it gets you one of each, and they go into attribute values. Maybe I'll create an output flow file instead; I'm still debating that. You let me know which one is better for you. If returning a flow file matters, maybe I'll create a second processor that does a flow file, or maybe we'll add another parameter that picks. Let me know if that matters to you and how important it is.
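For reference, the library itself is about this simple. Here's a sketch generating one value per field as attribute-style strings; the field names are just a small sample of Faker's many providers:

```python
from faker import Faker  # pip install Faker

fake = Faker()

# One value per enabled generator; in the processor these land as attributes
record = {
    "name": fake.name(),
    "address": fake.address().replace("\n", ", "),
    "email": fake.email(),
    "company": fake.company(),
    "credit.card": fake.credit_card_number(),
}
print(record)
```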
Sometimes I need wiki pages. These are useful in a gen AI pipeline if I want to train a model, add some augmentation, or look something up and add it to the prompt as part of RAG or prompt engineering. I use the Wikipedia API in Python, and it lets you choose whether you want the HTML back or the text, and you can pass in which wiki page dynamically. Again, just use that QR code if you want to see it. The example grabs the company we parsed with the extract company processor and grabs their wiki page, just showing you how you can use these in multiple steps.
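Here's a sketch of that lookup with the Wikipedia-API package, which exposes the HTML-or-text choice. The user agent and page title are placeholders; the title would normally be passed in dynamically from the extract company step:

```python
import wikipediaapi  # pip install Wikipedia-API

# ExtractFormat.WIKI returns plain text; ExtractFormat.HTML returns HTML
wiki = wikipediaapi.Wikipedia(
    user_agent="nifi-demo/1.0 (contact@example.com)",
    language="en",
    extract_format=wikipediaapi.ExtractFormat.HTML,
)

page = wiki.page("Apache NiFi")  # page title passed in dynamically
if page.exists():
    print(page.text[:500])
```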
Again, I'm learning to write new Python processors here based on things we need. Before I added that fancy compound one, I had a basic one. This is for the most standard transit URLs: they return just one of those three types, and usually it's in the name, something like MTA-blah-blah-alerts, or .proto, or gtfs, or whatever it is, and they just have one. So this one returns the JSON for you. Pretty easy: it takes the GTFS format, makes it easy to work with, and grabs it from the URL. Couldn't be easier than that.
Now, there are a bunch that come prepackaged in Apache NiFi 2.0. I've tweaked a couple of these based on some of my needs, but there are ones for Pinecone, to do queries and to store your data. There's a great one for chunking up documents to make them small enough to fit in your vectors. There's a parse document one; this is great. This is for parsing your Markdown, PowerPoint, Google Docs, Excel: parse that data, get the text out. Again, great for pushing to a vector store, using as a prompt, storing in a database, using part of it as a Slack message, whatever. There's another one that converts CSVs into Excel if you need Excel, and one that does our favorite deep learning, some object detection. Right now I'm working with YOLOv8, so I might write a new one on that.
One of the best and most useful is having one Python processor do all this for you: Prompt ChatGPT. You put in your text prompt and a couple of parameters, including your key, because you've got to have access to OpenAI, and it calls the model and gets your results back. Really nice. Thank you, Python.
And then there's the same thing for the Chroma vector DB: there's a put and a query. Those are great ones as well.
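That put-and-query pair boils down to something like this with the chromadb client; the collection name and sample text are illustrative, and Chroma embeds the documents with its default model here:

```python
import chromadb  # pip install chromadb

client = chromadb.Client()  # in-memory; use chromadb.HttpClient for a remote server
collection = client.get_or_create_collection("articles")

# "Put": store chunked documents
collection.add(
    documents=["NiFi 2.0 adds first-class Python processors."],
    ids=["chunk-001"],
)

# "Query": retrieve the most similar chunks for a prompt
results = collection.query(query_texts=["python support in nifi"], n_results=1)
print(results["documents"])
```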
We're still in the early days of Python processors, so now's the time to start putting yours out there. I'd love to see a ton of people write their own after this conference. Definitely contact me if you need a little help, and if you have some, I'll publish them in the weekly newsletter and tell the NiFi engineering team about them. Maybe they'll go into the official line, or at the very least they'll be promoted and used.

Here's an example of a flow: we get Slack messages in, transform and clean them, query a vector store, build my prompt, call my model with Python, transform and clean the results, and push them to Slack and Kafka. Pretty easy.

Now we'll show you some demos and then let you get on with the many more talks you're seeing today, which is a pretty awesome list.
This is NiFi 2.0. This is my environment running on my laptop, and in this example I consume a feed from Medium, which happens to be my feed, to get all my articles, process them, and clean them up. I do some of this with regular NiFi processors, nothing too fancy. And then when it comes out here, we start getting into these fancy Python ones; as you can see, they're Python extensions. This one takes plain text in and extracts a couple of metadata fields along with the data, so it parses the data out. We can do one here; I've got a couple sitting here. Let's just run one, have that come in, and then we will parse it. Now, if we need to look at all the Python processors, I can just type "python", and you can see there are a couple of versions of different ones in there: my extract company name, fake record, a bunch of different ones in here.
Pretty straightforward. Okay, that finished. This parse document finished, and the chunk document is also a Python one, again to make things fit into a vector. So this parsed them up, chunked them up, and now we can push these to, say, a Chroma data store with some parameters there. Pretty easy to do that; just run one there.
Same thing here if the Chroma happens to be remote; same idea with the query. Pretty straightforward, very easy to work with without too much trouble. Down here you can see we have some other stuff going on, so I'm going to let one through so you can see the other processors. After that gets pushed, this one is the extract company. Here I passed the data in via the prompt property. That's one way to get data into a processor; the other way is for it to come in as a full flow file. It depends what makes sense for you. If you look there, you can see it ran just now, and I can see the output here and the results: what came in, what came out. Pretty straightforward.
This one didn't work, because it probably didn't have a company name. Some of these don't have a company name, but you do always have the values coming out of our utils, so we'll get back whichever things make sense for the different util. They're pretty straightforward. If you're using any of the system processing, we get the psutil stuff out here. We've got some output from the wiki, and you can see here a couple of different attributes come back from the wiki. You can see in here where we had the company name; if I have a company name, then I can get a wiki page and just have it output. As you can see, we can put a ton of parameters here, very easy to do. There are the PIDs of the running processes, there's our text, and there are our results here; it was an HTML page we got back. Again, there are a couple of different formats you can pick when you're grabbing a wiki page. And now I could push it to something else if I wanted to, depending on what makes sense for your use case.
Over here, I've got another use case. Before this one times out, let's make sure these processors are loaded so we have them ready. Okay, so this one is using that GTFS processor, and it's calling Halifax, Canada: their vehicle positions. So I'll run this once. That already finished and gave me back the results here: 500 bytes in; you can't really see it until it gets split out. And you can see a lot of rows came out of there. We can take a look at one and see that it got converted into JSON that tells me about the vehicle and the trip and how fast it's going, different data around those attributes. Pretty straightforward.
Now, when we're parsing documents here, again, this is the Python one. I'm picking PDF and letting it parse the way that makes sense; there are a couple of options there. I like this PDF parsing model; it works pretty well for me, just English. I can add a couple of metadata fields, but I don't really need them, because here I'm going to split that data out into pieces. We can take a look at a chunk: that's a chunk of text in a JSON format. Then I run that, get a chunk, and that chunk I'm going to push to a vector store, so it's not too big to go into the vector database. And then the results of that will go into Slack, just to track it. I've got one to do that.
Translate WebVTT doesn't need any parameters; it just takes the whole file coming in and converts it to just the text. Then I could push it into a prompt if I wanted to. Pretty straightforward, whatever makes sense. I can parse PowerPoints; there are lots of different things you can do there. This example is using Prompt ChatGPT. I'm using the turbo model, and I get my inputs passed in: temperature, API key, all that kind of fun stuff. So we can run one of those and get the results back. This is coming from Slack events; pretty straightforward. And then I push that to a Slack channel, and we can take a look and see the results of that.
We'll go over here: this is where it got the chat, and here it posted the results, and we could add other stuff to that. It's pretty easy to use that Python processor there; it's just an easy way to get your data, move it really quickly, and do what you need to do. Thanks for listening to my talk. If you're interested in writing your own Python code for Apache NiFi, definitely reach out. And thank you.