Transcript
Hi and welcome to everyone who decided to watch this talk.
I am very excited to talk to you about orchestrating data and ML
workflows with Apache Airflow because there are quite a few
topics that I want to talk about. I thought I'd give you a quick overview
over what you can expect in this talk. First, I'm going to talk about
the term ML orchestration. Next, just in case you've never used
Airflow before, I'm going to give you a little crash course with everything you need
to get started. Then I will talk about the data that I was analyzing
that I built my pipeline around, and the main part of the talk will be a walkthrough of this data-driven ML pipeline in Airflow, both with screenshots of the Airflow UI and with little code snippets highlighting certain Airflow features that I think will be useful for you if you build your own ML pipelines. The features I'm going to highlight will be datasets, asynchronous tasks, and deferrable operators. I will talk about dynamic tasks, the Astro SDK, my own custom Hugging Face operators, and lastly, about how I enabled Slack alerting for certain events in my pipeline. After all that, I will hop over into the actual Airflow environment.
So you have a short demo of what this looks like when it's running,
and I will talk to you about the results that I got for the data
that I was analyzing. Lastly, we will talk or
think about what could be next for this pipeline and for ML
and Airflow in particular. And I will close with a few
ML and Airflow resources in case you want to get started
on your own. All right, what is
ML orchestration? It's a fairly new term. It is part of, or a subset of, MLOps. Now,
if you work in any capacity in the data sphere, you know
that there are many different job titles and responsibilities. There are machine
learning engineers, data engineers, DevOps engineers, data scientists,
and a lot of times they do overlapping things. And this is the same
situation here with the different terms. You do have some overlap between
machine learning and DevOps. You have overlap between machine learning, DevOps,
and data engineering, which is commonly what we call MLOps. And then
over all of that, you have orchestration. And an orchestrator
is a tool that sits on top of your stack that
you're using for a certain action, your ML stack or your data engineering
stack, and then goes to each tool and
tells it what to do at what time. So an
orchestrator is kind of like the conductor in an orchestra
that makes sure that all of the instruments are playing in tune on the
right schedules, and that everything happens at the right time.
And Airflow is one of those orchestration tools. And Airflow is very multipurpose, in that you can see that little orchestrator sphere overlaps all of the other spheres, and there is an overlap between MLOps and the orchestration, or Airflow. And this is what we call ML orchestration for this talk. This will be very familiar to you if you are working in the ML sphere: you have your typical data science machine learning process.
Usually you get some data, and the first rule of data is it's
never in the format that you need and it always needs cleaning.
So the first steps are usually data extraction and validation, which are classically part of the data engineering sphere.
Then usually the data scientist comes in, does some manual exploration,
engineers the features, builds the model, and lastly, the machine learning engineer,
most of the time puts all of this in production.
And along the way here you have
a lot of steps that could be automated. Not all of them,
of course. The exploratory analysis of the data scientist is really hard to automate. But a lot of things, once you know what the data is, what it looks like, and you know your model, can really be automated. This is where Airflow comes in, because, as you know from any workflow in your life, the automated steps are very quick. But as soon as a human comes in, that's the cause of your latency. If someone has to copy-paste something or if someone has to manually kick off the next script, that will really stall your process and make it much longer. And that's what we are trying to combat with Airflow. We try to automate everything that is somehow possible
to automate. All right,
that's ML orchestration. Now, if you've never used Airflow
before, I'm going to give you a very short crash course. What is
Apache Airflow? It is a tool to schedule and monitor
your data pipelines or any other of your Python code.
Airflow is very versatile, so you can schedule anything that
you can define in Python code, and that's why it's a multipurpose
orchestrator. It's the most popular open source choice
for this use case with 12 million downloads per month. And everything
in Airflow is written as Python code. You're going to see what that
code looks like. If you're using Python for machine learning, it will be very familiar
to you. And that's also one of the big strengths of Airflow: because it's defined in Python code, you can have CI/CD on your pipeline process, and you can infinitely scale and extend what you're doing with Airflow. And of course,
if after this talk you want to get started, there is a large and
vibrant open source software community. We have our own Slack. There are a lot of answers on Stack Overflow. There's the whole open source GitHub. Please get in touch if you have any issues or
want to learn more.
Now, apart from being defined in Python code,
Airflow comes with a very functional and very
pretty UI. What you see here is the overview of the Airflow UI, where you can see all of the DAGs. This is what we call one pipeline in Airflow; DAG stands for directed acyclic graph. And here you can see we have nine DAGs. I have some of them paused, some of them are activated,
and I will not go through everything that you see here, but there's a lot
of observability. You know exactly how your workflows are scheduled, when they ran the last time, when the next run will be, and what happened to the tasks. You can
see green and red here on the screen. Green means something was successful,
red means something failed. And there are other colors. So there's a
lot of things you can see at one glance.
And of course you can drill down here and look at individual DAGs, individual tasks, down to the very logs of the Python script that you are executing.
Okay, I already said what a DAG is. A DAG is composed of the different tasks. So on the right hand side here we have a very small DAG, just with two tasks. And you can see it's also a graph. It has two nodes. The nodes are the tasks and the edge is the dependency between them. It's a little arrow here. This just means the extract task needs to happen before the write to MinIO task, and by default the upstream task needs to be successful. But there's a lot of customizability here. On the left hand side you can see all of the code that went into creating this tiny DAG. Of course, in production and in real life use cases, your DAGs are often much longer. But here you can see really everything that is needed to create one of these DAGs.
On line eight, you can see where we define the DAG. So to tell Airflow, hey, I want to make a new DAG, you just add a decorator, the @dag decorator, on top of a Python function, and that will tell Airflow that everything that's in this function is a DAG. I only have to give it three parameters. Here I say this should happen every day, and I want it to start on the 1st of January this year. Then within that Python function, on line 13, we have two tasks that are defined, and I am on purpose showing the two different ways that you can define Airflow tasks. So on line 14 you can see I have a @task decorator and I put this on a Python function. And you might be thinking that just looks like any regular Python function. That's very true. You could take your current ML script that you have, put the @task decorator on top of it, and then it's a task in Airflow. And if that's all you need, if you have several scripts and you want to run them in a certain order or with certain dependencies, then you now know everything you need to make that happen in Airflow.
Of course, sometimes there's a lot more customizability here.
And the other great strength of Airflow is predefined operators. An operator is simply a class that we use to create a task.
So on line 19 you can see I have an operator that is called the local filesystem to MinIO operator. This is actually an operator that I import locally on line three. You can see I import it from my own include folder, because I made this for a use case and it didn't exist. And you can see I have a class and I only give it a few parameters. I give it a task id, which is what Airflow will call the task in the UI and internally in the database. And then I give it some information, a bucket and an object name. And everything else happens under the hood. So the whole call to the MinIO API happens in this class that has been defined elsewhere. And this is a strength, because you have these predefined operators for many, many data tools. And if someone has created an operator for what you need, then you can simply use that, and you don't have to reinvent the wheel every time.
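To make those two styles concrete, here is a minimal sketch of a DAG in that spirit. It is not the code from the slide: the DAG and task names are made up, and I use the built-in BashOperator as a stand-in for a provider operator like the MinIO one.

```python
# A minimal sketch: one @task-decorated Python function plus one predefined
# operator, chained into a daily DAG. Names here are placeholders.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator  # stand-in for a provider operator


@dag(schedule="@daily", start_date=datetime(2023, 1, 1), catchup=False)
def my_first_dag():
    @task
    def extract():
        # Any regular Python code can live here.
        return {"a": 1, "b": 2}

    # A predefined operator is just a class you instantiate with parameters;
    # the actual API calls happen inside the class.
    write = BashOperator(task_id="write", bash_command="echo 'writing data'")

    # The extract task has to finish successfully before the write task runs.
    extract() >> write


my_first_dag()
```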
All right, that was a very, very simple DAG. But of course DAGs can be as complex as you want them to be. Here you can see a DAG that is a little more complex. Of course these can get very big, but you can see here we have more complex dependencies. We can group tasks. So there's a lot of flexibility here, as long as it stays directed and acyclic.
Why should you use Airflow? You can orchestrate data pipelines and ML pipelines in the same place, because Airflow is tool agnostic. That means you can talk to any tool that has an API. Anything that you can do in Python you can do in Airflow, so you don't have to use different tools anymore, and you can start creating complex dependencies.
A lot of times you have an ML pipeline that you want to start once
certain data is available, but maybe you want data quality
checking to happen on that data, and the ML part of the pipeline should only start if the data quality check has a certain result, and all sorts of complex dependencies like that. There's also a fully functional API for Airflow, so you can even kick off an ML pipeline by a button press on a website.
I already said it's tool agnostic, and this is very important.
And I realize every day more and more how important that
is in the current data field. I feel like I wake up every day
and there's a new ML tool and a new model and a new website
springing up that I might want to use. And if you're using Airflow for your orchestration, that will never be a problem, because as long as it has some API, some way to call it, you can connect it to your Airflow instance. And you don't have to redefine your whole pipeline just because a new tool doesn't natively integrate with another tool that you're already using. So this plug and play style
of data orchestration is really important in the current data sphere now.
And the third point: it is event driven. You can schedule DAGs or tasks to happen based on events that are happening in your other data tools. I will show you what I mean by that in the pipeline walkthrough. I said it's all just Python code. You can customize it, you can plug and play operators, create your own, you can extend upon Airflow, it's open source. If you do something useful, please share it
back to the community. And, really important for ML, there are already pre-built operators that help you run dedicated tasks in Kubernetes pods. So if you have a Kubernetes cluster: like you saw, I put the @task decorator on top of a Python function. I could also put @task.kubernetes. I have to give it a few more parameters in that case to define which Kubernetes cluster I'm sending the task to. But that way you can spin up your task in a dedicated pod, for example if you need a heavy GPU just for one task in your pipeline.
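As a rough illustration, a sketch of what that could look like, assuming the cncf.kubernetes provider is installed; the image, namespace, and function name are placeholders, not code from the talk.

```python
# Sketch of running one heavy task in its own Kubernetes pod; assumes the
# cncf.kubernetes provider is installed. Image and namespace are placeholders.
from airflow.decorators import task


@task.kubernetes(
    image="python:3.10-slim",  # any image with your dependencies baked in
    namespace="airflow",       # namespace the pod should run in
    in_cluster=True,           # use the cluster Airflow itself runs in
)
def train_on_gpu():
    # Heavy training code goes here; it executes inside the dedicated pod.
    print("training in a dedicated pod")
```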
Okay, that was the crash course for Apache
Airflow. Now I'm going to jump back a little
in my life and talk a little bit about medicine.
So for the data I was using, I was thinking, what is something I'm familiar with? I've worked in neurosurgery before, so I decided to go with brain tumors, and there was a small data set for that on Kaggle. I looked at gliomas versus meningiomas.
You don't have to know anything about that, really. A glioma is a
brain tumor coming from glial cells, so from the structural
cells in your brain. And a meningioma is a brain tumor coming from
arachnoidal cap cells. The arachnoid is a membrane that
is surrounding your brain. So those are two different tumors that
are coming from different cells. And of course they have all subclasses,
but I was just looking at those two large buckets of kinds of tumor. I had a little over 300 T2-weighted images of both. T2-weighted is just one type of MRI image; you can tell it's probably T2-weighted if fluids are bright in the image. I did a train-test split of about a quarter being test, which left me with a test set with around 80 pictures of each and a train set with 260 gliomas and 247 meningiomas. Now, just so
you can imagine what my data looked like, I only had slices,
so only pictures, not a full MRI. And here you can see, sometimes the differentiation between the two types of brain tumors is fairly easy. This was one of the reasons why I picked these two types, because I was thinking this is something that you might
be able to tell from one picture. Of course,
never conclusively. You would never make a conclusive diagnosis
from one picture or just from an MRI. In general, you would always wait for
the biopsy. But it is something that I could imagine an ML model
could be learning. Here on the left
hand side, you can see the meningioma. If you remember,
I said, it comes from cells that are part of the membrane surrounding
your brain, so it has to touch that membrane at one point.
And this is a very typical picture here. And on the right hand side,
you can see a glioma, you can see this one started in
the middle of the brain, you can see all the edema around it.
It looks a little angry, as we would have said in radiology, because this looks quite malignant, which unfortunately gliomas often are. Meningiomas are more often benign. But of course, both entities have benign and malignant types.
Now, this was an example of one picture where I, at first glance, said, yeah, I'm pretty sure what I would classify it as, but it's not always that simple. There were also examples where the comparison
was much harder. Here again, on the left hand side that
later turned out to be a meningioma, and on the right hand
side that turned out to be a glioma.
Okay, this was the data I had, and with that I
got started with the pipeline. Now I will walk through the pipeline, but of
course, if you want to see and play with the whole code, it's on GitHub.
I will post that link in the discord as well, and you can clone that
and use it as a blueprint for your own pipelines if you want to.
First, I want to cover the tools that I used. So obviously I used Airflow, and I used the Astro Python SDK, which is an open source package on top of Airflow. I stored all my files and images in S3, but you could use any blob storage that you want to, of course, or even store them just locally. I used a DuckDB instance to store references to my files and store the results of my different models, and then in the end pick the model that performed best for a specific test set. I decided to go with Hugging Face for the ML-heavy parts, and specifically I decided to pick the Microsoft ResNet-50 model and fine-tune that, because I was thinking that's a very general model.
I'm curious how well it performs before it's fine-tuned and after, on a very specific data set. Of course, since I was working with Airflow, I was doing everything in Python, and in the end I wanted to have some way to be alerted of what's happening, and I picked Slack for that.
Okay, again, you can see a DAG view, but this time of the actual Airflow environment that I was using for this pipeline. And what you can see here: in total I have eight DAGs, so eight separate workflows, and all of them have a varying number of tasks. And what you can see here is what it looks like while it's running. And I have this overview; I know exactly when each DAG ran the last time and when it's scheduled to run next.
Talking about the schedule, you might have heard about Airflow being an advancement of cron, and it's still very much possible to schedule your DAGs on a cron string. But since then, Airflow has come a long way, and I've actually scheduled most of the DAGs in this repository on datasets. So what I did is: here you can see what we call the datasets view. In green, or slightly bluish depending on your monitor, you can see the DAGs, and you can see in orange the datasets.
And what a dataset is, in a nutshell: imagine you have an Airflow task and you know that this task, for example, updates a table in a database. Then you can tell that task, hey, I know you are updating that table; please, whenever you complete successfully, raise a little flag that tells my whole Airflow environment that this table has just been updated. And then on the other side, I can tell other DAGs, hey, whenever you see that flag, or whenever you see that combination of flags, I want you to run. I want that whole other part of the pipeline to start. This is exactly what I did. So you can see here I have two DAGs that are not scheduled on datasets: the in new test data and in new brain data DAGs. And each of those DAGs has one task that updates what I call a dataset. So each of those DAGs will eventually put new data in a folder, and that folder is my dataset. So I decided, hey, when I have new test data, I want that task to say, I have new test data, raise that little flag. And the preprocessed test data DAG, which is downstream, is being told to run every time there is new data in that S3 bucket. And that's how I started to chain my DAGs together.
I will not go through the whole list here, but for example, on the right side, on the lower part of the screen, you can see that for training my model I did the exact same thing. I told the task that trains the model: hey, whenever you're successful and you're done, raise a little flag that says a new model has been trained. And then the testing part, the part that tests my fine-tuned model, runs every time that flag has been raised.
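As a minimal sketch of that flag mechanism (not the exact DAGs from the repo; the dataset URI and the names are placeholders):

```python
# Sketch of dataset-driven scheduling (Airflow 2.4+). One DAG "raises the flag"
# via outlets, the other is scheduled on that dataset. Names are placeholders.
from datetime import datetime

from airflow import Dataset
from airflow.decorators import dag, task

new_train_data = Dataset("s3://my-bucket/train_data/")


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def ingest_new_data():
    # When this task succeeds, it marks the dataset it updates as refreshed.
    @task(outlets=[new_train_data])
    def copy_file_to_train_folder():
        ...

    copy_file_to_train_folder()


# This DAG runs whenever the dataset above has been updated.
@dag(schedule=[new_train_data], start_date=datetime(2023, 1, 1), catchup=False)
def preprocess_train_data():
    @task
    def preprocess():
        ...

    preprocess()


ingest_new_data()
preprocess_train_data()
```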
I said two DAGs are not scheduled on a dataset. I actually decided for two DAGs that I want them to always run. And I'm very excited about this, you might be able to tell, because this is a super new feature. This came out, I think, two weeks ago with Airflow 2.6. It's called the continuous schedule. And what this means is, if you set the schedule to @continuous, the DAG will always run. So you have a DAG that's always there, always running, and whenever it completes successfully, it will automatically start its next run. So now I can have my DAGs that are always listening for something to happen and always running. These are the two DAGs that are always running. They look very similar; they are mirrors of each other. One for the training data, one for the testing data.
Because I was thinking, in my case I would assume that if a new MRI file comes in, it would be filed to go either into the training or the testing data, depending on certain other properties. So I'm not doing a random split here; I assume that someone with domain knowledge would make the decision where this file goes. And what's happening here is, on the left hand side, you can see the wait for new training data and wait for new testing data tasks. These are waiting for the new data to come in. Now, I said I'm using S3, so what is happening under the hood is probably something all of you have done before: a call to the botocore API. But I did not have to think about botocore a single time while
creating this pipeline, because someone has done that for me.
Very grateful. At this point, there is a sensor, an operator called the S3 key sensor async, which is a deferrable operator. This means that this task, once it starts, is waiting for something. In this case I just tell it exactly what key to wait for, in what bucket and in what AWS account. You can see aws_conn_id; that's where I store my credentials. So the credentials are hidden here and make the connection to my AWS environment. And as soon as this task starts waiting, it is actually not being run by an Airflow worker anymore. It is being put into a completely different process called the triggerer. And this process runs in an async fashion and frees up your worker slot, which, as you can imagine, if you have a lot of these tasks running, can save you a ton of resources and make your pipelines much more efficient. So that was the first thing I wanted to show you: if you have sensors running, always check if there's an asynchronous version.
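Putting those two ideas together, here is a hedged sketch of such an always-listening DAG, assuming the astronomer-providers package (where S3KeySensorAsync lives) is installed; the bucket, key, and connection names are placeholders, not the ones from the repo.

```python
# Sketch of an always-on "listening" DAG: @continuous scheduling (Airflow 2.6+)
# plus a deferrable S3 sensor. Bucket, key, and connection id are placeholders.
from datetime import datetime

from airflow.decorators import dag
from astronomer.providers.amazon.aws.sensors.s3 import S3KeySensorAsync


@dag(
    schedule="@continuous",  # start the next run as soon as the current one finishes
    max_active_runs=1,       # required when using the continuous schedule
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def in_new_train_data():
    # While waiting, this task is handed to the triggerer and frees its worker slot.
    S3KeySensorAsync(
        task_id="wait_for_new_training_data",
        bucket_name="my-mri-bucket",
        bucket_key="new_data/*.jpg",
        wildcard_match=True,
        aws_conn_id="aws_default",
    )


in_new_train_data()
```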
Now let's imagine I have a new file. It comes in. What I
want to happen next is I want to have the list of all the new
files. And then I copy the files over. And you see here I have square
brackets with a number in it. And this is because I
decided I want to know for each of the individual files if my
copy statement was successful, because I want to backtrack in case maybe
someone had put in a file with
a different file ending, or maybe something else went wrong with a corrupted file.
I want to be able to backtrack that for every single file.
And I did that by using dynamic task mapping. I did not write out
400 tasks, especially because I never know how much new data will come in. This time I had 400 pictures; maybe tomorrow it's just one. So I need a varying number of tasks every time this DAG runs, and I definitely don't want to change the code every time.
This is where dynamic tasks come in, and dynamic tasks are a way to make your Airflow pipelines more dynamic. I will not have time to talk in depth about how to use dynamic task mapping; there are a lot of resources online, there's a full length webinar as well. But what you can see here is we have an operator. So on line 73 we have the S3 copy object operator. Again, someone already did all the work for me. I just had to plug in that operator from the provider package, and I say, well, for each of the instances of this operator I always use the same connection, I always connect to the same AWS account. But for each instance I have a different source bucket and a different destination bucket key; that's on line 70, and I pass in a list of dictionaries, and for each of those dictionaries a copy task will be made.
And this happens at runtime. So the decision of how many tasks there will be is always made once this task actually has to run, and that's how it can have one task instance maybe one day, if there was only one new file, or it can have 100 or even 1000 the next day, without me having to change any code. If you're interested in that,
I encourage you to look more into dynamic tasks.
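For illustration, a sketch of that mapping pattern; the bucket names, connection id, and the hardcoded file list stand in for the real upstream listing task.

```python
# Sketch of dynamic task mapping (expand_kwargs is available from Airflow 2.4):
# one copy-task instance per new file, decided at runtime. Names are placeholders.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def copy_new_files():
    @task
    def get_copy_kwargs() -> list[dict]:
        # In the real pipeline this list would come from the "list new files" task;
        # one dict per file means one mapped task instance per file.
        new_keys = ["img_001.jpg", "img_002.jpg"]
        return [
            {
                "source_bucket_key": f"s3://landing-bucket/{key}",
                "dest_bucket_key": f"s3://train-bucket/{key}",
            }
            for key in new_keys
        ]

    S3CopyObjectOperator.partial(
        task_id="copy_file",
        aws_conn_id="aws_default",  # shared by every mapped instance
    ).expand_kwargs(get_copy_kwargs())


copy_new_files()
```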
This is what it looks like in the UI. You can see here all my mapped tasks; all of them were a success in copying my file. All right, we have our data in the right folder in the S3 bucket.
Now we do preprocessing. First, a little side note: the task called create DuckDB pool actually doesn't create a pool in the database sense that you might be familiar with. This creates an Airflow pool, which is an Airflow-specific feature that is a collection of worker slots. And I can say I have a certain number of tasks, and those tasks can be anywhere in my whole Airflow instance, they can be in separate DAGs, and I can tell all of those tasks: you only get to use these worker slots. This is used to better manage resources, for example to group tasks so that they never use too many worker slots at the same time. But it's also super handy if you have tasks that you never want to have running at the same time, because in this case I'm writing to DuckDB a few times, and I cannot do that concurrently. I cannot have two processes writing to the same DuckDB instance at the same time. So what I'm doing is I'm creating a DuckDB pool with one slot, and throughout the whole Airflow instance, for all the tasks that are writing some information to DuckDB, I tell them you can only use that one pool, only that one slot, and my parallelism issues are solved.
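A minimal sketch of that pattern, assuming the pool is created once via the CLI or the UI; the pool and task names are placeholders.

```python
# Sketch: serialize all DuckDB writes through a one-slot Airflow pool.
# The pool can be created once via the CLI (or the UI), for example:
#   airflow pools set duckdb 1 "single slot so DuckDB writes never overlap"
from airflow.decorators import task


@task(pool="duckdb")  # every task using this pool shares its single worker slot
def write_results_to_duckdb():
    # Only one task across the whole Airflow instance can hold this slot,
    # so two processes never write to the same DuckDB file at the same time.
    ...
```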
Next, I list all the files in my training bucket, and then I create a table in DuckDB and I load both the references to the files, so the key to the file name, and the labels, which in this case I'm fetching from the file name. But of course in a real use case you probably have a different way that you're getting your labels. Maybe they're stored in a different database, maybe it's something that someone does manually. But there's another task that fetches the labels, and I store both the labels and the references to the files together in the same table in DuckDB.
Here I want to quickly zoom into the list train files task, because again, that's something that you would need to call the AWS API for. But here there's actually an even handier thing, and this is part of the Astro SDK. This is the first part of the Astro SDK that I want to show you in this talk. In general, the Astro SDK is an open source tool for next generation DAG authoring, and it tries to create tasks or utility functions that are tool agnostic in themselves. So if you remember, earlier I was using an S3 copy object operator, so an operator that is specific to S3; if I were to switch to a different kind of blob storage, I would have to switch out the operator. But in this case here I'm using the get_file_list function from the Astro Python SDK, and I just point it towards the connection to my blob storage. But if tomorrow someone decides that we are moving from AWS to Azure, the only thing that I would need to do is give it a new path and give it a new connection id to Azure. I wouldn't have to change anything else. And that is very plug and play, because as you see, I use a variable for my connection id. So with one change of a variable I could change any task that is using the Astro Python SDK from one blob storage to another, and also from one relational storage to another, if those storages are supported by the SDK. If that is something that sounds interesting to you, and now I'm mainly speaking to all the data engineers listening, because this solves so much pain and it has a ton more features and functions, like merging tables together, for example, then I encourage you to check out the Astro Python SDK docs.
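A small sketch of what that call can look like with the Astro Python SDK; the path and connection id are placeholders.

```python
# Sketch of the storage-agnostic file listing from the Astro Python SDK;
# the path and connection id are placeholders.
from datetime import datetime

from airflow.decorators import dag
from astro.files import get_file_list


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def list_train_files_example():
    # Returns one entry per file under the prefix; swapping to another supported
    # blob storage only means changing the path and the connection id.
    get_file_list(path="s3://my-mri-bucket/train_data/", conn_id="aws_default")


list_train_files_example()
```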
All right, we finally arrived at training the model. So far, everything was kind of data engineering and not really MLOps yet, but now we are training a model. I'm getting my image keys from my DuckDB storage. So this is also something that could be modified if you had any other process that decides which images to use in a given training session. But here, I'm just getting all of them. I'm loading all the training images into a temporary folder, I get my labels, I train my classifier using a lot of Hugging Face technology, and then I delete the files again from my temporary folder.
And this is where the custom operator came in. So what I first did was just put the @task decorator on top of a script and had it running, and it worked fine. But I was thinking there's actually more I can do here, because maybe I want to reuse this; maybe I have another project where I want to have a binary image classification and use Hugging Face, and I want this to be more modular. So what I did here is I wrapped the whole model fine-tuning in a custom operator. This operator doesn't exist publicly yet, but you can just copy-paste the code from my repo, of course. But what I did here is I wrapped all of the code that you're familiar with and created this class that I just have to instantiate in my DAG code. So I have the model name here, I give it the file paths, I give it the labels, the learning rate, and all of the other hyperparameters, I tell it where to save the fine-tuned model, give it the transformation functions, and this is good to go.
And here, one thing that you can actually see is the very last line, which sets outlets to an Airflow dataset. This is what creates that little flag of the dataset that I was talking about. Whenever this task completes successfully, the flag will go up to the Airflow environment: hey, we have a newly trained model, and everything that I decide needs to happen in that case can start happening.
Okay, I have another DAG, because I thought, I'm really curious how the model performs if it's not fine-tuned, and I want to have a robust baseline. Very similar structure: loading in the images from S3, then running a test operator that I call test classifier, and in the end, I write the results to my DuckDB table. So I created another table in my DuckDB database, and in this table I want to store all the models with their results, so I can very easily figure out which was the best model for a given test run.
Here I want to zoom into the testing operator. I did the same thing again: I took all my test code and wrapped it into a custom operator, so I simply have to provide some parameters. There are more parameters that I just set a default for here. But now my DAG file stays clean and I have all my testing code in a separate file. Then I'm doing the same thing for the fine-tuned model. Of course, that's the core thing that we're all always waiting for and watching the logs for. I get the latest fine-tuned model, so I'm storing all my fine-tuned models in a local directory, but I want to get the latest one for each test run. Again, loading the test images, testing my model, and then writing the results to DuckDB and deleting the images from the temporary folder.
Here you can see the code. That's why it's so great to have things in a modular way: I used the same operator, same import, and this time I just gave it a few different parameters. Namely, I gave it the result of the get latest fine-tuned model task that was upstream. So I'm using a @task-decorated function that returns the name of the model that I want to use here, and I can just plug that into the model name, and this will also automatically set the dependency correctly. Now, what I wanted to highlight here: I talked about Slack notifications, and I was thinking, well, the thing I'm interested in is, every time a model has been fine-tuned and that model has been tested, I want to know how it's doing, because maybe if it's not doing well, I have to go back and change some hyperparameters or change something in my code. So I decided that in this test classifier task I want to have a callback. It's an on_success_callback, so this runs whenever this task is successful. I could also have on_failure_callbacks, of course. It's actually very common; we also have that in our internal data pipelines, on_failure_callbacks that send you an email if something goes wrong with something you are responsible for. But in this case I wanted to be notified if the task was successful: I want to get a Slack message.
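One way to wire that up, sketched here under the assumption of a recent Slack provider and an existing webhook connection; the connection id, task id, and the operator name in the comment are placeholders, not the exact code from the repo.

```python
# Sketch of a success callback that posts test results to Slack; assumes a recent
# apache-airflow-providers-slack and a webhook connection. Names are placeholders.
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def notify_slack_on_success(context):
    ti = context["ti"]
    # Pull whatever the test task pushed to XCom (metrics, model name, ...).
    results = ti.xcom_pull(task_ids="test_classifier")
    SlackWebhookHook(slack_webhook_conn_id="slack_webhook_default").send(
        text=f"Fine-tuned model tested in {context['dag'].dag_id}: {results}"
    )


# Attached to the task, e.g. (hypothetical operator name):
# test_classifier = TestHuggingFaceBinaryImageClassifierOperator(
#     task_id="test_classifier",
#     on_success_callback=notify_slack_on_success,
#     ...,
# )
```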
This is what that Slack message looked like. This is fully customizable, so this is just what I decided I want to have in this message. And I'm pulling from upstream tasks; I'm pulling results here. I'm pulling the result of my fine-tuned model test, and I started reading: average test loss zero. Okay, that's too good, and yeah, I need to go back to the drawing board. That was one of the notifications while I was building this pipeline that sent me back: okay, hyperparameters completely out of whack, the accuracy is much worse than baseline, the F1 score and AUC are also much worse, and an average test loss of zero. I think this might be overfitting. So let's go back to the drawing board.
And lastly, I have the last DAG in my whole pipeline. I didn't put in the whole graph view because it's a very simple one, but it's a DAG that picks the best model and then deploys it. It doesn't actually deploy it yet, because I know that code will look different for everyone, but I put in a task already that is called deploy model, where you could put in all your code that actually deploys to your solution. And that's the second place where I wanted to talk about the Astro Python SDK, because here you can see I'm using @aql tasks.
And what this signifies is that I'm using Astro convenience tasks, or Astro SDK operators. The first one I'm using is @aql.transform, which is a way that you can turn any SQL query into a task that runs on any table that you put in. Here I have the input table, which is my results table, and I simply point that at a table in my relational storage. So on line 52 you can see that I'm pointing it at a Table object, and I again give it a connection id. And if I ever decided I don't want to use DuckDB anymore, maybe I'm using a Snowflake database, I could simply point that at my Snowflake instance and I wouldn't have to change any other code. So again, this is a way to make Airflow much more modular and much more versatile and flexible if you ever change an underlying tool. And the second task takes in the returned table. So I'm running the SQL statement on my results table; this will return a temporary table in this case, because I didn't give it an output table. And this temporary table is simply passed into the @aql.dataframe task, which automatically turns that relational table into a pandas DataFrame, so I can just print out the model name of the best model that I selected.
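A condensed sketch of that pair of tasks; the table, column, and connection names are placeholders, not the ones from the repo.

```python
# Sketch of @aql.transform feeding @aql.dataframe with the Astro Python SDK;
# table, column, and connection names are placeholders.
import pandas as pd
from astro import sql as aql
from astro.sql.table import Table


@aql.transform
def select_best_model(in_table: Table):
    # The SQL runs against whatever database the Table's conn_id points to.
    return "SELECT model_name, auc FROM {{ in_table }} ORDER BY auc DESC LIMIT 1"


@aql.dataframe
def pick_model(best_model: pd.DataFrame):
    # The (temporary) result table arrives here already converted to pandas.
    print(best_model["model_name"].iloc[0])


# Inside a DAG:
# best = select_best_model(in_table=Table(name="model_results", conn_id="duckdb_default"))
# pick_model(best_model=best)
```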
All right, that was the whole pipeline. I know that was a lot, but I wanted to also show you what that looks like. So let's hop over to my Airflow instance. And you can see here it's actually running. You can see there was one successful run of the whole pipeline, because we have one past successful run. And you also see I have all the DAGs turned on and two of them are currently running. So these are the two DAGs that are using the @continuous schedule, and they will always be running. And the first task, which was the S3 key sensor async operator, is in a deferred state here. So this task is using the triggerer component to wait for a file to drop into my S3 bucket. So let's give it what it's waiting for. Hopping over to S3.
And let's take a file here. Let's say I have a new meningioma that I want to train on. I want to update the train set, and then I want to retrain my model with this. Okay, uploaded. It takes a second, and you can see it's not deferred anymore; now this task is running. That was the sensor, and this will run all the downstream tasks. And the other thing that you saw is something happened here in this different DAG. This is because this DAG is scheduled to happen as soon as that DAG has moved that file from the folder I dropped the new picture in into a different folder that is called train data ng. And now this part of the pipeline is starting. You can see here that it actually has a lot of tasks scheduled, 450. So I'm not going to wait for all of this to happen; I just wanted you to see how the pipeline is kicked off. But once this DAG is completed, the model training would start over here, because again, I have scheduled it on a dataset that is updated by a task in this environment. All right, we're not going to watch all of this happening,
but we're going to jump into the code.
So here you can see my Visual Studio Code, and you can see all the code that went into making this Airflow environment. And this is also all the code that is going to be in the repository for you to download. On the left hand side, you can see what the Airflow environment looks like, an Astro project here, because I created this Airflow project with the Astro CLI. The Astro CLI is another open source tool that helps you use Airflow. There will be instructions on how to use the Astro CLI on your computer in the readme of the repository. But in a nutshell, once you have the Astro CLI installed and you run astro dev init in any empty folder, it will create most of these files for you.
And the main thing you can see here is the Dockerfile that I have open. It pulls an image called Astro Runtime, but this is almost completely equivalent to open source Airflow; it just has a few additional features. I'm also setting an environment variable here that is necessary for running the Astro SDK. But that is all that's needed to start Airflow. And once you have run astro dev init, you can run astro dev start next, and it will start up an Airflow environment with two example DAGs for you, ready to use. Next,
I wanted to show you where the DAGs live. So I have this folder here called dags, and this contains all of the DAGs that are in my environment. The one I want to show you is the training DAG. You can see here I have some documentation, I import everything, and you can see I tried to modularize it as much as possible. So I'm importing a lot of my variables from a local config file, I'm importing my transform function that is running on my images, and of course I'm importing my fine-tune Hugging Face binary image classifier operator. I'm bad at naming things. And here the DAG is defined. If we scroll down, we can see all of the tasks. Each of these pieces of code is a task that is generated in my DAG. You can see some use the Astro SDK here. This one is using the @task decorator, because I wanted to be able to configure this a little more. And here we are using the fine-tune Hugging Face binary image classifier operator with all of the parameters. Now, here I told you I'm importing this from a local file. So if we scroll back up, you can see I'm importing this from include, custom operator, hugging face.
So let's go to that file. And this is where all of the magic happens that you are probably more familiar with, a lot more familiar with than I am. We are using torch here and different utilities from Hugging Face. But if we scroll down here, we have all of the functions, we have the custom dataset that I'm creating, and we have the Hugging Face binary image classifier here. And what I wanted to show you here is how easy it is to create a custom operator. The only things you need to do are: you have to inherit from the BaseOperator that you import from Airflow, and then you have to define all of your parameters. You can give it a color, you don't have to, but I always do. And initialize it: you just have to initialize the parent class as well, and then you're good to go. You can have an execute method, and everything that is in this execute method will run as soon as this Airflow task is running. And here is all that you are probably very familiar with. So here I'm pulling the actual model and I'm doing all my training in this execute method. And of course the same thing happens with the test Hugging Face binary image classifier operator.
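A minimal skeleton of that pattern; the class and parameter names here are illustrative, not the exact ones from the repository.

```python
# Minimal custom operator skeleton following the pattern described above;
# class and parameter names are illustrative placeholders.
from airflow.models.baseoperator import BaseOperator


class FineTuneImageClassifierOperator(BaseOperator):
    ui_color = "#ffd700"  # optional: the color shown in the Airflow UI

    def __init__(self, model_name: str, file_paths: list, labels: list, **kwargs):
        super().__init__(**kwargs)  # initialize the parent BaseOperator
        self.model_name = model_name
        self.file_paths = file_paths
        self.labels = labels

    def execute(self, context):
        # Everything in execute() runs when the Airflow task runs;
        # this is where the actual fine-tuning code would go.
        self.log.info("Fine-tuning %s on %d images", self.model_name, len(self.file_paths))
        return self.model_name
```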
All right, this is still running,
but let's hop back into the presentation because
I wanted to talk about the results. Now, I haven't done
this before. This was my first time fine-tuning an image classification model, so I went in blind. But it was very interesting: before I fine-tuned it, just running the all-purpose ResNet-50 on my glioma versus meningioma classification. It has not seen many brain tumors before, I'm assuming. But yeah, the model did not perform very well. And if you remember, I had a pretty balanced test set, so the split is worse than guessing, or about as good as guessing. And after I fine-tuned the model, I got something that I was pretty happy with. I tried to aim for the highest AUC and I got 0.844, which is something, especially looking at the data set: the images were often cut at very different slices in the brain, there were other inconsistencies. I'm impressed. And that was only for a few epochs; I only trained it for one hour on my local computer. So yeah, this is actually working pretty well.
All right, what's next? Of course, this is just
a very simple ML and data orchestration pipeline. There is much more that you can do here. For example, you could use dynamic task mapping to fine-tune your models, to tune the hyperparameters. If you remember, I showed you that you can have one task, one operator, and then for different parameters you can simply provide it with sets, and it will create a mapped task instance for each of these sets. You could do that with your hyperparameters. So this is ideal for hyperparameter tuning with full observability. If you go back, you could also, if you're running this in production, use the Kubernetes executor or the KubernetesPodOperator to run the heavy tasks in a dedicated pod and specify the resources you need, like a GPU.
And of course, one thing that I was thinking while I was doing this: I think at scale I would try to use a relational database with the option of parallel writing, especially for writing the file references and the model results. DuckDB was really great, but writing the file references, I found myself wishing I could have parallel writing. And of course, maybe you're thinking, hey, I'm using Hugging Face, I am doing NLP, is there an operator for me? Not yet, but there could be, and it's pretty easy to write it. If that's something you're interested in, feel free to create your own and share it with the whole Airflow community. One of the best places to get started with that, and also get help if you run into any issues, is the Airflow Slack. I'm also going to put that link into the Discord chat later.
All right, I hope you got interested in learning more about Airflow and in seeing how Airflow and ML can work together. There are a lot of resources that are specific to ML use cases. For MLflow, there's a whole provider, so there are existing operators that interact with MLflow; I linked the GitHub repository here. There's a whole demo on how to use Airflow with Weights & Biases. There is a very great webinar on how to orchestrate machine learning workflows with Airflow, and a blog post about what you might want to know before you get started. Both of these resources, the webinar and the blog post, were created by Jeff Fletcher. And I say that because he helped me a lot while creating this pipeline, so big shout out to him. He's the director of field engineering, machine learning, at Astronomer. And yeah, big thanks, and I can really recommend his resources. And of course, if you're using SageMaker, we already have a tutorial on how to connect Airflow and SageMaker, and there are existing operators for running things in SageMaker.
And yeah, the last thing I wanted to say here on the slide for resources: if you're very new to Airflow, probably very experienced with ML, but want to get started with Airflow, we have a quickstart that you can simply clone and run. It is more data engineering focused, but highlights a lot of interesting Airflow features that were also part of this talk, like dynamic task mapping and datasets. And of course, if there is an ML tool that you're using and that you want to see an integration with, or there's content that you would like to see around ML and Airflow, please tell me, please tell us. At Astronomer, we are always looking for ways we can help the Airflow and greater data community.
That was it for content. Take home messages: I have to say one thing as someone who formerly worked clinically. Was this clinically useful? Absolutely not. In reality, you would always have a radiologist look at these pictures, and they would look at the whole MRI; you never just look at one slice of an MRI. So this isn't really clinically useful in any way. It was just something that was fun and interesting to do. But it shows that even with these very constricted resources, I only had a handful of pictures, I only ran my whole training locally, and I didn't do that much hyperparameter tuning, you can get interesting improvements on an existing model, and I think in part this will be the future. Maybe not so much for brain MRIs, because they are always looked at very closely anyway, but maybe for other kinds of images, of which a lot are taken, and maybe you want to prioritize which to look at first. Or one use case that I've seen actually already happening, or being trialed a few years ago in a radiology department, was that they had a model that flagged potentially cancerous small lesions in lung CTs. So the radiologists, after they read the whole image normally, would always go back and look at those flagged images and just look through these little lesions to make sure none of them might have been missed. So that's an example of how ML could augment the work of physicians in many ways.
And yeah, the next thing: can the pipeline be easily adjusted for other use cases? Yes, definitely. That was my goal here, that you can adjust it for different things. You might have to change a little bit, maybe the transform function for your images, but the whole idea of Airflow is to make it as modular and adjustable and flexible as possible. So the main take home message here is: Airflow is the one central place that can orchestrate both your data and your ML pipelines, those pipelines can have complex interdependencies, and it is fully tool agnostic. And with that, I thank you so much for listening to this talk. I hope you got interested in checking out Airflow, and if you have any questions, please ask me in the Discord channel. And with that, I hope you have an awesome day.