Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hi, Tim Spann here. My talk is Adding Generative AI to Real-Time Streaming Pipelines, and we're here for the large language model conference at Conf42, which is always a nice one, a great place to be. I'm going to discuss a couple of different open source technologies that work together to enable you to build real-time pipelines using large language models. So we'll touch on Kafka, NiFi, Flink, Python, Iceberg, and I'll show you a little bit of each one in the demos.
I've been working with data, machine learning, streaming, IoT, and some other things for a number of years, and you can contact me at any of these places, whether Twitter or whatever it's called now, some different blogs, or in person at my meetups and at different conferences around the world. I do a weekly newsletter covering streaming, ML, a lot of LLM, open source, Python, Java, all kinds of fun stuff. As I mentioned, I do a bunch of different meetups. They are not just on the east coast of the US; they are available virtually live, and I also put them on YouTube, and if you need them somewhere else, let me know. We publish all the slides and all the code on GitHub. Everything you need is out there. Let's get
into the talk. LLMs, if you didn't know, are rapidly evolving. While you're typing down the things that you use, it's changed, right? So trying to keep up to date is a lot. There are a lot of different models, and I'll show you that it doesn't really matter how we integrate, because we can do it all a couple of different ways with open source; some of the models are closed ones, and we'll integrate with some open ones. Which one makes the most sense for you depends on your use case, but we'll integrate with GPT-3.5, we'll integrate with Ollama running some models. We're running with Mistral and Mixtral, and working with Pinecone today, and Chroma. We'll touch on those a little bit. There's a bunch of other ones out there, all of them pretty
good. We do a very little bit with LangChain. It is used by some of my processors that are written in Python, which are integrated through NiFi, and we're using a bunch of libraries like Transformers, and also Hugging Face hosted models, as well as models hosted by watsonx and some hosted by Cloudera. There are a lot of different ways to interact with models. We'll also do Ollama, which is running on my laptop along with my whole open source stack, so I wish I had more RAM and maybe a GPU in there, but you do what you can. Lots of different use cases.
If you're at this conference, you probably know them. We're not going to touch on all of them; that would take forever. But the same ideas, though: get your prompt ready, send it to the model, get your results back, and do something with them. That enrichment, transformation, and processing, all that you need to do around prompt engineering, really needs tools, so you're not hand-writing everything. That's why we like to use something like Apache NiFi, which, as I'll show you, can help you with all of this, whether it's getting stuff into your vector store, helping with cleaning up, or calling multiple models. You know, maybe a code assistant is step one, and then you need to summarize documentation and translate it into another language. You know, maybe you need to build some content and push it somewhere. NiFi really helps enable a lot of those use cases.
You know, this is probably the more important decision. You could change; you know, maybe you start off with a closed model because they are probably the best at this point. They do a lot of good stuff. But then you're worried: can I get the data there safely? Maybe I can, maybe I can't. Is there data I don't want to be used in their models? Open source, because we love open source. We love these open models; that works really well with our ideas around open source tools for streaming, so it makes sense. But if that's not what makes sense for your use case, you know, make that decision. What's nice is with NiFi I can run, and I'll show you an example where I have, like, four or five different models in different locations, and I can decide or not decide when to run each one. Maybe when one is cheaper, maybe when one is better for certain use cases; you know,
you make that decision. If you're going to do this in the enterprise cloud, Cloudera works with AWS and IBM and Pinecone and NVIDIA and Hugging Face and Ray to make sure you can do everything you need to do to work with the foundation models, have the proper search and vectorization, have as much GPU-powered performance as possible, you know, all the right tooling, and run all the different compute options that you need. And again, the amount of
models and projects and software, and software-as-a-service and open source tools, that becomes available is massive, and it's changing constantly. New versions of things. You're like, oh, this is the coolest new model. It's like, ooh, Google Gemma is the new one. Oh no, Mixtral. Now this. It's fun because there's always something new going on, and probably during this conference there'll be two or three new things. So I'm keeping my eyes open; I want to see what comes out.
But the middle orange bar is where I'm mostly working: getting that real-time ingest, enrichment, wrangling, transformation, all of that. Get that data stored, get the data to the model, whether it's for training, whether it's for context for prompts, whether that's calling the model; get results back, get your classification, all important, whether that's in private cloud, public cloud, or hybrid cloud. So maybe you need to run on Amazon, Azure, Google, and your own Red Hat Kubernetes. You can do that. It doesn't matter if it's Google-specific hardware, AMD (really good chips), NVIDIA, IBM, Intel, Dell; everybody's got good stuff that helps you out there. You've just got to know that you're using the right framework and libraries and versions of Python and Java that are tweaked for those things. And we definitely have people that can help there. Get all your models out there and running.
A common use case, and this is where NiFi really shines for this process, is interacting with people and data, and, you know, with automated tools they may not know the difference. Now, you know the difference if you're doing live Q&A, and I'll show you that through Slack, because you're typing it. But that data could be coming from another bot. That data can be coming from a database; it can be coming from REST endpoints, documents, social media, transactions, public data feeds, S3 and other files somewhere, logs, ATMs, other live things, weather reports, wherever that is. We're getting that data in,
sending data out when it needs to go out, you know, cleaning it up, getting all the enrichments, doing any alerts that need to happen right away. Get things vectorized, chunked, parsed, and into whatever database or data stores they need to go to; get them to the right models, wherever they may be, whether that's in Cloudera Machine Learning, Watson, or a hundred other places. Get that into Kafka so we can distribute any information that needs to be shared, and shared instantly, with as many people in as many places as possible. We get it into Kafka if it needs to spread. If it can't be accessed, because maybe it's in a very secure internal Kafka, we can replicate that one to ones in the public cloud so it can be shared with other applications and systems. It's a great way to distribute your data quickly to whoever needs it, without duplication, without tight coupling. It's a really nice feature.
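As a rough sketch of that distribution step outside of NiFi (which would normally use a PublishKafka processor for this), publishing a cleaned-up event to a topic from Python might look something like the following; the broker address and topic name are placeholders, not the exact setup from the talk:

```python
import json
from kafka import KafkaProducer  # pip install kafka-python

# Placeholder broker and topic names; adjust for your cluster.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {"source": "slack", "text": "What is Apache NiFi?", "user": "tim"}
producer.send("slack-messages", value=event)  # hypothetical topic
producer.flush()
```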
Get it stored, get it enriched, and we'll show you that in examples. And a common way to do that is CDC, and we can pull that CDC instantly, get it working like we mentioned before, and get all those LLMs together. A common thing
that we do as part of these flows is ingest. We're ingesting, and that could be, you know, constantly a stream that's getting pushed, or we're pulling. It could be crons, it could be one-time grabs: documents, messages, events, files, whatever it is, it doesn't matter. NiFi supports hundreds of different inputs and can convert them on the fly. We can parse documents very easily (thank you, Unstructured.io), and chunk them up into the right size pieces so that it'll be optimal for your different vector store, or if you're pushing it somewhere else. I mean, the nice thing is NiFi can send it to as many places as possible, get it into the stream, whatever we need to do there.
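To make that parsing and chunking step concrete, here's a minimal sketch using the Unstructured.io Python library; the file name and chunk size are assumptions, and the right settings depend on your vector store and embedding model:

```python
# pip install "unstructured[pdf]"
from unstructured.partition.auto import partition
from unstructured.chunking.title import chunk_by_title

# Parse a document of (almost) any supported type: PDF, HTML, Word, Markdown, ...
elements = partition(filename="apache_nifi_article.pdf")  # hypothetical file

# Chunk into right-sized pieces for a vector store; max_characters is a guess.
chunks = chunk_by_title(elements, max_characters=1000)
for chunk in chunks:
    print(chunk.text[:80])
```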
The data pipelines are about getting that external context when you need it. So I type a question, "What is Apache NiFi?" Grab some external context: I can pull all of my recent articles from Medium, get them cleaned up, enriched, transformed, parsed, chunked, vectorized, and available, so that when I ask a question, if the answer is already in there, I pull that and add it as context with my prompt. Clean that prompt up, get it in the format that's needed by whoever's model I'm calling; you know, Ollama is slightly different from the Hugging Face REST API versus Watson versus Cloudera. Get it in that right format, probably JSON, and make sure it fits, you know, make sure it's not too big. Get that to them, get those results back, maybe start caching at different layers; maybe that goes into a database, maybe that goes into RAM, lots of options there.
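To illustrate the "right format, probably JSON" point, here's a minimal sketch of how the same prompt might be wrapped differently for a local Ollama server versus the Hugging Face Inference API; the model names, token, and endpoints are placeholder assumptions, not the exact flow from the talk:

```python
import requests

prompt = "Context: ...\n\nQuestion: What is Apache NiFi?"

# Local Ollama server (default port 11434); use whatever model you've pulled.
ollama = requests.post(
    "http://localhost:11434/api/generate",
    json={"model": "mistral", "prompt": prompt, "stream": False},
)
print(ollama.json()["response"])

# The Hugging Face Inference API expects a different JSON shape and a token.
hf = requests.post(
    "https://api-inference.huggingface.co/models/google/gemma-7b-it",
    headers={"Authorization": "Bearer YOUR_HF_TOKEN"},  # placeholder token
    json={"inputs": prompt, "parameters": {"max_new_tokens": 200}},
)
print(hf.json())
```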
And NiFi does the round tripping for you, so you don't have to write an application for everything. You know, someone types something in Discord or a REST call, or it comes from Kafka; you can pull it from a database, pull it from Slack, and send the messages out. Whether it's going to email, Slack, a database, wherever you need it to go, we'll send it there pretty easily, and
we can deal with all your types of data, even if it's zipped up or tarred up, if it's images, videos, documents (obviously a big one for most of the use cases for large language models); it doesn't matter. Thanks to Unstructured.io and some other processors in NiFi, I can deal with HTML and Markdown and RSS and PDF, Word docs and Google Docs and RTF and regular text. And I added one to do VTT, if you know about that format, sound, any kind of feeds from social, and XML too. And we can figure out what type it is, chunk it up, store it, parse it, do all those things you need with that unstructured and semi-structured data, and interface with whoever you need in the cloud or on premise or in your private Kubernetes, all the major ones. If it's not listed, it means we haven't tried it yet. Definitely reach out; I'm always looking to find new things to try and integrate.
Fun stuff out there. NiFi just got into version two; this is the one you want to use. This will be an official production release very shortly, possibly by the time you're seeing this. It's got hardcore Python integration, so I can run really cool stuff in there, a really cool way to deal with parameters so you can do a lot of interesting DevOps stuff, and it's using the latest JDK. If you're not a Java person, that one has a lot of enhancements that make Java incredibly fast. If you saw that 1 billion row challenge out there, really cool stuff there, and just to show you how fast it is.
I also recently found that we can do real-time integration with models while we're running real-time SQL. Flink SQL is real-time SQL on top of fast data such as Kafka, such as Pulsar, such as real-time streams. And we can also grab data from things like Kudu and Iceberg, and JDBC stores like Postgres and Oracle, in real time or through CDC.
But what this means is that by writing a simple function, I can call a model as part of a query, and I'll show you that; it's really cool. And I can do that whether I'm going directly to something like Cloudera Machine Learning, or if it's something that takes a little more enrichment, like that process we were mentioning. So I'm going to have NiFi host a REST endpoint for me, so I can have this tool call that, and that'll do all the cleanup, make sure your prompt is nice, send it over to, say, Hugging Face, get your results, and send them back to you so you can use them in your live system. I've got a link to the article here and an example of the SQL that we have there. Pretty straightforward.
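The talk does this with Flink SQL in Cloudera's tooling, but roughly the same idea can be sketched in PyFlink: register a function that calls the NiFi-hosted REST endpoint and use it inside a streaming SQL query. The endpoint URL, table name, and column name below are assumptions for illustration:

```python
# pip install apache-flink requests
import requests
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

@udf(result_type=DataTypes.STRING())
def ask_llm(question: str) -> str:
    # Call the NiFi flow that fronts the model (HandleHttpRequest/Response);
    # the URL is a placeholder for whatever port your flow listens on.
    resp = requests.post("http://localhost:9099/askllm",
                         json={"inputs": question}, timeout=120)
    return resp.text

t_env.create_temporary_function("ask_llm", ask_llm)

# Stand-in for a Kafka-backed table of Slack messages.
messages = t_env.from_elements([("What is Apache NiFi?",)], ["messagetext"])
t_env.create_temporary_view("slack_messages", messages)

t_env.execute_sql(
    "SELECT messagetext, ask_llm(messagetext) AS generated_text FROM slack_messages"
).print()
```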
Like I mentioned, I'm working with the Gemma model, so you can access that; take a look at that example. Again, the difference between using one model versus another is not really that much for me. Now,
I mentioned being able to use Python. We'll go through that pretty quickly. If you saw the Conf42 Python talk, I explained in detail how to build your own Python processor. This is that VTT one I told you about, so I can parse that, so we can grab that web video text and use it to either put in your vector store, enrich prompts, or other stuff. We can call Watson right from there, create our own fake data, grab Wikipedia feeds and get them in a clean format. Very nice. Saves a couple of steps there.
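As a minimal sketch of what one of those custom processors looks like under the NiFi 2 Python extension API (the class details here are illustrative; check the NiFi 2 Python developer docs and the Conf42 Python talk for the real thing):

```python
from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult

class ExtractCompanyName(FlowFileTransform):
    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "2.0.0"
        description = "Toy example: pulls capitalized words out of the flowfile text."
        tags = ["llm", "prompt", "example"]

    def transform(self, context, flowfile):
        text = flowfile.getContentsAsBytes().decode("utf-8")
        # A real processor would use an NER library here instead of this heuristic.
        companies = [w for w in text.split() if w.istitle()]
        return FlowFileTransformResult(
            relationship="success",
            contents=text,
            attributes={"companies": ",".join(companies)},
        )
```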
This one is important for me, because I found that sometimes I want to preprocess a prompt, and the reason I might do that is that someone might ask a question that I don't need to send to a model. Save me some money, save me some time. If you don't need to use a large language model, maybe use regular ML or a regular small model, or in some cases just do a REST call. Like, if someone asks "What is the current stock price for Amazon?", don't send that to ChatGPT. So I parse out the company names with some cool libraries out there, and then I can do another thing where I look up the stock symbol, get the current stock price, and send that back. That is faster and cheaper than calling a language model, even on premise. And I have that in some of the articles; it's very easy.
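A rough sketch of that shortcut, assuming a hard-coded symbol map in place of the company-name extraction libraries used in the talk, and the yfinance library for the price lookup:

```python
import re
import yfinance as yf  # pip install yfinance

# Stand-in symbol lookup; the real flow extracts company names with NLP libraries.
SYMBOLS = {"amazon": "AMZN", "apple": "AAPL", "microsoft": "MSFT"}

def maybe_answer_without_llm(question: str):
    match = re.search(r"stock price for (\w+)", question.lower())
    if not match or match.group(1) not in SYMBOLS:
        return None  # fall through to the LLM path
    symbol = SYMBOLS[match.group(1)]
    price = yf.Ticker(symbol).history(period="1d")["Close"].iloc[-1]
    return f"{symbol} last traded around {price:.2f}"

print(maybe_answer_without_llm("What is the current stock price for Amazon?"))
```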
Now I've also added a new library, using Salesforce BLIP image captioning, to be able to generate captions for images, because remember, I could have flows full of images, and I use this as part of parsing things out when I do meetups. I think it's a generally useful thing to have, and I can change the model if there's a better model for that. I also added one for ResNet-50, just to do classifications. Pretty useful for flows.
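Outside of NiFi, the captioning and classification steps boil down to a few lines of Transformers code, along these lines (the model sizes and the sample file name are assumptions):

```python
# pip install transformers torch pillow
from PIL import Image
from transformers import BlipForConditionalGeneration, BlipProcessor, pipeline

image = Image.open("meetup_photo.jpg").convert("RGB")  # hypothetical image

# Caption the image with Salesforce BLIP.
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
blip = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
inputs = processor(image, return_tensors="pt")
caption_ids = blip.generate(**inputs, max_new_tokens=30)
print(processor.decode(caption_ids[0], skip_special_tokens=True))

# Classify it with ResNet-50.
classifier = pipeline("image-classification", model="microsoft/resnet-50")
print(classifier(image)[:3])
```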
Again, as part of that multi-step process, I might want to caption an image, and I might want to make sure it's not a problematic image, so I'm using this one to make sure we don't even deal with those; we just delete them. Maybe I need to see people's faces in the image. Again, getting more data could be helpful before you send things downstream to other models. And again, there are Python processors to do chunking, parsing, calling ChatGPT, and working with vector stores, just to give you an example there.
Let's get into some demos. I just have some links to some other articles here, but let's get into the demo. You don't need to see any more slides; you will get these slides, and everything you need is fully available there. So no problems, very easy. So the first
thing we do: this is NiFi, and it is running for us. And what we can do here is, you know, if we wanted to build new stuff, we can add any of these processors. If you want to see the cool new Python ones, just type in Python. There's a bunch of them, as you saw in the list before, like extracting company names, just to give you an example. So what we can do is, well, I want to get a meetup chat going. So I have a processor here that just listens for events as they come from Slack, and there are a couple of tokens you've got to set. I have an article that shows you how to do that; it's very easy to set up. So here is my Slack group. If you're interested, you could join it and use it to ask application questions.
Let's see if it knows this one. I don't think it does. "When is the Conf42 large...", should I do LM or LLM, "...LLM conference?"
So what happens next is it comes in here. As you can see, now it's running because it just got messages. And just for fun, I store them in a database. Oh, unless someone stopped this. The nice thing with NiFi is you can start and stop things either manually, as you see here, or through a DevOps process; that could be REST or the command line. You get the idea. But let's make sure that data went through. This is data provenance. This is really good, because I don't know what that black-box LLM is doing, but I can tell you exactly what I did to prepare that prompt, when I sent it, how big it was, and what I got back. So you don't have the provenance inside that model, but I know the provenance getting there and coming back.
So that can be very helpful. So this is my message here; it's me, because I typed the message in. There's my input, there's an ID, timestamps, where I am in the world. Helpful stuff. So that message went in, I saved it in a Postgres database, and I also pushed it into Kafka for Slack messages; you'll see why when we get to that next step. So we've got that data again.
We're using two parts of the process already, and then I'm going to do the prompt cleaning and engineering. So I'm just going to pull out the fields from that Slack message that I want. If this is coming from something else, you might have a different extraction, but it's pretty easy. I make sure that it's not myself; you know, I don't want to reply to the reply we send to it, so I filter this out. I forgot to mention this model: there's a model from Hugging Face for not-safe-for-work text, so I want to parse those out. I don't want those.
Oh, someone added another step here. Yeah, I was running a demo and I wanted to look at those; we were just going to delete them. Which is the awesome thing that we started up: we have these configurable queues that run even on a big cluster and make sure things don't go somewhere. Oh, one of the questions was not safe, so that didn't make it through the system, which is fine. And here we're going to query Pinecone with that input, see what we get back, extract the results, build up our new prompt, and then figure out who we want to send it to. Do we want ChatGPT to get it? Do we want someone else to get it? I've been just doing Mixtral most recently, but I also have Mistral running on Ollama, and down here I've got Google Gemma running. Again, I'm pushing the cleaned-up prompts to Kafka as well.
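That Pinecone query and prompt build step, pulled out of the flow, looks roughly like this in Python; the index name, embedding model, and metadata field are assumptions about how the articles were loaded:

```python
# pip install pinecone-client sentence-transformers
from pinecone import Pinecone
from sentence_transformers import SentenceTransformer

question = "When is the Conf42 LLM conference?"

# Embed the question with the same model that was used to vectorize the articles.
embedder = SentenceTransformer("all-MiniLM-L6-v2")
vector = embedder.encode(question).tolist()

# Query the index of vectorized Medium articles; names are placeholders.
pc = Pinecone(api_key="YOUR_PINECONE_API_KEY")
index = pc.Index("medium-articles")
results = index.query(vector=vector, top_k=3, include_metadata=True)

context = "\n".join((m.metadata or {}).get("text", "") for m in results.matches)
prompt = f"Use this context to answer.\n\nContext:\n{context}\n\nQuestion: {question}"
print(prompt[:300])
```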
So we come in here and we format the prompts exactly, with some instructions added. You kind of need that for Mixtral 8x7B Instruct; I mean, every model needs a little bit of different tweaking. So then we call it, and if we look at the results here, we can see what we got back from there.
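As a minimal sketch of that "add the instructions and call it" step, assuming Mixtral 8x7B Instruct pulled into a local Ollama server (the [INST] wrapper is Mixtral's documented instruction format; the context string is a placeholder for whatever came back from Pinecone):

```python
import requests

context = "...context retrieved from Pinecone goes here..."
question = "When is the Conf42 LLM conference?"

# Mixtral 8x7B Instruct expects prompts wrapped in [INST] ... [/INST].
prompt = f"[INST] Answer using only this context:\n{context}\n\nQuestion: {question} [/INST]"

resp = requests.post(
    "http://localhost:11434/api/generate",  # local Ollama server
    json={"model": "mixtral", "prompt": prompt, "stream": False},
    timeout=300,
)
body = resp.json()
print(body["response"])
print(body.get("eval_count"), "tokens generated")  # the kind of metadata added downstream
```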
And then I'm going to clean it up, add a couple of fields, push that into a different topic, and also push it out to Slack. While I was saying this, we got our threaded result back; let's see if it's any good. Conf42, April 11.
Oh, and it's got the link. Oh, look at that. Okay, I must have had that. Remember, we looked into Pinecone? Well, that's where I put my Medium articles, and in my Medium I've linked to the conference, because I'm speaking here. So I like that. Pretty good, pretty good thing. So we got the data and we got it back. And like I said, we could have sent it there, could have sent it to Watson, Google Gemma... lots of
different options here to get things back to Slack. I have just a little cleanup, and I just, you know, add that generated text and push it to the right channel. This gives me the right timestamp, so I'm in that thread; you don't have to thread the results back. I put a lot of extra metadata in here just for myself; I don't need to put this here. As you can see, it tells me what model, you know, how long the compute took, all those sorts of things. So if you see that, this is just metadata for me, or just to be interesting. Oh, it was Mixtral; it took 100 tokens, you know, that kind of stuff. We can filter that down if we don't need it.
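Posting that answer back into the original Slack thread is a one-call job with the slack_sdk client; a minimal sketch, assuming a placeholder bot token, channel, and thread timestamp:

```python
# pip install slack_sdk
from slack_sdk import WebClient

client = WebClient(token="xoxb-your-bot-token")  # placeholder bot token

client.chat_postMessage(
    channel="#general",                 # hypothetical channel
    thread_ts="1712345678.000100",      # timestamp of the original question
    text=(
        "Generated answer goes here.\n"
        "_model: mixtral, tokens: 100_"  # the optional metadata mentioned above
    ),
)
```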
So remember I said I pushed stuff to Kafka? Let's get there. These are the Kafka messages I pushed out there. Well, I can do real-time queries on them with Flink against those Kafka messages. And if you look here, as part of that, I'm passing in that message text and sending it against a function, which is this function here, which calls into NiFi to get the results back. And then, there's that Kafka we saw. So it is running and getting the generated text here, which is the result for that. And you can see the Flink job is running; if I had a big cluster, it could be running there. Not much is going on here, but I have a materialized view, and what's nice about that is it produces all this data as a JSON REST endpoint, so I can ingest it somewhere.
Now, like I mentioned here, that UDF is calling out; it is calling out to another process here. And this is how NiFi acts as a whole website. So it receives a message, and as you see here, it can be GET, POST, PUT, whatever you want; I can limit that. And then I parse what they send in. Here I'm going to route it, because I want to send it to an LLM, and I have Google Gemma reply to it. So: Google Gemma, clean it up, make my prompt, call Gemma, get the results back, and again push it into Kafka so I can, you know, have a copy for someone else. And then we return it to that REST endpoint, which is the response here. We send that response back to Flink, and it shows up here. And as you see, that ran pretty quickly, even with all that round tripping.
That's really it, what I wanted to show you. Thank you for attending this talk. I'm going to be speaking at some other events very shortly; if you're in the area, if you're in New York or Seattle or Boston, say hi. It's been really good showing you what we can do today. Thank you.