Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hi, thank you for joining. I'm super excited to talk
to you today about how you can use Apache Airflow to create
generative AI pipelines. Specifically, I will talk about
a demo that shows a full generative AI pipeline for
content generation with Airflow.
This is the agenda for today. I will give a quick introduction to the topics of Airflow and Astronomer. Then I'll talk about some of the challenges that you might run into when you want to take generative AI projects into production, and how Airflow addresses those challenges.
Next, I will cover some key Airflow features that I think you should know when you are creating generative AI pipelines, especially features that are used in the demo code, to help you understand the demo better. I will specifically focus on two features that got a major update in the latest Airflow release. Airflow 2.9 came out a little over a month ago, and there were significant updates to advanced dataset scheduling as well as to dynamic task mapping, which I will cover
in the slides. Then we will spend some time in
the demo. The demo is a pipeline that creates content
using a retrieval augmented generation (RAG) pattern, combined with fine-tuning GPT for your use case. All of the code in the demo is open source
and available for you to use, and this is the link to the GitHub repository
where you can find that code. I will show this link several
times throughout the talk. One note before we get
started, I will assume that you have some familiarity with
Apache Airflow, so I will not cover the very basics of how to write an Airflow DAG or how to define an Airflow task. If you want to build generative AI pipelines but are completely new to Airflow, I highly recommend that you take a look at some of our getting started resources. Those are available at astronomer.io/docs/learn, or you can go to the Airflow Academy that Astronomer runs, which is available at academy.astronomer.io. With that said,
what is airflow? It's of course the open source standard for workflow
management. It's extremely popular. It's being downloaded 22
million times per month, which is just wild to think about.
And the one thing that I want to call out on this slide is
that Airflow now has 46,000 members in the Airflow Slack. So if you ever have a question
about airflow, about any of your use cases, it's one
of the best places to ask.
If your question is, well, are people using airflow
for ML and AI already, or is this completely new?
The answer is that it's already a very established pattern.
So at the end of December last year,
we did a survey among airflow users, the Apache Airflow survey.
And among the almost 800 people who responded,
28% said that they are already using Airflow to train or serve machine learning models, or to manage their MLOps use cases. So this is not something new; this is a pattern that a lot of people are already using. Now, why is Airflow so popular
to use for AI and also for general data orchestration?
It's because it can sit in the middle of your whole data
and ML stack. So if you are in the data and ML space,
you will recognize some of these logos, maybe you recognize all of them,
but these are just a few of the tools that Airflow can connect to and orchestrate actions in. With Airflow being the orchestrator,
what you really want is to have a way to make sure that
all of the tasks, all of the actions, and all of these tools are
happening at the exact right moment and with the right dependencies.
The prime example is you cannot train your model before
your training data is ready. So all of this is connected,
and airflow can sit in the middle and be the orchestrator to
your data symphony. The reason why airflow is
so good at this is because in airflow, you can define
everything you need in python code. So if you have any action
that you can define in python code, any call to another API
of a different data tool, you can turn that into a
task inside of airflow very easily.
That's Airflow. Who is Astronomer, and why am I talking about Airflow to you? Astronomer is the driving force behind Apache Airflow. Astronomer is the company that currently drives all of the new Airflow releases, has many of the major committers and PMC members on board, and has also contributed over half of the current Airflow code. So Airflow is open source, but a lot of the code is
contributed by employees of Astronomer. And there's also a growing
academy ecosystem. I mentioned the academy earlier. It has beginner courses
and more advanced modules, and now there are over 30,000 people who have taken
courses in the academy. So airflow education is also something
that's very important to astronomer. Of course,
Astronomer is a company, so there's a commercial offering which is called Astro.
Astro is a place where you can run airflow deployments
at scale, and many of them without needing to worry about infrastructure.
So Airflow itself is open source, and you can run it however you want to. But if you want an easy way to spin up a lot of Airflow deployments, especially if you have different teams that use Airflow and want additional features on top of Airflow like RBAC management, then Astro is a great option for
you. If this is something that interests you, this QR
code and the link on the slide here give you
a free trial, a 14 day trial with $300 in
credits so you can try out what it feels like to run airflow
with astronomer. And if you sign up with this link or with this QR
code, you also get a complimentary Airflow Fundamentals certification exam attempt. So if you go to the Academy that I mentioned and want to take the certification exam, you get one attempt for free with the code that is sent to you if you sign up via this link.
I will show this again at the very end of the talk. But important
to note, everything else that I'm showing, all of the code that I'm
showing is all possible with open source airflow.
All right, so you want to do GenAI with Airflow,
and usually you have some prototype, maybe in a Jupyter
notebook, and the prototype works great, but putting
this into production is a different beast. So you have all
of these challenges that arise. You run into API outages and
rate limits. You need to make sure that your training data is always
up to date, as well as the data you're augmenting with. Because if you think about it, everyone who's watching this talk can make a call to an API of a large language model. So it's very easy now to just create a wrapper around these APIs of large language models. But you need to set yourself apart from your competitors, and the way you can do that, or one way you can do that, is by having better data: by using your organization's valuable data, or your own data, to augment the responses that you get from a large language model. You also have changing tools and APIs. I see a new model come out almost every day, literally, and new benchmarks. So it's possible that a month from now there will be a new model for your use case that is just a lot better than the one that you are using now, and that might be hosted on a different service.
So you really need to be able to adapt to changing tools and APIs.
You can end up with quite complex pipeline structures, and you need to be able to determine which data went into training. In a lot of cases this is for compliance reasons; everyone who, like me, is in Europe will be aware of GDPR. That's one of the reasons why you need to be able to audit your pipelines.
And then of course, every time you put something into production, you need to worry
about scalability and reliability.
Airflow has an answer for all of these challenges.
With API outages and rate limits, Airflow can have automatic retries configured. Airflow is already the standard to keep your training data up to date. And as I mentioned before,
Airflow can connect to any tool that you can connect
to using Python code. So Airflow is tool agnostic at
its core. And especially if you use the task flow API,
which is something I'm going to show in this talk, then it's very easy to
just change which tool you are connecting to.
You can create complex pipeline structures, among other things,
with data sets and dynamic task mapping and branching, which I will all
cover in this talk. And there are ways to
determine which data went into training by using observability features and an OpenLineage integration. For scalability, you have pluggable compute, and reliability-wise, Airflow has been
around for a while now and is really battle tested. And because Airflow
itself is all defined in code, all in Python, you can apply CI/CD and DevOps best practices on top of your pipeline code to really make sure that your pipelines are robust and your GenAI application is reliable.
Little side note: at the Conf42 Python conference this year, in 2024, I gave a talk about how to do testing and CI/CD with Airflow,
if that's something that interests you. So after
seeing all of these answers to the challenges, we can add to our statement from earlier and say it's not just your data that sets you apart from your competitors, it's also the quality of your orchestration. If your orchestration is reliable, never goes down, and is scalable, you will have a better product than your competitor.
All right, picking out some of these features. These are the
features that I think are essential to learn about when you
create best practice generative AI pipelines. I will cover the first six in this talk, but for the others there are guides available as well on our Learn page at astronomer.io/docs/learn. For the first four, the light blue ones here, I will just cover them briefly, and then I will spend some more time talking about dataset scheduling and dynamic task mapping, specifically about what is new about these features in Airflow 2.9. So let's get started with some quick
airflow features in rapid succession.
First of all, the TaskFlow API. This is something that a lot of you might be familiar with if you are writing modern Airflow DAGs, but if you've used Airflow a while ago, or if you are on an older version, you might not know about this yet. The TaskFlow API really is a Pythonic way to write Airflow DAGs, and it's a much more natural way that you can create and
chain your tasks. On the left hand side here you can see a traditionally defined Airflow task. We're using a PythonOperator to run a function called say_hi_func. And this is possible, it's still possible with Airflow, but it takes a lot of boilerplate code. What you can do with the TaskFlow API is create the exact same output, so you can run the exact same task, simply by having a Python function and putting @task on top of it. So what happens now is I can have any Python function, and by putting @task on top of it, Airflow realizes this is an Airflow task now and does everything that you previously had to define the PythonOperator for.
It's also much easier to create dependencies between tasks when you're using the TaskFlow API, and you can see what this looks like in the example code. You can also mix and match traditional operators and the TaskFlow API; that's also something I've done in the demo. And there's not just this one decorator, @task, there are many others. There's @task.kubernetes, there's @task.bash, and you can learn more about them in our guide about Airflow decorators. Personally, I've switched over to using decorators whenever possible over traditional operators, though traditional operators still have their use, especially when you are interacting with complex tools and the operator does a lot of work for you and abstracts over a lot of Python logic for you.
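To make this concrete, here is a minimal sketch of the pattern described above (not the exact slide code; the DAG name and print statements are illustrative):

```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator


def say_hi_func():
    print("hi!")


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def taskflow_example():
    # Traditional style: explicit PythonOperator boilerplate
    say_hi_traditional = PythonOperator(
        task_id="say_hi_traditional",
        python_callable=say_hi_func,
    )

    # TaskFlow style: the @task decorator turns the function into a task
    @task
    def say_hi():
        print("hi!")

    say_hi_traditional >> say_hi()


taskflow_example()
```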
All right, now that we're writing Pythonic Airflow DAGs, let's protect
against API failures and rate limits. When I was creating
the demo for this talk, I actually rate limited myself. I ran
into this exact problem. So what you can do is you
can configure automatic retries, and this is really a best practice in
production. You always want retries configured,
unless for a specific task, there's a reason why you do
not want this task to automatically retry. You can
of course configure the number of retries. You can configure a specific delay
in between the retries, or you can also say you want to have an exponential
back off, so it takes longer and longer after each
try to try again. You can set a maximum delay as
well. As for the ways to configure retries, you can configure them in the Airflow configuration so you can set them for your whole Airflow environment, you can set them for all tasks in a specific DAG in the default arguments, and you can override that at the specific task level. So a common pattern is to have an Airflow configuration that says all tasks have three retries by default. But for specific tasks where you already know the API is finicky, you might set retries to ten with an exponential backoff.
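As a rough sketch of those retry options (the specific values here are examples, not the demo's settings):

```python
from datetime import timedelta

from pendulum import datetime

from airflow.decorators import dag, task


@dag(
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    # DAG-level default: every task gets three retries, five minutes apart
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
)
def retries_example():
    @task(
        retries=10,                      # override for a finicky API
        retry_exponential_backoff=True,  # wait longer after each attempt
        max_retry_delay=timedelta(minutes=30),
    )
    def call_finicky_api():
        ...

    call_finicky_api()


retries_example()
```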
Okay, we have the TaskFlow API and retries. Now our pipelines are getting more complex, because a lot of times you run into
the issue that depending on different aspects of your
data or different situations, you want different tasks to
be running. And the way you can achieve that is by using branching.
What you can see here is an abbreviated version of a task that exists in the demo, and the graph view of this DAG, which is the challenger versus champion DAG that is also in the demo. What you can see is that I have a Python function and I'm again using a decorator: on line 35 I'm saying @task.branch, and this turns my task into a branching function. And this function is very simple. I have some logic, abbreviated here, that figures out if a current champion model exists. If it does, I return the string "champion exists"; if it does not, I return "no champion". So this task is what creates the first branching task here.
And the string that is returned is the
task id of the downstream task that I want to run.
So if there is a champion present, I want to run the task
called champion exists and go that way in my dag
structure. If there's no champion present, I want to run the
task no champion and skip all of the intermediate
tasks that are here. Because if there's no champion, I don't need to do a
champion challenger comparison, I will just use the challenger.
In this specific DAG, we have two more branching tasks. We have is_challenger, which does the same thing to evaluate whether a current challenger model exists when this DAG is running.
Because if there's no challenger, I would just keep the current champion.
And if both exist, if a current champion model exists
and a challenger model has been presented, then I want to actually
pick the better one. And then I have my comparing accuracies
task. And depending on which model was more
accurate, I either switch to champion, so I'm using the challenger,
or I keep my champion. Now you can
create very fun dags with this. You can create little games
with this as well. One thing that you have to keep in mind is
by default, airflow only runs tasks
if all the upstream tasks were successful.
But if you are using branching, it's very easy to get a situation where one task gets skipped and one is successful, and the downstream task that depends on both of these tasks should still run anyway. And that's when you need to adjust the trigger rules. So the trigger rules downstream of branching tasks always need a little thought. A common trigger rule that people use in this case would be, for example, one_success. In that case, the task runs as soon as one upstream task has been successful, and it doesn't matter if the other ones get skipped
or even fail.
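A stripped-down sketch of this pattern, loosely modeled on the champion versus challenger logic (task names and the existence check are placeholders, not the demo code):

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def branching_example():
    @task.branch
    def is_champion():
        current_champion = None  # placeholder for the real model lookup
        if current_champion:
            return "champion_exists"  # task_id of the branch to follow
        return "no_champion"

    @task
    def champion_exists():
        print("compare the champion against the challenger")

    @task
    def no_champion():
        print("no champion yet, use the challenger directly")

    # one_success: run as soon as one upstream task succeeds,
    # even though the other branch gets skipped
    @task(trigger_rule="one_success")
    def use_model():
        print("serving the selected model")

    is_champion() >> [champion_exists(), no_champion()] >> use_model()


branching_example()
```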
The other thing I wanted to mention really quickly in just
one slide are deferrable operators,
because if you are creating generative AI pipelines, it's likely
that you have tasks that run for a long time.
So the prime example is model training and fine tuning.
That might take a while, or you are waiting for an event
to occur in an external system with a sensor.
You're probably familiar with sensors. A lot of deferrable operators are deferrable
versions of sensors. If you
are waiting for a long time in your production environment, you don't want that worker slot to be taken up while waiting; you want to release the worker slot so that worker can run a different task in parallel. So what you do is you use a deferrable operator, which will go to a whole different Airflow component called the triggerer, and tell that component to start an asynchronous process waiting for something to happen. Meanwhile, the worker slot gets released and you get the advantage of resource optimization, so the worker can do a different task in the meantime. As soon as the condition that the async process is waiting for occurs, the task will get picked up by a worker again. The best practice here is to use deferrable operators whenever possible for longer-running tasks. In the demo, there is one deferrable operator,
and I will show you how that works in practice.
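As an illustration of the idea, here is a hedged sketch using a deferrable sensor; it assumes a recent version of the Amazon provider where S3KeySensor accepts deferrable=True, and the bucket and key are made up:

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def deferrable_example():
    # While this sensor waits, it is handed to the triggerer and the worker
    # slot is released; a worker only picks the task back up once the key exists.
    S3KeySensor(
        task_id="wait_for_training_data",
        bucket_name="my-example-bucket",  # hypothetical bucket
        bucket_key="data/train.csv",      # hypothetical key
        deferrable=True,
    )


deferrable_example()
```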
Okay, those were the four features I wanted to cover a little bit.
Now let's dive deeper into dataset scheduling. This is my personal favorite way of scheduling DAGs, and I hope it will become yours as well, because what you can do with datasets is chain your DAGs based on the data that they are updating. This is the Datasets view in the Airflow environment. It is available as of Airflow 2.4, so if you do not have that tab, you might need to upgrade your Airflow.
And what we have here is one upstream DAG called my ETL DAG. There is a task in this DAG where I, as the DAG author, know that this task updates some data; this task updates the data.csv file in my S3 bucket. It doesn't actually matter if that update really happens, the task could do something completely different, or it could be an empty task. I, as the DAG author, tell the task: hey, when you finish successfully, you produce an update to that dataset. I always imagine it as the task waving a little flag as soon as it's done, and then it updates this dataset. You can have as many datasets as you want, and then you can have downstream DAGs. My ML DAG here is a different DAG, and you can have these DAGs scheduled based on updates to datasets. So I can say: whenever that data.csv file is updated, I want to run my ML DAG. In this case, to show you how this can be chained, there is another task inside of this ML DAG which produces an update to a new dataset called snowflake mytable. A few
notes here. The datasets are defined in a URI syntax; that is just a best practice. You could theoretically call the dataset "my model" or something else. There are a few strings that are not allowed, but generally it's very possible to use other strings, as long as they would evaluate to a valid URI. But it is, as I said, a best practice to use URIs, especially with potential future Airflow features in mind.
And the other question that always comes up: Airflow is currently not aware of the underlying data. So it doesn't matter whether that Snowflake table exists, or whether that S3 bucket exists. You, as the DAG author, name these datasets, and you determine which tasks actually produce updates to these datasets.
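A minimal sketch of that producer and consumer pattern (the URI and DAG names are illustrative, not the exact slide code):

```python
from pendulum import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

data_csv = Dataset("s3://my-bucket/data.csv")


@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def my_etl_dag():
    # outlets is how the task "waves its little flag" on success
    @task(outlets=[data_csv])
    def update_data():
        ...

    update_data()


@dag(start_date=datetime(2024, 1, 1), schedule=[data_csv], catchup=False)
def my_ml_dag():
    @task
    def train_model():
        ...

    train_model()


my_etl_dag()
my_ml_dag()
```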
This is what it looks like in the UI if you are running Airflow 2.9; this is a brand new 2.9 feature. You can see this is the consumer DAG that we had on the previous slide, and we have one task in this DAG called my task. But you can also see the dataset that this DAG is scheduled on, at the very left of the DAG graph. And then you can see we have the new producer task and the next dataset that this task is producing to. So now you can see all of your datasets in your DAG graph as well, which I personally find really nice. But that was not the only
update to dataset scheduling in Airflow 2.9. We also have these three new additions, which open up a ton of new use cases for dataset scheduling. The first one is that you can now schedule on logical dataset expressions. Previously you could schedule either on one dataset, or on updates to a whole list of datasets with AND logic. But now you can use any combination of AND and OR logic. So you can say, I want the DAG to run as soon as this dataset or this other dataset has been updated, and you can do that with as many datasets as you want, and you can build complex logical structures like that. The other new update
is that you can now also have a DAG that runs on both time and datasets. The prime use case is that you have a DAG that updates a table and you want this DAG to run once per day, but also whenever a certain dataset is updated. You can now do that with the DatasetOrTimeSchedule. And the last one, which I'm really excited about because it opens up Airflow to the world in a way: we now have a REST API endpoint to update datasets. Previously, before Airflow 2.9, you could only have an update to a dataset come from a task within the same Airflow deployment, but now you can send that update from anywhere.
This could be used for cross deployment dependencies. So you have
two Airflow deployments and they can now talk to each other very easily using
this feature, or it could also be
from anywhere. So you could have a button on a website and
then you run some JavaScript code and that creates the
update to the dataset. And that kicks off a
whole set of downstream dags. So you can really make these
updates now from anywhere. I will show how
to do that and also how to do the two other features that are
new in the demo. So the demo shows all three of these new advanced
dataset scheduling features.
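Here is a hedged sketch of the first two additions (the dataset URIs and the cron expression are examples, and the DAG bodies are left as skeletons); the REST API call is shown later in the demo section:

```python
from pendulum import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

dataset_a = Dataset("s3://my-bucket/a.csv")
dataset_b = Dataset("s3://my-bucket/b.csv")


# 1. Logical dataset expressions: run when dataset A OR dataset B is updated
@dag(start_date=datetime(2024, 1, 1), schedule=(dataset_a | dataset_b), catchup=False)
def conditional_dataset_dag():
    ...


# 2. Combined schedule: run every day at midnight OR whenever A or B is updated
@dag(
    start_date=datetime(2024, 1, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(dataset_a | dataset_b),
    ),
    catchup=False,
)
def dataset_or_time_dag():
    ...


conditional_dataset_dag()
dataset_or_time_dag()
```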
The other airflow feature that I want to
dive in a little bit more deeply is dynamic task mapping,
because dynamic task mapping is really a core
airflow feature now that you should be aware of, especially if you're using
airflow for generative AI. What dynamic task mapping
can do is you can have kind of like a template task,
and you can say, well, at runtime I want airflow to
figure out how many copies we need of that task
to run in parallel. And this can be based on inputs that occur at runtime. At the bottom of the slide you can see a very simple DAG that has two tasks. We have one task called get_file_paths, so we are getting a number of different file paths, and then the downstream task is processing the files. I want to have one task for each of the file paths that I'm getting, but I don't know how many file paths I will get every day. Maybe these are files that are dropped into an object storage by a domain expert on my topic, and I never know how many files they will drop that day. Maybe one day it's two, like in this example, and another day they are dropping ten files. This way my pipeline is dynamic, so it will automatically create one parallel task instance for each of the file paths that it is given.
The important thing to remember is that you have two ways of defining parameters in a dynamically mapped task. You can have parameters that stay the same between these copies; these go into .partial(). And you have parameters that change between the dynamically mapped task instances; these go into .expand() or .expand_kwargs(). The best practice here: use dynamic tasks whenever possible, especially over dynamic DAGs. Dynamic DAGs still have valid use cases in many situations, but there are some cases where people use dynamic DAGs that would really be possible to address with dynamic tasks, which are much more observable and easier to use. The other best practice is to customize the map index. This is an Airflow 2.9 feature, so it's fairly new, but it makes it much easier to find the logs of a specific dynamically mapped task.
Now, how do you do that in practice?
So we have the basics, which is we have the partial method,
and as I said before, all the parameters that you put into partial
will stay the same for each dynamically mapped task instance.
So in this case I'm saying the parameter a is always
equal to two. The second method
that you put on your call of a task
is dot expand. So here I have another parameter
called b, and I'm saying, well, b is
the parameter that I want to change in between the dynamically mapped
task instances. So to b I'm giving a list of
inputs, in this case zero and one. This will create two
dynamically mapped task instances, the first one using zero
for the parameter b, and the second one using one
for the parameter b.
Of course, here I hard coded a list. This could be the output
of an upstream task, and that's how you can create these dynamic pipelines. One thing to note: it's important that you name the keyword argument here, you have to say b equals. You cannot forget that, otherwise you will get a DAG import error, but the error tells you specifically what you did wrong in this case. And lastly, the other basic: if you are running Airflow 2.9, you want to create a map_index_template. This is a parameter that goes directly into either the partial method of a traditional operator, or into the task decorator if you're using the TaskFlow API, and then you can customize the map index.
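Putting those basics together, a minimal sketch looks like this (using the values from the slide: a stays fixed at 2, b expands over a hard-coded list of 0 and 1):

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def simple_mapping_example():
    @task
    def add(a, b):
        return a + b

    # a stays the same for every mapped task instance,
    # b changes: this creates two instances, one with b=0 and one with b=1
    add.partial(a=2).expand(b=[0, 1])


simple_mapping_example()
```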
If you're already familiar with dynamic task mapping, or you are a more advanced Airflow user, you will probably be interested to learn about the more advanced methods. There's expand_kwargs, which allows you to map over sets of keyword arguments. I highly recommend looking into that because it also opens up a lot of use cases. And one of my favorite little Airflow things that not that many people know about: there is a function called .map that allows you to transform
the output of an upstream task before you map over
it. So without having an extra task, you can change
the output of any upstream task if it's
not in the right format for your mapping. I use this a lot,
especially if I have an upstream traditional operator and the output isn't
exactly what I need. And it's a very neat feature
to use in dynamic task mapping. If you want
to learn more about dynamic tasks, especially about the more advanced use cases,
I highly recommend our guide on dynamic task mapping.
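To show what those two advanced methods look like in practice, here is a small hedged sketch (the file names and the clean-up transformation are made up for illustration):

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def advanced_mapping_example():
    @task
    def list_files():
        return ["  Guide_A.MD", "guide_b.md  "]

    @task
    def process(path):
        print(f"processing {path}")

    def clean(f: str) -> str:
        return f.strip().lower()

    # .map transforms the upstream output before mapping over it,
    # without needing an extra task in between
    process.expand(path=list_files().map(clean))

    @task
    def load(path, encoding):
        print(f"loading {path} with {encoding}")

    # expand_kwargs maps over whole sets of keyword arguments
    load.expand_kwargs(
        [
            {"path": "guide_a.md", "encoding": "utf-8"},
            {"path": "guide_b.md", "encoding": "latin-1"},
        ]
    )


advanced_mapping_example()
```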
Now, there are several examples of dynamic task mapping in
the demo, but since there's always a lot of code in the demo, I wanted
to show you just the very basics of what is necessary so
you can use this today in your airflow pipelines.
All right, so here we have two tasks defined. For the first one, on line 44 we have the task decorator, and then we have the function on line 45. So the first task is get_file_paths, and the second task, on line 51, is called process_file, also defined with a TaskFlow API decorator. Little side note, I'm using the TaskFlow
API a lot here, but you can also dynamically map traditional
operators. And in the demo there are traditional operators that are
dynamically mapped. So I can show you what this looks like with a traditional operator
in a second in the demo. So we had three components for basic dynamic task mapping. The first one was the elements that stay the same between the dynamically mapped tasks; this is what goes into that partial. So we have our function process_file, and we call that function with the method partial. Here is where we enter the elements that we want to stay the same between the copies of the task. I very creatively named this parameter constant; of course, it could be named anything. And I'm saying the constant is always 42. All right,
so no matter how many mapped task instances we get, we could get two like in the example here, or we could get 100, the constant will always be 42. Now, what is the element that changes? From the upstream task we get a list of file paths. In this case it's hard coded, so we will get two file paths, but this is the task that would, in our example, return maybe two file paths one day and 100 the next day. So this is what we put into the expand method.
We have our keyword argument called file. We set that
equal to that list of inputs. And in this case
I don't hard code the list in the method call, but I
get the list as the output from my upstream task.
On line 60 you can see file_paths equals get_file_paths(). So this is the output: whatever my upstream task returns, whatever that list is, gets passed to file.
And then I get one dynamically mapped task instance.
So one copy per element in that list.
So in this case I will get two copies of this task,
one for the first file and one for the second file.
Lastly, let's say we are on Airflow 2.9 and I want to customize the map index template. On line 50 you can see I add a parameter to my task decorator: I say the map_index_template is equal to this Jinja template, and this is a Jinja variable that doesn't exist yet. You could use an existing one, but I want to create a new one specifically for this task. And the way to create that inside of your task: at line 57 you can see I get the current Airflow context
and then I add this variable to the context on
line 58. In this case it's equal to a string that
uses both the name of the file and the constant.
And the cool thing now here is because I can define
this parameter value inside of the task,
I could set this to outputs that I'm generating in the task.
So the prime example here would be if I'm testing a machine learning
model and I get a test accuracy back. I could use
the test accuracy in my custom map index template
by defining the custom map index at the very bottom of this
task. So when I hop over into the airflow
UI, you can see the map index here. I clicked on the Mapped Tasks tab and then I can see all my dynamically mapped task indices. The custom map index is not zero and one anymore as in previous Airflow versions; now I have this string that I defined. So what you could do, for example, with model testing, is have your model testing metric, like the accuracy or your R-squared value, displayed in your map index.
And then you can very easily find the one mapped
task index for the model that had the best value.
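Reconstructed as a compact, hedged sketch (the names mirror the walkthrough above; the real demo code lives in the repository):

```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def custom_map_index_example():
    @task
    def get_file_paths():
        # hard coded here; in the demo this would come from the file sources
        return ["guide_1.md", "guide_2.md"]

    @task(map_index_template="{{ custom_map_index }}")
    def process_file(file, constant):
        # the template variable is injected into the context from inside the task,
        # so it can also hold values computed during the run (e.g. an accuracy)
        context = get_current_context()
        context["custom_map_index"] = f"Processed {file} with constant {constant}"
        return f"{file}:{constant}"

    process_file.partial(constant=42).expand(file=get_file_paths())


custom_map_index_example()
```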
Okay, that was all of the airflow features. I hope this gives you
a good basis to understand the demo code better.
And let's hop into the demo. So we are talking about content
creation today, and I wanted to
have a demo repository that creates LinkedIn posts
for me about Apache Airflow. So that's useful for
me personally. But you can very easily adapt this
repository to create any type of content for
any type of topic. So let's say you are really into metal music and you are running a blog about metal. You could provide this application with specific information about the music you want to write about, like the newest albums. You could also provide it with examples of your previous blog posts, and then tell the app: instead of posts about Airflow, I want you to create whole blog posts about the newest metal bands. So you can really adapt this to anything.
Again, the code is in the repository. At a high level, this Airflow project has two distinct paths. The first one, on top here in blue, is a traditional RAG pipeline. We take some knowledge, some proprietary knowledge that in my case is about Airflow, I'm using some of our own Airflow guides, and we ingest that and embed it into a vector database. In this case, I'm using Weaviate. Little side note: because Airflow is tool agnostic, you could very easily swap out any of the elements here. You could, for example, use a different vector database or a different large language model if you wanted to. The second part
of the demo pipeline is the fine tuning. Not only do I want more information to be available to the large language model, I also want to fine-tune the language model to use the tone that I want and a similar length of posts. So I have some very short examples of LinkedIn posts about Airflow, both training and validation examples. Then I make sure that the format is correct for fine tuning; if you've fine-tuned GPT before, you know that there's a very specific format the examples need to be in. And I also added a little step that makes sure that the fine tuning will be within budget, so I'm actually counting the tokens and estimating how much the fine tuning
will cost. Then I run the actual fine tuning; this runs in a deferrable way because it takes a while. And lastly, I have the champion versus challenger DAG that I showed in the slides earlier when I talked about branching. That makes sure that we pick the better model, so either the current champion model or the new challenger model. If you keep training this with different examples, it will always pick the most accurate model. Then on the frontend, I just use a Streamlit app. This is a very simple frontend, but you could connect this to any frontend that you wanted to. And this is where the user can put in a prompt. It gets augmented with the newest knowledge provided by the retrieval augmented generation pipeline, and then it uses the current champion GPT model to create a response. I also have a second part of the app where the response gets sent to DALL-E to create a picture. That is mostly for fun, because I find the pictures it creates about Airflow quite amusing. All right,
that's the general pipeline. Let's hop into the code and see what actually happens.
So, as I said, this is the repository. It's all open
source. You're very free to fork it and to use it.
And this is what you get if you fork this repository and
clone it locally. This is the whole repository, and you can see there are a lot of DAGs here. Not only are there DAGs tagged with use case, those are the DAGs that actually pertain to the use case that I'm going to show, there are also a lot of DAGs tagged as simple DAGs. You can see by the title here that these are about the features that I talked about in the slides. I was thinking you might want to have more code examples for these features, so with these DAGs, you can check out simpler code examples for a specific feature.
But in our case, we want to look at the use case. So let's
only filter for use case and zoom out a little.
So these are all the dags that are about the use case
that we're going to talk about. You can see they already ran a few times, and here we have two DAGs that are currently paused. They are tagged with helper; those are helper DAGs that help you when you're developing. We have one DAG that can start the whole fine tuning pipeline, and we have one DAG that deletes everything that's currently in Weaviate, because when you're developing, you might want to iterate on the content that you put into Weaviate. So we can ignore these two DAGs for now and just focus on the other ones.
And the first DAG that I want to talk about is the one for the full first part of the pipeline. This blue part of the pipeline, the RAG pipeline, is all happening in one DAG. So let's click on this DAG; it's called the ingest knowledge base DAG.
And when we zoom into the graph view, you can see the DAG is scheduled based on both time, so we have a DatasetOrTimeSchedule, one of the new Airflow 2.9 features, and on either of these datasets receiving an update. Then what happens next? We can actually see some of the scheduled runs; this is one of the runs that happened recently. You can see I make sure that my class exists in the Weaviate database, so I interact with Weaviate. I have a branching task here: if the class already exists, then I do nothing, just an empty operator. If it doesn't exist, if it's the first run of the DAG, I will actually create the class.
And here you can see that I have two arms that are being created
to ingest from different sources. This is just
one way that you can structure an IG Dag. And this is actually
structured in a very modular, best practice way that you can write
airflow data so that you can very easily
add another one of these branches here if
you had a new ingesting source. So if you had a different
type of file that you want to ingest, maybe CSV files.
And what happens with each of the sources is
that we extract the information from the source. I mentioned that we use
airflow guides, so extract from guides. We extract
from text files. We use Lang chain to split up the
guides and text files and then they are being
prepared for the ingestion. So they are being put into the exact
right format that we need to ingest into viviate.
And this all happens with the task flow API. You can see add task here.
But for the last task I'm using an operator because
the viviate ingest operator makes it a lot easier to interact with
VB eight. And I don't need to think about the call to the VB eight
API at all. In this case I just provide the data and
then it will be ingested into the local VV eight instance.
That is the general DAG. Let's look at the code that was used to create this DAG. So let's hop over. This is the VS Code environment where I'm running this Airflow project; of course you can use any editor that you like. If we look at the DAGs here, ingest knowledge base, then we can see all of the code that went into creating the DAG that I've just shown. And you can see here
we have some description. We have all of our imports. This is
where we import functions that have been modularized. One way that you can make your Airflow project easier to use and reuse certain parts of your code is by modularizing functions. So extract, split, and prep-for-ingest are functions that I've modularized here. I'm importing
some environment variables and then
I define my different document sources. Again, this is just the way that this DAG has been optimized; you could also just have the functions directly in your DAG code and only use one source here. In this case
I'm saying, well, I have a source that's called guides.
And when we are using the guides, I want to use this
specific extract function. So this is the extract function that
can extract from markdown files and I
want to use this specific folder path. The other
source is text files, and for those I want to use a different extraction function, because I have to handle text a little bit differently than markdown, and I will use this other folder path.
Next we define the actual DAG, and here you can see several features that I mentioned in the slides earlier. I said that we are using a DatasetOrTimeSchedule: I want this DAG to run every day at midnight, but also if I send a specific update to one of the datasets. So what I did here is use the DatasetOrTimeSchedule. If you scroll up, you can see this is being imported from airflow.timetables.datasets, and to the schedule I have to provide two arguments. I provide a timetable; in this case I'm using the CronTriggerTimetable, and this just says every day at midnight. And I have to provide some datasets. I modularized this here, so it will actually use a list of datasets and then build an expression that says: run on an update to any of these datasets. But you could also just put any dataset expression here, so any logical dataset expression directly, if you don't want to make it dynamic like this.
Alright, so now this is running both every day at midnight
and whenever any of the datasets that have a URI
in this environment variable get updated.
The next feature that I covered in the slides is
retries. In this case, I'm setting my retries at the DAG level.
So I'm saying all of the tasks in this DAG should have three retries, and I want to wait five minutes in between retrying my tasks. This is the default for all the tasks in the DAG, but you can override
this at the task level. Two more things I wanted to mention, because maybe some of you are thinking: wait a second, how do you have emojis in your DAG names? That's possible as of Airflow 2.9 as well; you can have a DAG display name. So the DAG id is still ingest_knowledge_base, but you can now have a dag_display_name that includes emojis or special characters.
And the other thing, which is still considered experimental but I think is a great best practice tool going forward: I say that if the DAG fails five times in a row, I want it to stop. If I go on vacation and this DAG runs every day at midnight and it has failed for five days, then there's probably something wrong that I need to fix, so I don't want it to incur more costs by running more times. You can very easily set that now by using max_consecutive_failed_dag_runs. I set that to five here, and in the UI, if you hover here, you can see this DAG auto-pauses itself after it has failed five times. A very neat, very small feature that is new, and that I think gives you a great way to make your DAGs stop if they fail too many times.
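A hedged sketch of both of those DAG-level settings (the display name and the values are illustrative, not copied from the repository):

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(
    dag_id="ingest_knowledge_base",
    dag_display_name="📚 Ingest knowledge base",  # emojis allowed as of Airflow 2.9
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    max_consecutive_failed_dag_runs=5,  # auto-pause after five failed runs in a row
)
def ingest_knowledge_base():
    @task
    def ingest():
        ...

    ingest()


ingest_knowledge_base()
```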
All right, next we have our branching. So this is in essence
the same as I showed in the slides. We have the @task.branch decorator, and then we return a string. This is modularized, I put the string into the task call, but we return the string of the downstream task that we want to run. If the class that we want to have in our Weaviate database doesn't exist yet, then we use the Weaviate hook from the provider in order to interact with Weaviate and create that class.
And then lastly, we want to ingest our sources. So we
have a loop to create one of the branches for each of
the sources. And I can say the first task,
which is the extraction task, this one should use
my extraction function. So what this creates
is it creates both of these tasks here,
one in the first run of the loop and the other one in the next
run. Again, very modularized; you could also have your regular @task decorated tasks defined in here. Then I'm splitting up my text. I use the split text function, which is also modularized. Maybe in this case I can show you what the function looks like. If we go here, to split, you can see this is all of the code that is used to chunk the text into sizable elements that can be ingested into Weaviate. And because I didn't want all of this code to be in my DAG, I modularized it out and just imported it, because in the end Airflow is all just Python code, so you can do things like that. And the texts are the ones that were extracted by the first task, and I can just input them into my second task.
Next I'm preparing the embedding for import.
And this is a task that is dynamically mapped. You can see here that last time I had 14 chunks of Airflow guides. And if I click here, you can actually see the map index: it says which guide I'm ingesting, I'm ingesting the one about dynamic tasks, very fittingly, and which chunk I am ingesting.
And this is what happens here. I have this TaskFlow-decorated task that I call with the function that is modularized. And then I say, well, the Weaviate class is always the same, so that's my default, that goes into partial, but I want to create one dynamically mapped task instance per chunk, so per element that is returned from this upstream task. And lastly we want to ingest this information into Weaviate. In this case I'm using a traditional operator, the WeaviateIngestOperator, and I'm also dynamically mapping this operator. So I'm saying I want to have one ingestion per chunk from here; we also have 14 dynamically mapped task instances here. I say I want to have one instance of this WeaviateIngestOperator for each of the chunks that I'm giving it. And here you can see they are named in the same way. This is done by using partial on most of the arguments; most of the arguments stay the same. I always use the same connection to Weaviate, I use the same trigger rule, but I want to have one dynamically mapped task instance per element in the list that was output by the upstream task, so per embedding object.
And this list had a length of 14 in the last run; that's why I get 14 dynamically mapped task instances here. The map_index_template, if you're using a traditional operator, can be set with a Jinja template directly. You can see here, I'm saying: for this task, take the input data parameter, then I'm indexing into that and using the URI that is returned from this upstream task. And that's what gives me the URI, or in this case the path, of this guide.
All right, lastly, I set some dependencies; I chain this all together down here as well. And that gives me this highly adaptable, highly dynamic RAG DAG. So this is already all that is necessary to do the first part of the demo, to get the up-to-date knowledge into my Weaviate database. Next, I want to actually fine-tune a model, and all of the other DAGs are about the fine tuning. Since I modularized this pipeline into different DAGs, but I want them to always happen in the same order, I always need the examples first, then I check the examples, then I fine-tune, then I have the champion versus challenger competition, this is an extremely good case for using datasets. So let's look at the Datasets view of this project. These up here are just datasets from the simpler DAGs,
but these are the datasets from the actual use case. What you can see here is that we have all of these local file URIs that I've scheduled these DAGs on. Of course, in a production case you're usually not using local files, and these would more likely refer to files in an object storage. And what you can see here, for example, on this side, is that we have some training files, and as soon as any of these training files are updated, our ingest train examples DAG starts. Because if the training examples are new, I say, okay, this means I want to kick off a new fine tuning run. This ends with the formatted examples: I'm getting the examples and putting them into the right format, so I have a new dataset. Then I evaluate the cost and also the format of the examples with the next DAG. This produces a new file with the evaluated format, and then I can run the fine tuning. So everything is happening in succession, based on dataset scheduling. Once the fine tuning is done, I have a new results file that gives me an accuracy, and this kicks off the champion challenger DAG.
Okay, so there are different ways to kick this off when you are
using this repository locally,
and one is to just run this DAG; that's just for convenience. But there was one more new dataset feature that I wanted to show, so let's kick this off using the API. If I select this here, this is the script. It's a very, very basic API script. You could run this from anywhere: it could be run from a web application, it could be run from a curl command. But in my case I just want to run, with Python 3, the API script that starts the fine tuning pipeline. So let's run this example. You can see I print out the response here, and it says it updated this one dataset, and this is one of the datasets that my ingest train examples DAG is scheduled on. So this DAG just started because I made this call to the Airflow API, and this will kick off the whole pipeline.
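The call itself boils down to something like this hedged sketch (the host, credentials, and dataset URI are placeholders; it assumes the Airflow 2.9 "create dataset event" REST endpoint):

```python
import requests

# Placeholder values: point these at your own Airflow deployment and dataset URI
AIRFLOW_API = "http://localhost:8080/api/v1"
DATASET_URI = "file://my/local/train_examples"

response = requests.post(
    f"{AIRFLOW_API}/datasets/events",
    json={"dataset_uri": DATASET_URI},
    auth=("admin", "admin"),  # local dev credentials; use proper auth in production
)
print(response.json())  # shows the dataset event that was created
```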
The DAG itself is very simple. It just gets the example folders, it creates the JSONL file that GPT needs for fine tuning, and then it makes sure that we update the next dataset: we say that this JSONL file now exists and the DAG has just finished. Once this has finished, the evaluation DAG runs; it's very fast. So we didn't see this running, but the evaluation DAG, the DAG that just ran, is, as you can see, a very structured DAG that uses a lot of branching. This is an example that you can look at in your own time, where the branch has four possible outcomes and decides which parts run. Because if we have no validation examples or no training examples, for whatever reason, we do not need to kick off the fine tuning, it won't work, so we stop the pipeline here in its tracks. But if we have both, we want to evaluate the formatting of both, so both the validation examples and the training examples get evaluated. And for the training examples, I actually want to count the tokens and calculate the expected cost here, decide if that's within budget, and only if it's within budget will I actually continue, and this will later kick off the fine tuning task. If this is over budget, I currently put the budget at, I think, $20, then the DAG will stop as well. So this is one way that you can use branching to protect yourself from accidentally incurring a lot of cost.
All right, so the fine tuning DAG is currently running, so let's look at this. You can see here that the fine tuning is also scheduled on some datasets. We make sure we have the right validation file and examples file, and we upload those files to OpenAI, because I'm using OpenAI to fine-tune. And then we have our fine tuning task, and this uses a deferrable operator. You can see this task is currently deferred, because it's waiting for the fine tuning to finish. If we go into the details of this DAG and the details of this task here, this uses the deferrable fine-tune OpenAI operator. We actually have an extra link here, so if I click on this link, it takes me directly to my fine-tune. So this just started, it's not very smart yet, but this is starting my fine tuning, and I can directly look at the job, I could copy the ID, or I could also cancel it from here. But this will take a while to fine-tune a new model, so let's look at the operator in the meantime,
because this is a deferrable operator that currently doesn't exist in the provider yet. So let's hop over into the code again; this is our fine tuning DAG. Let's clear this and look at the fine tuning DAG. Again we have imports, we get some environment variables, and we get this operator. Because it's a custom operator, we get it from a local file in the include folder. This fine-tune operator is then used in this DAG down here.
So here you can see all of the code that I need
to write in order to fine tune an OpenAI model. If I
use this fine tune operator,
I give it a task id, I give it my fine tuning
file, give it a validation file, I tell it which
model to use, I create a suffix for it.
I'm using a parameter here, but you could also hard code this.
And then I have two options to
make this operator more efficient.
The first option is wait_for_completion. By default, this is false, and the operator will just kick off the fine tuning job, be marked successful, and the pipeline continues. But in a lot of cases you only want to continue your pipeline if you know the fine tuning has actually happened and been successful. That's when you turn wait_for_completion to true, and then the operator will wait. In that case it still uses up a worker slot. So if you're saying, well, I have a triggerer component, I want you to actually release the worker and be deferrable, then you also set the parameter deferrable to true. This is a pattern that is also used, for example, in the TriggerDagRunOperator, with the exact same two parameters.
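For reference, that same pair of parameters on the TriggerDagRunOperator looks roughly like this (the DAG ids are placeholders):

```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def trigger_and_wait_example():
    TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="my_downstream_dag",  # hypothetical DAG id
        wait_for_completion=True,  # only succeed once the triggered run has finished
        deferrable=True,           # wait in the triggerer instead of holding a worker slot
    )


trigger_and_wait_example()
```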
We can have a quick look at the code of this
custom operator.
Let's see, it's here, under custom operators, GPT fine-tune. You can use this operator as a template for your own custom operators, especially if you want to fine-tune a model, or if you want to tune a model on a service where there is no provider yet; you're very welcome to use this. You can see that's the link that I defined, so that's how to define an operator extra link. And this is the trigger: this is the process that's actually being handed to the triggerer component to do the async call, and this is where the asynchronous waiting happens. It uses a function to wait for the OpenAI API to return that the fine tuning status is succeeded, failed, or cancelled. So this is what's currently happening in the triggerer component, running asynchronously. Once this has completed, the operator will actually complete, and then it will give me all of the information that I want to have about the fine-tune. If we scroll down in the operator code, you can see down here all of the information that it will log about which model I fine-tuned, the model id, how many training tokens, all those things, and the fine tuning duration. And then it will also push the model name to XCom, so that I can use it in a downstream DAG.
I know this was quick, just scrolling through the code.
As I said, feel free to use it. Feel free to look at it in
more detail. And it's just one of the ways that you can
standardize how you interact with generative AI.
Okay, it has finished, so we can look at the logs.
And here you can see the output of all of these print statements.
And you can see this is my fine tuned model.
And it took about five minutes in this case, and I have a result file.
So the next DAG, which then automatically runs and has already finished because it's also a very fast DAG, is the DAG that actually pulls this results file and then compares the champion and the challenger. In this case, you can see I have a champion and I have a challenger. I've run this before, so it ran the middle part and did all of the comparison, and when it compared the accuracies, the new model was better, so it decided to switch to the challenger as the new champion. And that's all that happens on the Airflow side. If you're running this in production, this is just running without you needing to do anything. You can run it on any schedule or on specific triggers, like with the datasets, and you don't have to worry about any individual tasks.
Let's see the results. This is the frontend app, the Streamlit app. Let's say I want a LinkedIn post about time and dataset schedules. All of my examples are very short, so I'm expecting a very short post as well, also to be able to show this better. So it's thinking for a little bit. This makes a call to my fine-tuned model, so this call did not go to regular ChatGPT or to the regular GPT model, it went to this fine-tuned iteration, the last one. And yeah, you can see: "Did you know you can combine dataset-based scheduling with time-based scheduling in Airflow 2.9?" I did, but the current GPT-3.5 model does not, because this is very new information. And that's exactly how you can be better than your competition: by providing your own data to your GenAI applications. It even gives me the exact name of the class, which is cool. And it gives me a fun fact about space. Also for fun, we can generate a picture. This takes a second; it takes the post that was created and sends it to DALL-E.
Okay, while this is running, let's talk about
the steps that you need to take to run this repository
by yourself today. Because it's not many steps.
First you go to this GitHub repository.
If you want to be nice, you can star it, but what you need to do is fork it and then clone it locally. Then you need to make sure that you have the Astro CLI installed. This is an open source project that Astronomer provides, and it makes it very easy to run Airflow in Docker, so even if you've never run Airflow on your computer, you can do this today. There are install instructions in our documentation, but if you are on a Mac, you can just run brew install astro, and then you get the Astro CLI. You clone the repository, you go to the root of it, and then you need to do a few things. So one thing that I
have here that you will not have is the .env file; you just have the .env.example file. What you need to do is copy the whole content of that file, create your own .env file, and paste the content into it, because this is the file that Airflow will use for all of the environment variables that configure the DAGs. What you need to do here, because we're using OpenAI, is put in your OpenAI API key, once here in the environment variable and once here in the Weaviate connection. With this you'll be able to use OpenAI both for your call to create the inference, so to create the post, that's the first one, and also in your Weaviate connection in order to create the embeddings; these are also created by using OpenAI. That's the only tool you need. You do not need a Weaviate account, because this project runs Weaviate locally.
Here you can see the Docker Compose override file, and this creates a Weaviate container as well as the Streamlit container, so you do not need to have Streamlit or Weaviate installed or anything; this will all happen automatically. Once you have copied the contents over and created the .env file, you just run astro dev start. The first time you run this it takes a couple of minutes, and then at localhost 8080 you will have your Airflow environment, and at localhost 8501 you will have the finished app.
Also, that's our dataset and time schedule image; that's actually very pretty, especially the second one. Then you just run this first DAG, or run the API script that I showed, and all of the DAGs will be running. You also need to unpause them, of course, but as soon as all of the DAGs have run, you can start creating your own posts about Airflow. If you want to adapt
this to your own data, to your own use case, for example
to write blog posts,
let's say blog posts about your favorite dog,
then you need to make a few more changes in
the include folder here, the knowledge base.
This is where all of the knowledge is. So what you want to do is
you switch out these markdown files and these text files with
your own information. Let's say you really like Golden Retrievers and you run a blog about Golden Retrievers; you could put all of your extra Golden Retriever information into these files here,
and that way the app will have access to this extra
information. You will also want to have different fine
tuning examples. So in the examples folder you will need
to have different training examples here. These folders
will not exist the first time you run this,
and then you add your training examples here and
you can also add validation examples.
So with the examples you change the tone
and the format of the response, and with the knowledge base
you change the information that your application has access to.
The last thing that you need to change if you adapt this to your own use case: if you go into the Streamlit folder, include/streamlit, this is the app code, the code that creates the Streamlit app. Down here, in the get response function, you can see the actual prompt. If I make this bigger, you can see here is what I tell the large language model every time, in addition to the user-provided query. I'm saying: you are the social post generator Astra. You might want to change this to: you are the blog post generator, and you don't want an interesting fact about Airflow in the post, but an interesting fact about Golden Retrievers. Then it retrieves all of the articles and it gets the query. And lastly, this was my addition to add a space fact; you may want to change that to something else. But that's all that's needed. One important side note: you need to have at least something in your Weaviate database in order to be able to run the pipeline and this app. But you do not need to run the fine tuning pipeline if you don't want to; if there's no fine-tuned model, it will just use GPT-3.5 Turbo by default. All right,
so that was everything that I had to show you today.
Again, in case you missed it: if you want to try out Astro, if you want to take DAGs like these and run them in the cloud automatically and very easily, then you can use this QR code or the link on this slide, and you will also get a free complimentary Airflow Fundamentals certification exam with that. The take-home message of this whole talk, other than use this code and play around with it, is that it's really both your data and your orchestration with Airflow that set you apart from your competitors when creating any GenAI applications. Because anyone can wrap around the OpenAI API, but only you have your data, and now you are one of the people who know how to best orchestrate the data pipelines and the ML pipelines with Apache Airflow. With that,
Thanks so much for watching. I hope this is helpful for
you. I hope you fork the repository and have fun with it,
create amazing things with it, and that you have a great rest of the
conference.