Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hi. Welcome everyone, and thank you so much for joining this
talk on how to test your Apache Airflow data pipelines using
common Python frameworks. So I'm very excited
to talk to you about this topic. First, I'll be giving a
quick introduction to Apache Airflow and Astronomer,
and I will talk a little bit about why you want to test your data
pipelines. After that, I will go over the technical
details on how to implement testing inside of local
development environments and also how you can add
your Airflow tests to your CI/CD pipeline.
I will show everything that I show in the slides in a demo as well,
and the demo repository code will be available to you
in an open source GitHub repository.
But first, what is Airflow? And who is Astronomer?
Airflow is the open standard for workflow management.
It's a very popular tool. It's being downloaded millions of
times per month, which is just a lot. We have a big Slack
community. If you ever have a question about Apache Airflow, this is the best
place to ask. Just join our Slack and you
will be able to talk to all sorts of Airflow users and also developers.
It's a very actively developed project. There are over
2800 contributors now, and there are a
lot of building blocks in the ecosystem now.
That's the high level, a workflow management system. If you are
working on data, you will recognize some or a
lot of these logos. Usually if you are working on data,
you have your data in different places and you perform actions
on the data using different tools, like one or more
of these most likely. And some of these
tools talk together, but not all of them. And you really are missing something
in the middle there, right? You're missing a centerpiece, and that is really
what Airflow is. Airflow is the orchestrator to your
data symphony. So Airflow sits in the middle and is
able to talk to all of these pieces to orchestrate
actions in all of these data tools with the right dependencies,
on the right cadence at the exact right time.
We very recently did a survey of Airflow users,
and among the about 800 people who responded,
81% actually said that Airflow is important
or even very important to their business. So if
there is something going on in their Airflow pipelines, it is
a big deal in their companies and in their businesses.
That's the high level. But what does it actually look like day to
day, if you are working with Airflow? In general, for this talk,
I will assume that you've used Airflow before and that you've written a couple of DAGs,
but in case you have not, or in case you need a refresher on the more
modern syntax, this is the Airflow 101 slide.
On the left hand side you can see a screenshot from the Airflow UI.
So this shows one pipeline within Airflow, which
we call a DAG; that stands for directed acyclic graph.
And on the right hand side you can see all of the code that went
into creating this DAG. So we have a DAG.
In this case it is called intro to airflow DAG.
And it is defined by using a decorator in this case.
So I use this @dag decorator on line seven, I put
it on a Python function, and if I put this file
into the right folder, which I'm going to show in the demo, then Airflow knows
this is a DAG and will automatically pick
it up. A DAG itself contains
tasks. So in the graph on the left hand side you
can see we have a graph with two nodes. Those are the
tasks, pick a number and add 23,
and the edge in between is the dependency. So the first
task, pick a number, needs to happen before the second
task, add 23, happens.
Now these tasks can be defined in two main ways.
You can use decorators or operator classes,
because it is all just Python code under the hood. So the first
task, pick a number, I've defined using the @task decorator.
I just used a regular Python function, put @task on
top of it, and then I can put any Python code in between, and
Airflow will automatically create a task out
of this. This is already how you can turn all of
your Python scripts into Airflow tasks if you want to
do that, without any additional knowledge.
Now this is one way that you can create a task. The other one
is by using an operator class. There are
thousands of these operator classes around
and a lot of them are bundled into provider packages.
So they abstract away a lot of the
logic that you need to interact with certain data tools.
For example, there's an Azure provider, there's a Google provider,
there's an Amazon provider, and they contain a lot of pre made
operators that make it easier for you to talk to those services.
In this case, I used an operator called MyBasicMathOperator,
and this is an operator that I actually wrote
myself for demonstration purposes.
So I can also show you the tests later that test this specific operator.
In this case, this operator just adds the number 23
to the number that is being returned from
the first task. So you can see on line 25 I can actually
use the output of an upstream task in my downstream
task, so you can also exchange information in between tasks.
Lastly, on line 30, I say, well, let's chain those
tasks. I need to pick a number before I add 23.
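To make that concrete, here is a minimal sketch of a DAG along the lines of the one on this slide. The task and operator names follow the talk, but the import path for the custom operator and its exact arguments are assumptions for illustration only.

```python
import random

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from pendulum import datetime

# Hypothetical import path for the custom operator described in the talk.
from include.my_basic_math_operator import MyBasicMathOperator


@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def intro_to_airflow_dag():
    @task
    def pick_a_number() -> int:
        # Any Python code can go inside a @task-decorated function.
        return random.randint(1, 100)

    picked = pick_a_number()

    add_23 = MyBasicMathOperator(
        task_id="add_23",
        first_number=picked,  # use the upstream task's output (assumes this field is templated)
        second_number=23,
        operation="+",
    )

    # Explicitly chain the tasks: pick_a_number runs before add_23.
    chain(picked, add_23)


intro_to_airflow_dag()
```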
That is already all you need to know to define
your first few Airflow DAGs. Two notes on best practices.
Usually, DAGs run periodically,
incrementally, and automatically. So you usually
have some sort of cadence that can be based on time. Like in this
case, I have a daily schedule, but it can also be based on something that's
data driven. So you can, for example, wait on an action
in a different data tool. Or you can also
say that one DAG needs to run once another DAG has
finished, and usually it works incrementally
on data if you're using it for more traditional ETL ELT pipelines.
And ideally it runs automatically with only little input from
you as soon as you're done developing them. Now,
for the tasks themselves, we have three best practice
words that you can keep in mind when you're writing a task. The first
one is they should be idempotent, which I already broke in this little example.
You might have noticed I'm using random, and this will not be
idempotent because I'm not seeding it in this case. But ideally you
want your task to always have the same output for the
same input. This is important when you're rerunning tasks after the
fact, which Airflow gives you a lot of options to do, maybe a month
later or even a year later, to rerun a specific
run of a DAG. The next one is to keep
your tasks atomic, so you want your tasks to do
as little as possible. You do not want to have a very long
script that does a lot of things, that even talks to different outside
tools all in the same task, because then you lose observability
and you lose a lot of the additional features that Airflow can
give you, or they are just not that useful for you if you pack everything
into the same task. And lastly, because it is all just Python
code, you can create this in a modular structure.
You can put functions in
supporting folders, import them, and use them in different DAGs.
You can even build your own custom framework on top of
Airflow for how you define and write DAGs. It is all just
Python, and it is all fully open source.
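As a small illustration of that modular pattern, the sketch below assumes a shared helper called clean_column_names living in include/helpers.py; both the path and the function are hypothetical names for this example.

```python
from airflow.decorators import dag, task
from pendulum import datetime

# Hypothetical helper assumed to live in include/helpers.py; any DAG in the
# project can import and reuse it.
from include.helpers import clean_column_names


@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def modular_example_dag():
    @task
    def normalize(columns: list[str]) -> list[str]:
        # The shared helper keeps naming conventions identical across DAGs.
        return clean_column_names(columns)

    normalize(["Customer ID", "Order Date"])


modular_example_dag()
```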
All right, I already said Airflow is open source.
What is Astronomer? Astronomer is
the best place to run Apache Airflow in production. I'm obviously biased,
but I truly believe that. It is a managed service that
gives you the infrastructure to run your Apache Airflow pipelines;
especially once you grow and you have hundreds of pipelines,
this helps you run Airflow reliably in production.
Of course, there are other ways you can run Airflow with
open source tools; for example, there is an official Helm chart,
and a lot of people have custom Helm charts. There are a lot of different
options that you have. Astronomer also provides
additional tooling on top of Airflow. If you want to
try this, there's a free trial: if you go to astronomer.io/try-astro,
you can take it for a spin. But everything
that I'm going to show you in this talk is possible with fully open
source tools. You do not need to be an Astronomer customer to
use any of the testing approaches that I'm going to show you in this talk.
All right. Why should you test your data pipelines?
Well, I already alluded to something. Airflow is really
important in a lot of businesses. So if something goes
wrong, it can look like this. Of course,
there are a ton of reasons why there could be something up with
your data pipelines. The worst way to find out is
usually by a Slack message from someone high up in the organization.
That sounds something like, where is my dashboard? Or why is my dashboard
showing something nonsensical? You cannot prevent all
of it, but you can prevent some of these events by testing
your data pipelines. And even if there
is something going on, if your pipelines are tested,
it will make it easier for you to debug because you know which part
of your pipelines you can trust.
This is kind of the takeaway message of this talk. Airflow is written in
Python, and Airflow pipelines are just Python code,
so it's Python all the way down. And this means that all software
engineering and DevOps best practices apply, including testing
and CI/CD. You would never ship application code
that is not unit tested, and you should also not do that
with Apache Airflow. You should also test all of your DAGs,
all of your pipelines.
Okay, told you why you should test it. Now let's
get into how you actually can do the testing. And first we will
talk about local development. So this is the overview.
This is one of the potential structures that you can have
when you're working with data pipelines. One important
thing that is really just a DevOps best practice is all of the
code is in source control. In version control. In this case,
I use GitHub. But of course, you can use any version control
system that you want.
And if you're using Airflow for machine
learning purposes, you will also have your ML artifacts inside
of this version control. And in a lot of cases you have configuration files
for infrastructure as code as well. But we will focus on
the DAG code for this presentation.
Now we have our version control. We have three branches, in this case,
development, staging and production. And then we have all of
the steps that our code takes. And on each of these steps
you can see I put a little gear icon for CI/CD, because
we are doing testing all the way along. And first
of all, we are going to talk about the local part. So we
have our code, we have our development branch. We are working locally on
a machine, but we can already do some testing.
Now, if you want to develop with Airflow locally, you have
a lot of different options. You can run a standalone instance. You can
run Airflow in Docker. There's one that I highly, highly recommend:
it's called the Astro CLI. This is also a fully open source tool.
It's developed by Astronomer, but it's not specific to Astronomer customers.
Everyone can use it. And this allows you to very easily
spin up a local Airflow environment within Docker.
And it also includes built-in testing features.
So if you go to this link that is on the screen, on the Astronomer
docs for the Astro CLI, you can get some install instructions.
But what you really need to know is that you just need three commands. First,
you install the package: brew install astro.
Then you go to any empty directory on your machine
and you run astro dev init. And once you run astro dev init,
a local Astro project will be initialized.
And this is already fully functional, with two example DAGs.
So you can directly say astro dev start.
And after one or two minutes you have Airflow running locally
within Docker, and you can access the Airflow UI at localhost:8080.
Of course,
then you can start adding your own DAGs and start to develop.
All right, let's say you've done that. You used the
Astro CLI, you have your project, you added your DAGs.
How do you go about testing those DAGs? And the first option
that you have is by using CLI features. If you've
used Airflow before, you are probably familiar with the blue
ones on the screen. Those are part of the general Airflow CLI.
You can also use those with the Astro CLI. You just have
to prepend astro dev run, as I've shown
in the parentheses, to the command. And the first testing
command that you can use is airflow dags test. And this just executes
one DAG run of your DAG. So it runs through
your DAG, and if you have any issues along the way, it will tell you.
The second option is airflow tasks test,
so you can also test specific tasks inside
of your DAG. This can be very useful if you're working on a long DAG
with a lot of dependencies, but you have a task in the middle that doesn't
really need anything from upstream and you just want to develop and
iterate on that one. You can just test a specific task
directly from the CLI, and you get the full logs of the task as
well. Now, if you're using the Astro CLI,
you have more options. So the green ones here,
the first one is called astro dev parse, and this
parses your DAGs and makes sure there are no import errors.
Of course it checks the Python syntax, so if you made a basic
syntax error, it will notice. It also checks for things like
there cannot be two tasks that have the same ID
in the same DAG, or, because it's called a directed
acyclic graph, you cannot accidentally define a cycle.
So these are things that astro dev parse would catch,
and you can also do that very easily from the CLI.
The next one, which I will talk about more in the upcoming slides,
is astro dev pytest. This command runs
all of the tests that you've defined in the tests directory.
I will show you that in the demo. And even though it's called pytest,
you can use any Python framework that you like. You do not need to use
pytest; it can be unittest or another Python framework. This command
just runs all of the tests in that folder. And lastly,
a little bit of a different kind of test: there is a command called
astro dev upgrade-test that will test your
current environment and all of your DAGs against a newer Airflow
version. So if you are planning to upgrade and you're unsure
whether there might be a breaking change, you can run this command
and it will test your DAGs. It will also tell you about all of
the version changes of packages. It's another thing that I will show you
in the demo in more detail. Right, this was the CLI,
but I promised there would be something about IDE debugging.
It's one of my favorite little features that not that many people know about.
You can actually test Airflow DAGs interactively
in your IDE with your favorite debugging tool. So no matter
which IDE you use, whether VS Code or PyCharm, you can actually
run through your DAGs the same way you run through other Python scripts
with your debugger. And the way that you do that is,
at the end of your DAG file,
you add if __name__ == "__main__". So if you run the file directly,
then you take the DAG object that is in that file and you
call .test() on it. And that's already it. That makes the DAG
interactive. And then you can run that file with a debugging
tool. But you have additional options. So if you
want to, you can say, well, I want to run this DAG for a specific
date. If you're worried about your DAG breaking
on the 1st of January, if you're doing a lot of logic around
datetimes, which is definitely one of the worries on my mind with some of the
DAGs that I'm writing sometimes, then you can just test it for that date.
You can also use different connections. You can store them in a YAML file
and just inject them. So you could test your DAG against a new database,
for example, or a new schema. You can use different variables,
and you can also inject the DAG run configuration. I will show
you that in the demo as well.
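Here is a minimal sketch of that pattern. The DAG itself is a trivial placeholder of my own, the file paths are illustrative, and the parameter names follow the DAG.test() signature in recent Airflow 2.x releases.

```python
from airflow.decorators import dag, task
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def my_example_dag():
    @task
    def say_hello():
        print("hello")

    say_hello()


dag_object = my_example_dag()

if __name__ == "__main__":
    # Running this file directly (for example under a debugger) executes one DAG run.
    dag_object.test(
        execution_date=datetime(2025, 1, 1),           # run the DAG "as of" a specific date
        conn_file_path="dag_test/connections.yaml",    # inject test connections from a YAML file
        variable_file_path="dag_test/variables.yaml",  # inject test variables from a YAML file
        run_conf={"upper_limit": 20},                  # inject a DAG run configuration
    )
```

All of the keyword arguments are optional; with no arguments, .test() simply runs the DAG once with its defaults.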
All right, so let's move on to CI/CD, because so far we've
stayed local. We were just working locally on a machine with our
local development environment created by the Astro CLI.
But eventually we want our code to live in production.
Right? That's the dream of all of the code. So let's
move on to the staging branch. And there is a CI/CD step
here. Now, we recommend this flow, but different teams
will have different processes around it. The important thing is that you have a process
that you follow to open a PR from
a dev branch to a staging branch. Usually there are a lot of developer
branches, especially in larger teams, but one dedicated staging
branch, and the PR will be in your version control and
ideally reviewed by your team members.
And as part of this PR, you run some
tests automatically. And this is where you can reuse astro dev
pytest, because this is not just a command that
you can run locally, but you can incorporate this command into
your CI/CD and it will run all of the tests in
your tests folder. Usually this means you have DAG validation tests,
unit tests, and integration tests. I will talk about that in
the next slide. And I put this on the slide
because I've done this before. I didn't
want to wait for all of the checks to complete. I was 100% sure
that my code was fine. Just a small change, right? You should
wait and only merge if the tests have passed.
Now, what are those three kinds of tests? The first one is
Airflow specific, so this is something that you do not have with your other Python
files. You can have DAG validation tests. The first check
that all DAG validation should run is whether the DAGs
are valid. So, what I mentioned earlier: are there any cycles defined?
Are the task IDs valid? Is it a valid type
of schedule? Did you miss a mandatory parameter? Those sorts of things,
which are also checked when you run astro dev parse.
The second part is that you can add custom DAG
rules. So for example, you could have a constraint on schedules.
If you have a team member who might accidentally push
a DAG with the wrong cron string, and suddenly your DAG runs
every five minutes when it's supposed to run once per day,
you can add a test that makes sure that only approved schedules
come through. You could theoretically even define, for a specific
DAG, which schedules are allowed, if that fits your use case.
You can do the same with start dates or with tags.
The screenshot in this example makes sure that catchup is set
to False, which all Airflow developers who are listening
know about, or have done before by accident: set catchup to
True, or forgot to set catchup to False, and then all
of the backfills get scheduled automatically. So this is a very common
one that people use to prevent that. Or you
can also say, yeah, I want to have specific tags. If you have ways of
organizing your DAGs, you can really enforce your architectural
decisions onto your DAG authors by using these
custom DAG rules. Another one that
is very common, because you can also have constraints on your
tasks, is that you could, for example, only allow specific
operators. This is something that I will show you in the demo,
and this example uses pytest, but you can use any Python
test framework. The reason I've used pytest here is because if you use
the Astro CLI, there will be example tests already
automatically created, and those will use pytest.
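To give a flavor of what such custom DAG rules can look like, here is a minimal pytest sketch along those lines. It is not the exact code from the slide; the approved-schedule list and the folder layout are assumptions you would adapt to your own project.

```python
import os

import pytest
from airflow.models.dagbag import DagBag

# Illustrative allow-list of schedules; adapt to your own conventions.
APPROVED_SCHEDULES = ["@daily", "@hourly", None]


def get_dags():
    # Load every DAG in the project's dags folder without running anything.
    dag_folder = os.path.join(os.path.dirname(__file__), "..", "dags")
    dag_bag = DagBag(dag_folder=dag_folder, include_examples=False)
    assert not dag_bag.import_errors, f"DAG import errors: {dag_bag.import_errors}"
    return list(dag_bag.dags.values())


@pytest.mark.parametrize("dag", get_dags(), ids=lambda d: d.dag_id)
def test_catchup_is_false(dag):
    # Prevent accidental backfills: every DAG must set catchup=False.
    assert dag.catchup is False


@pytest.mark.parametrize("dag", get_dags(), ids=lambda d: d.dag_id)
def test_schedule_is_approved(dag):
    # Only schedules from the approved list are allowed through.
    assert dag.schedule_interval in APPROVED_SCHEDULES
```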
All right, the next one is unit testing. Now, all of the Python
developers, of course, are familiar with unit testing. Where does that
come in? If you're using Airflow, it really is important
if you have custom hooks and operators. So, like the example
that I showed in the intro slide, MyBasicMathOperator
is an operator that I wrote. So it's really an operator
that I should also write tests for. The second use case is if
you have functions used in @task-decorated tasks;
you can write unit tests for those as well. So really,
if you write custom Python code, you should also add
your testing. Again, you can use any Python test framework
that you like. This code snippet here uses unittest, but you
could use pytest. You could use any package that you prefer.
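As a rough illustration, here is a minimal unittest sketch for a custom operator like the one described in the talk. The import path, the constructor arguments, and the assumption that a division by zero simply propagates a ZeroDivisionError are all illustrative, not the operator's actual code.

```python
import unittest

# Hypothetical import path for the custom operator described in the talk.
from include.my_basic_math_operator import MyBasicMathOperator


class TestMyBasicMathOperator(unittest.TestCase):
    def test_addition(self):
        # Call execute() directly with an empty context; no scheduler needed.
        op = MyBasicMathOperator(
            task_id="add", first_number=19, second_number=23, operation="+"
        )
        self.assertEqual(op.execute(context={}), 42)

    def test_division_by_zero_raises(self):
        # Assumes the operator lets the ZeroDivisionError propagate.
        op = MyBasicMathOperator(
            task_id="div", first_number=1, second_number=0, operation="/"
        )
        with self.assertRaises(ZeroDivisionError):
            op.execute(context={})


if __name__ == "__main__":
    unittest.main()
```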
A little side note here: I mentioned that there are a lot of building
blocks in Apache Airflow. If you're using one of those open source
building blocks, then the tests are already done for you. So you
only have to write unit tests for your own custom
operators. If you want to figure out if there is already an operator for
your use case, the best place to do so is the Astronomer Registry.
If you go to registry.astronomer.io, you can find all sorts
of information on all sorts of operators.
All right. And the third type of test is integration testing.
This is if you are talking to an API. So with unit tests
you usually try to only test the code that you yourself
wrote. So if a custom operator calls out
to an API, for example, you're writing a custom operator to
interact with Snowflake, then you would usually
mock that API call to make sure that
it is your code that is working or failing and not
something with the API. With integration tests, you actually
want to test that API call, which again you do for custom code that
you write, and you can do with any Python test framework. In this
code snippet, I simply use the built-in assert. One little note here:
be careful, of course, if you call out to an API, that can
incur cost and sometimes take a lot of time, especially nowadays
with LLMs. So be careful about what
kind of payload you are sending and what kind of calls you
are making inside of your tests.
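For illustration, a minimal sketch of such an integration test using the built-in assert. The helper function get_random_number and its range are hypothetical stand-ins for whatever API call your own code makes.

```python
# Hypothetical helper from the project's utils module that calls a
# random-number API; here we hit the real endpoint instead of mocking it.
from include.utils import get_random_number


def test_random_number_api_returns_value_in_range():
    number = get_random_number(minimum=1, maximum=100)

    # Plain asserts are enough; keep payloads small, since this makes a real call.
    assert isinstance(number, int)
    assert 1 <= number <= 100
```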
All right, so we merged our PR.
Our code is now on the staging branch. It is very happy on that
branch, but it wants to move to the deployment. And CI/CD
stands for continuous integration, continuous deployment.
So we have a step that does this automatically.
Ideally, once the code is moved onto the staging branch,
it will run the tests again, and if those tests pass, it will
be automatically deployed to the staging deployment. Some people also
add more integration tests at this stage that
maybe test your code in context,
in the context of the whole environment. You have a lot
of options on what you can put into your CI/CD script.
Since Airflow is all just Python code, you can use any CI/CD
tool that you like. In the demo I'm going to show GitHub Actions,
but there are example scripts for different tools available in the Astronomer
documentation. You can also consider having automated
creation of your deployment. So for example,
maybe you want the deployment to automatically update
a configuration, and have that configuration file in
your source control system as well.
Then you can use infrastructure management as
part of your CI/CD. This is kind of the next step in DevOps in
a lot of cases, and you can also do that for your Airflow infrastructure.
This will depend on how you are running Airflow in production.
But if you're an Astronomer customer, you can use the Astro
API to manage your infrastructure in a code-based
way. Okay, so our code
made it to the staging deployment. It is running. There it is.
Fine, let's go to production.
This is just a recommendation. It's not always possible, but we
recommend that you run your code for a few days in your staging
deployment as an end-to-end test, because you can never
think of everything. There will always be edge cases that you did not think of
that maybe come up only with data on certain days, or
something specific that you only see once your code is running
in the full context. So that is one of the main purposes of the
staging deployment, making sure your new code is working,
your new pipelines are working end to end,
and then usually a couple of PRs get bundled, and there is a staging
to production PR that gets reviewed again.
And once that is merged, all of our tests run
again and the deployment is automatic again.
All right, that was the overview over the different ways
and contexts of testing. Now I will move to the
demo. So, as I said, all of the code
in this demo is available to you if you go to
GitHub and then the astronomer external-talk-demos repository.
It is on the branch for this conference, the 2024
Conf42 Python airflow testing prod branch. Don't worry, you don't have
to write that down. The README on the main branch will
have a link to this branch. But let's
look at this. All right, so this is
the Airflow environment you can see here. I'm running on
localhost:8080, so this is running on my local machine with
the Astro CLI. And the code repository that
is available to you just contains two DAGs. The first one
is the intro to airflow DAG that I've shown in the slide, so you have
the code, and the second one is the one where I'm going to show
you the testing options. So let's click on this math DAG.
Scroll down. Okay, this is our DAG. You can already see
that I've tested a specific task here. But if you're looking
at a full run, you can see the structure
of this DAG. So it is a very, very simple DAG.
We have two tasks at first: one picks
a random number. It actually uses an API to do that.
The second one retrieves an operation from an Airflow
variable, and then we will perform that operation
with that number and the number 23, using this basic
math operator that is a custom operator. And lastly,
we make sure that a table exists in a local Postgres database
and then we write that result to that table.
So a very simple DAG structure, but it has all of the pieces
that we need to look at testing. So with
that said, let's hop over into the code.
I'm using VS Code here. You can use any IDE that you prefer,
but what I've done here is, in an empty folder, I ran
the command astro dev init, and this
will create the Astro project, and it will create
most of the files that you see here and most of the folders.
The important one here is the Dockerfile. So what
is happening here is that it pulls the Astro
Runtime, and this is functionally equivalent to open
source Airflow. It just adds a few features that
help with running Airflow with the Astro CLI and
with Astronomer. But this is also ready
for you to use, and you can expect feature parity
with Apache Airflow. In this case, Runtime version 10.2
corresponds to the 2.8 version of
open source Airflow. So here is where we pull
Airflow into the Docker environment. You do not need to
know anything about Docker in depth or in particular to use this
tool, but you can add your customization here if there's something
you want to customize. Since Docker is running
in an isolated environment, we need to install our packages.
And this is where you can do that. You can add all of the packages
that you would pip install to the requirements file. And then once you
run astro dev start, these automatically get installed in
your Airflow environment. One little note here:
it is a best practice to always pin your versions here. This is also something
that has caused me some headache before when I forgot that for a simple package;
always pin your versions. All right, so the
next thing is we have the dags folder. And as I said
earlier, Airflow will know that a file is an
Airflow DAG if you use this @dag decorator and
you put the file into the right folder, and this is the right folder.
So if you put a file in here, like our intro to airflow file,
and it has the word dag in it, it will automatically be
picked up by the scheduler. We have an
include folder. This is where we can put all of our supporting code. So,
for example, the custom operator lives here.
And most importantly for this talk, we have the tests folder where
all of our tests live. And I will go through those in more detail in
a second. Okay,
so this is our Astro project that is created by astro dev init.
We can start it with astro dev start. It is already running here;
I can show you the Docker containers that are running.
And now we want to do some development. So we
wrote this DAG. This is the DAG that I've shown you with
this very simple structure, picking a random number and performing
an operation on it. Let's say we've written this
DAG and we want to test it now. If you remember,
the first option that we have is by using CLI commands.
So one of the commands would be, we can just test the
whole DAG. So airflow dags test,
if you're using the Astro CLI, translates to astro dev
run dags test, and then we use the DAG ID.
So I run astro dev run dags test with the
math DAG's ID. Let's run this. This will just run the whole
DAG. And it's pretty fast. So you can
see here it executed all of the
tasks. We ended up with eleven
times 23. So the random number was eleven.
And this result eventually gets inserted into our Postgres
database. All right, so let's say I do some
more development on a specific task. Maybe this pick random number
task. And I don't want to run everything all of the time,
so let's just run this specific task. This is
also something you can easily do with the CLI. We can do astro dev
run tasks test, with the math DAG's ID and then pick a random number.
So we use the DAG ID and then the task ID.
And this will just run this task. I get
14. Let's do it again. It should be random, not idempotent.
I get 71. So this is working.
All right. So this is already very helpful.
But sometimes you need to debug something in depth.
So you need to debug your tasks in more depth
and you want to use the Python debugger. And what you can do then
is, let's add a breakpoint here. Let's add the breakpoint
here. This just means that I tell my
project, I think there's something going on here, I want to stop here
if I'm using my debugger tool. And let's use
the VS Code debugger. So what this
does is it runs and debugs the DAG, and
it stops here. And then I have all of my debug
options. So you can actually see here, I can see the
maximum and minimum that I am running with, I can see the global variables,
something that is very useful if you're an Airflow developer. I can see all
of the options that I have within the context
that I injected into this task, and I can perform
actions here. Like I can say, well, is there
already a num at this point? There is not. num has not
been defined, but I have minimum.
minimum is defined. And from the context,
I think I have context ts.
I have all of the context that is within ts.
And I can, of course, use all of the debugging.
I can do a step down, step further,
and now I'm within the function that is being called. So I
can see what is happening inside of the function. I can
step through the request, and at this point I will probably
have the request ready. No, r is not defined yet,
but I can step through, and then I return the
JSON here and say, well, what is r here?
r is the response. So it's 200. What is
r.json()? That should be the payload. And I can get
the payload. So it's 43 at that point in time. So this is
really how you can prevent all of those print statements that
as a junior developer, I had a lot of print statements in my code because
I needed to know what is going on in debugging. But if
you're using a debugging tool like this, you can just step through your code
and do this in a much cleaner way and in a very interactive
way. Okay, let's exit this debugger and go
back to our code. Go back to our terminal.
All right, so how did I do that? Because this is not
working just by default, right? You may
have tried this before with your Airflow DAGs. I added
the dag.test() function. So what I've done here on line
100 is the same thing that you've seen in the slides. I added
if __name__ == "__main__". So if this file is run
directly, I want you to run this instead.
So I take the DAG object; I assigned
the call of my DAG function to an object.
I take this DAG object and I use the .test() method,
and all of these parameters are optional. But this
just means you can now run this DAG in your debugging
tool, and I can run it for a specific date. For example,
I can say, well, I'm worried whether in a year
this is still going to work, like if I'm using some datetime, and then
I can run it for a specific time of the year.
I'm also injecting connections and variables, and I
can show you how I do that. I have this DAG test folder up here,
and then I have my connections in
a YAML format. This is just my local Postgres,
and this is the database
that I want to connect to when I'm testing the DAG. But I could point
this at any database. And I also inject a
variable. In this case, I inject the operation. So if I change that,
for example, if I change it to minus,
then it would subtract instead of multiply.
So I can test for different situations.
Of course, you can also have configurations. In this case I
have an upper limit and a lower limit for the random number.
And I can just inject a different limit so I
can say, well, I want the upper limit to be 20.
Okay, let's run this again with the different configurations.
And now it runs through. I removed the breakpoint, so it
didn't break, but what you can see now, if we scroll
up, is that here we got the equation 14
- 23. So we didn't get the multiplication
that we had earlier, but we ran it with the new variable,
which was the minus operation.
All right, so this is super useful if you're working locally and
if you're debugging while developing. But let's look
at the tests that are in the tests folder.
So if we look through that, we have DAG validation tests
first. And these are actually some that are generated
automatically if you run astro dev init. So if you scroll up here,
this code up here will be generated for you. And this
just gives you the function that
gets the import errors, so the function that makes sure that your DAGs are
parsing correctly, and the function that
gets your DAGs. And what I added here is,
the first one is the one that I've shown in the slides. So this is
a test that makes sure that the catchup parameter is set to False,
which is very, very useful. And then I also added
a test that makes sure that I only use allowed operators.
So I say I only allow this @task decorator,
whose official name is the Python decorated operator,
and my own custom one, plus the SQLExecuteQueryOperator.
And this is the test that actually makes sure
that, for all of the tasks in my DAG's tasks, for
all DAGs that I have in my Airflow environment, please make sure
that the type of the task is in the allowed
operators.
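A rough sketch of what such an allowed-operators check can look like is below. This is not the exact code from the repository; the class name that the @task decorator produces can differ between Airflow versions, and the folder layout is an assumption.

```python
import os

from airflow.models.dagbag import DagBag

# Names of operator classes that DAG authors are allowed to use (illustrative).
ALLOWED_OPERATOR_CLASS_NAMES = {
    "_PythonDecoratedOperator",   # what the @task decorator creates in Airflow 2.x
    "MyBasicMathOperator",        # the custom operator from this project
    "SQLExecuteQueryOperator",
}


def test_only_allowed_operators_are_used():
    dag_bag = DagBag(
        dag_folder=os.path.join(os.path.dirname(__file__), "..", "dags"),
        include_examples=False,
    )
    for dag in dag_bag.dags.values():
        for task in dag.tasks:
            assert (
                task.__class__.__name__ in ALLOWED_OPERATOR_CLASS_NAMES
            ), f"{dag.dag_id}.{task.task_id} uses {task.__class__.__name__}"
```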
All right, for the integration tests I just have a very simple one.
I take this function that makes the API
call that you've seen earlier. I can show you
that it's in the utils: a very simple function that just calls this random
number API. And the test makes sure that this
returns a result that makes sense for our constraints. And
the unit tests: of course, if I write a custom operator,
I should really unit test this. This tests the operator for
all operations. It makes sure that if I divide by zero
I get an error, and it makes sure that if
I have an invalid operation, it also
throws an error. And you can see those were three different
Python frameworks: unittest, the built-in
assert, and pytest. And we can run
all of these with one command. So if I run astro dev pytest,
this will actually create a little container and
then it will run all of the tests, and they all passed.
Very happy. But let's see,
let's make one test fail. So we say we want catchup
to always be set to False. So what happens if
catchup is suddenly True? Let's rerun the tests.
This DAG should not get past the test, and it did not.
One test failed: catchup is not set to False,
and that is not allowed. So this is how you can get real control
over your DAGs, and not just in
your local setup when you run it, but you can take
this test and add it to your CI/CD.
So in this case I'm using GitHub
Actions, but you can do this with any CI/CD tool.
I'm saying I want to always run these tests when
I push to these branches or when a pull request is
closed against these branches. Then I have some customization of which
tests I want to run. And I say,
well, check out the repository, install the Astro CLI
so we can run astro dev pytest, and then run
the test harness. So this is running the exact same command inside
of the CI/CD that I can run locally.
We have the same thing on merge to production.
And the last step here is just because I'm using Astronomer:
it also automatically sends my code to my Astronomer
deployment using my deployment ID.
But if you're using a different way of running Airflow in production
or in the cloud, you can just replace this last step.
Now to show you that this is actually working, let's hop
over. And this is the repository that I've talked about.
If we scroll up, it's the astronomer external-talk-demos
repository. And you can see here this was
happening on a push to a branch, and it runs
the exact same tests. So if we scroll down,
you can see we also have those twelve tests that passed.
We had the DAG validation tests, the integration tests and
the unit tests. All right,
so the last thing that I wanted to show you is that
you can also do upgrade testing. So one thing that you
might have noticed in the demo earlier, we had a banner up
here, new version is available. This happens a lot because new versions
come out all the time. So we are on 10.2, and
there's actually a Runtime version 10.3 available.
Now I want to upgrade, but I want to be sure that my DAGs
are fine, and I want to know which packages have
had upgraded versions in this change. So what I
can do is I can run astro dev
upgrade-test. So let's do this. I already
ran it before, but it's actually fairly quick; it takes a little longer
the more DAGs you have, but it
ran and it tells us no errors were detected in your DAGs.
This is great. This also tells you it is not running
astro dev pytest, it's just testing your DAGs against the new
version, because astro dev pytest would
still fail our intro DAG for having catchup set to True.
But this tests against the new version. And what you get is you
get this little folder here. And this contains several
files. The first one is an HTML file that you can open in
Firefox or in Chrome. And you can see that this
tells you all of the details of your environment, which can be super useful,
especially if you're debugging a platform issue. But we can see what
I'm running on here, which Python version, the packages that are
relevant, and we can see both of our DAGs, and
we can see that both of them passed all of the tests.
They will work in 10.3 as they are in 10.2
right now. The other thing that I personally find super useful is
the dependency comparison. So there will be pinned versions
of different packages for each Astro Runtime and each
Airflow version. We actually get the full pip freeze
for both versions here. And this one tells
you what actually changed. So you can see that
we are using the same base Airflow version, so we're
using the same Airflow, but the Astro Runtime extra has changed. We did
some provider upgrades of the built-in providers, and,
for example, pytest had a major update in this
version. And this can be very important if you're using
one of these packages inside of your Airflow tasks and you
may have a specific dependency, or you need to update something
in your Airflow tasks. This will tell you, and this will make sure that
your DAGs do not break.
Okay, so that
was the whole journey of our code from
local development to production and
how you can test each step of the way. As a little recap,
if you're doing local testing, you have two main options. You can
use the Astro CLI and Airflow CLI commands, and you can use
dag.test() to debug interactively. And for all of
the CI/CD steps, we really recommend having a robust
testing suite inside of your tests folder to run with astro dev
pytest on every step of the way, with DAG validation tests,
unit tests for all of your custom code, and selected integration tests
for APIs that you specifically want to test.
The take-home message: Airflow is written in Python. Airflow pipelines
are just Python code. This means you should really test
it. Software engineering best practices and DevOps best
practices do apply. I hope this was helpful for
you. Thank you so much again for joining, and I hope
you have a great rest of the conference.