Conf42 Machine Learning 2024 - Online

A GenAI Pipeline for Content Generation with Apache Airflow

Video size:

Abstract

Get a tour through the code of an end-to-end GenAI Airflow pipeline, using custom information in an RAG pattern and fine-tuning GPT-3.5 Turbo for your content generation use case. The pipeline code is open-source and will be provided to you.

Summary

  • Apache Airflow can be used to create generative AI pipelines. The code in the demo is open source and available for you to use. Why is airflow so popular to use for AI? It can sit in the middle of your data and ML stack.
  • Airflow 29 includes new features for creating best practice generative AI pipelines. You need to make sure that your training data is always up to date or you're augmenting data. And then of course, every time you put something into production, you need to worry about scalability and reliability.
  • The task flow API is a pythonic way to write airflow dags. It's a much more natural way that you can create and chain your tasks. You can also mix and match traditional operators and taskflow API. Personally, I've switched over to using decorators whenever possible.
  • You can configure automatic retries, and this is really a best practice in production. By default, airflow only runs tasks if all the upstream tasks were successful. But if you are using branching, it's very easy to get a situation where one task gets skipped and one is successful. And that's when you need to adjust the trigger rules.
  • The other thing I wanted to mention really quickly in just one slide are deferrable operators. If you are creating generative AI pipelines, it's likely that you have tasks that run for a long time. The best practice here is to use the variable operators whenever possible for longer running tasks.
  • You can chain your dags based on the data that they are updating. This is available as of Airflow 24. The data sets are defined in a URI syntax. Now you can see all of your data sets in your DaG graph as well.
  • You can now schedule on logical dataset expressions. You can also have a DaG running on both time and data set. We now have a rest API endpoint to update datasets. This could be used for cross deployment dependencies. The demo shows all three of these new advanced dataset scheduling features.
  • The other airflow feature that I want to dive in a little bit more deeply is dynamic task mapping. This is really a core airflow feature now that you should be aware of, especially if you're using airflow for generative AI. Use dynamic tasks whenever possible, especially over dynamic Dax.
  • In the demo there are traditional operators that are dynamically mapped. We had three components for basic dynamic task mapping. The first one was the elements that stay the same in between the dynamically mapped tasks. You can also customize the map index template.
  • A demo repository that creates LinkedIn posts for me about Apache Airflow. But you can very easily adapt this repository to create any type of content for any topic. It's all open source. You're very free to fork it and to use it.
  • The ingest knowledge base stack is scheduled based on both time. And we can actually see some of the scheduled runs. One way that you can make your airflow easier to use and to reuse certain parts of your code is by modularizing functions.
  • In this demo I have 14 chunks of airflow guides that I'm ingesting into viviate. I modularized all of the code that is used to chunk the text into sizable elements that can be used and ingested. And that gives me this highly adaptable, highly dynamic rag dag.
  • Python three API scripts and then start fine tuning pipeline. Both validation examples and training examples get evaluated. This is one way that you can use branching to protect yourself from accidentally incurring a lot of cost.
  • The fine tuning dag uses a deferrable finetune openair operator. Currently this task is deferred because it's waiting for the fine tuning to finish. There are two options to make this operator more efficient.
  • Custom operators GPT fine tune you can use this operator as a template for your own custom operators. One of the ways that you can standardize how you interact with generative AI. You can run that on any schedule or on specific triggers.
  • Even if you've never run airflow on your computer, you can do this today. Use OpenAI both for your call to create the inference. And also in your vv eight connection in order to create embeddings. Once all of the dags have run you can start creating your own blog posts about airflow.
  • Both your data and your orchestration with airflow sets you apart from your competitors when creating any genai applications. I hope you fork the repository and have fun with it, create amazing things with it.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
Hi, thank you for joining. I'm super excited to talk to you today about how you can use Apache Airflow to create generative AI pipelines. Specifically, I will talk about a demo that shows a full generative AI pipeline for content generation with Airflow. This is the agenda for today. I will give a quick introduction into the topics of Airflow and astronomer. Then I'll talk about some of the challenges that you might run into when you want to take generative AI projects into production, and how airflow addresses those challenges. Next, I will cover some key airflow features that I think you should know when you are creating generative AI challenges, especially features that are used in the demo code. So wanting to help you to understand the demo better, I will specifically focus on two features that got a major update in the latest airflow release. Airflow 29 came out a little lower a month ago, and there were significant updates to advanced dataset scheduling as well as a dynamic task mapping that I will cover in the slides. Then we will spend some time in the demo. The demo is a pipeline that creates content using a retrieval augmented generation pattern, together with also fine tuning GPT for your use case all of the code in the demo is open source and available for you to use, and this is the link to the GitHub repository where you can find that code. I will show this link several times throughout the talk. One note before we get started, I will assume that you have some familiarity with Apache Airflow, so I will not cover the very basics of how to write an airflow dag or how to define an airflow task. Really, if you are completely new to the topic, if you want to do generative AI pipelines and are new to airflow, I highly recommend that you take a look at some of our get started started resources. Those are available at Astronomer IO docs. Learn or you go to the Airflow Academy that astronomer has, which is available at Academy Astronomer IO. With that said, what is airflow? It's of course the open source standard for workflow management. It's extremely popular. It's being downloaded 22 million times per month, which is just wild to think about. And the one thing that I want to call out on this slide is that airflow now has 46,000 members in the airflow slack. So if you ever have a question about airflow, about any of your use cases, it's one of the best places to ask. If your question is, well, are people using airflow for ML and AI already, or is this completely new? The answer is it's all already a very established pattern. So at the end of December last year, we did a survey among airflow users, the Apache Airflow survey. And among the almost 800 people who responded, 28% said that they are already using airflow to train or serve generally machine learning models, or just manage their mlops use cases. So this is not something new. This is a pattern that already a lot of people are using now. Why is airflow so popular to use for AI and also for general data orchestration? It's because it can sit in the middle of your whole data and ML stack. So if you are in the data and ML space, you will recognize some of these logos, maybe you recognize all of them, but these are just a few of the tools that airflow can connect to and orchestrate actions. In and with airflow being the orchestrator, what you really want is to have a way to make sure that all of the tasks, all of the actions, and all of these tools are happening at the exact right moment and with the right dependencies. The prime example is you cannot train your model before your training data is ready. So all of this is connected, and airflow can sit in the middle and be the orchestrator to your data symphony. The reason why airflow is so good at this is because in airflow, you can define everything you need in python code. So if you have any action that you can define in python code, any call to another API of a different data tool, you can turn that into a task inside of airflow very easily. That's airflow. Who is astronomer? Why am I talking about airflow to you? Astronomer is the driving force behind Apache Airflow. So astronomer is a company that drives. Currently, all of the new airflow releases, has a lot of the major committers and PMC members on board, and also contributed over half of the current airflow code. So Airflow is open source, but a lot of the code is contributed by employees of Astronomer. And there's also a growing academy ecosystem. I mentioned the academy earlier. It has beginner courses and more advanced modules, and now there are over 30,000 people who have taken courses in the academy. So airflow education is also something that's very important to astronomer. Of course, Astronomer is a company, so there's a commercial offering which is called Astro. Astro is a place where you can run airflow deployments at scale, and many of them without needing to worry about infrastructure. So Airflow itself is open source, and you can run it however you want to. But if you want to have an easy way to spin up a lot of airflow deployments, especially if you have different teams that use airflow and want to have additional features on top of airflow like RBAC management, then Astral is a great option for you. If this is something that interests you, this QR code and the link on the slide here give you a free trial, a 14 day trial with $300 in credits so you can try out what it feels like to run airflow with astronomer. And if you sign up with this link or with this QR code, you also get a complimentary airflow fundamentals certification exam try. So if you go to the academy that I mentioned and want to take the certification exam, you get one try for free with the code that sent to you. If you sign up via this link, I will show this again at the very end of the talk. But important to note, everything else that I'm showing, all of the code that I'm showing is all possible with open source airflow. All right, so you want to do Genai with airflow, and usually you have some prototype, maybe in a Jupyter notebook, and the prototype works great, but putting this into production is a different beast. So you have all of these challenges that arise. You run into API outages and rate limits. You need to make sure that your training data is always up to date or you're augmenting data. Because if you think about it, everyone who's watching this talk can make a call to an API of a large language model. So it's very easy now to just create a wrapper around these APIs of large language models. But you need to set yourself apart from your competitors and the way that you can do that, or one way that you can do that is by having better data, is by using your valuable, your organization's data or your own data to augment the responses that you can get from a large language model. You have changing tools and APIs every day. I see a new model that has come out almost every day, literally, and new benchmarks. So it's possible that a month from now there will be a new model for your use case that is just a lot better than the one that you are using now, and that might be hosted it on a different service. So you really need to be able to adapt to changing tools and APIs. You can end up with quite complex pipeline structures, and you need to be able to determine which data went to do training. In a lot of cases, especially for compliance reasons, everyone who, like me, is in Europe. You will be aware of GDPR. That's one of the reasons why you always you need to be able to audit your pipelines. And then of course, every time you put something into production, you need to worry about scalability and reliability. Airflow has an answer for all of these challenges. With API outages and rate limits, airflow can have automatic weak tries configured Airflow is already the standard to keep your training data up to date. And as I mentioned before, Airflow can connect to any tool that you can connect to using Python code. So Airflow is tool agnostic at its core. And especially if you use the task flow API, which is something I'm going to show in this talk, then it's very easy to just change which tool you are connecting to. You can create complex pipeline structures, among other things, with data sets and dynamic task mapping and branching, which I will all cover in this talk. And there are ways to determine which data went into training by using observability features and an open lineage integration. For scalability, you have pluggable compute and reliability wise. Airflow has been around for a while now and is really battle tested. And because Airflow itself is all defined in code, it's all defined in Python. You can have CI CD and DevOps best practices on top of your pipeline code to really make sure that your pipelines are robust and your genai application is reliable. Little side note, in the Conf 42 talk about Python this year, in 2024, I gave a talk about how to do testing and CI CD with Airflow, if that's something that interests you. So after seeing all of these answers to the challenges, we can kind of append our statement from earlier and say it's not just your data that sets you apart from your competitors, it's also the quality of your orchestration. If your orchestration is reliable, never goes down, and is scalable, you will have the better product than your competitor. All right, picking out some of these features. These are the features that I think are essential to learn about when you create best practice generative AI pipelines. And I will cover the first six ones in this talk, but for the other ones there are guides available as well on our learn page on astronomer IO learn docs learn. For the first four, the light blue ones here, I will just cover them briefly, and then I will spend some more time talking about dataset scheduling and dynamic task mapping, specifically about what is new about these features in Airflow 29. So let's get started with quick airflow features in rapid succession. First of all, the task flow API. Now this is something that a lot of you might be familiar with if you are writing modern airflow DAX, but if you've used airflow a while ago, or if you are on an older version, you might not know about this yet. And the task flow API really is a pythonic way to write airflow dags, and it's a much more natural way that you can create and chain your tasks. On the left hand side here you can see a traditionally defined airflow task. So we're using a Python operator to run a function called say hi func. And this is possible. It's still possible with airflow, but it takes a lot of boilerplate code. And what you can do with the task flow API is you can create the exact same output, so you can run the exact same task by simply having a python function and putting task on top of it. So what happens now is I can have any Python function, and by putting at task on top of it, Airflow realizes this is an airflow task now and does everything that you previously had to define this Python operator for. It's also much easier to create dependencies in between tasks when you're using the task flow API, and you can see what this looks like in the example code. You can also mix and match traditional operators and taskflow API. That's also something I've done in demo. And there are not just this. There's not just this one decorator at task, there are many others. There's kubernetes, there's at task bash, and you can learn more about them in our guide about airflow decorators. Personally, I've switched over to using decorators whenever possible over traditional operators, though traditional operators still have their use, especially when you are interacting with complex tools and the operator does a lot of work for you and abstracts over a lot of python logic for you. All right, now we're writing pythonic airflow Dax let's protect against API failures and rate limits. When I was creating the demo for this talk, I actually rate limited myself. I ran into this exact problem. So what you can do is you can configure automatic retries, and this is really a best practice in production. You always want retries configured, unless for a specific task, there's a reason why you do not want this task to automatically retry. You can of course configure the number of retries. You can configure a specific delay in between the retries, or you can also say you want to have an exponential back off, so it takes longer and longer after each try to try again. You can set a maximum delay as well. The ways to configure retries, you can configure them in the airflow configuration so you can set them for your whole airflow environment. You can set them for all tasks in a specific DAC in the default arguments, and you can override that at the specific task level. So a common pattern is to have an airflow configuration that says all tasks have free retries by default. But for specific tasks where you already know that API is finicky, you might set retries to ten with an exponential back off. Okay, we have task flow API retries. Now. Our pipelines are getting more complex because a lot of times you run into the issue that depending on different aspects of your data or different situations, you want different tasks to be running. And the way you can achieve that is by using branching. What you can see here is an abbreviated version of a task that exists in the demo. And the the graph view of this dag is the challenger versus champion dag that is also in the demo as well. And what you can see is I have a python function and I'm again using a decorator on line 35. I'm saying at task branch, and this turns my task into a branching function. And this function is very simple. I have some logic that I abbreviated here that figures out if a current champion model exists, and if it does, I return the string champion exists. If it does not, I return, there's no champion. So this task is what creates this 1st 1st branching task here. And the string that is returned is the task id of the downstream task that I want to run. So if there is a champion present, I want to run the task called champion exists and go that way in my dag structure. If there's no champion present, I want to run the task no champion and skip all of the intermediate tasks that are here. Because if there's no champion, I don't need to do a champion challenger comparison, I will just use the challenger. In this specific DAC, we have two more branching tasks. We have ischallenger, which does the same thing to evaluate whether a current challenger model exists when this DAG is running. Because if there's no challenger, I would just keep the current champion. And if both exist, if a current champion model exists and a challenger model has been presented, then I want to actually pick the better one. And then I have my comparing accuracies task. And depending on which model was more accurate, I either switch to champion, so I'm using the challenger, or I keep my champion. Now you can create very fun dags with this. You can create little games with this as well. One thing that you have to keep in mind is by default, airflow only runs tasks if all the upstream tasks were successful. But if you are using branching, it's very easy to get a situation where one task gets skipped and one is successful, and the downstream task that is scheduled, depending on both of these tasks, should always run anyway. And that's when you need to adjust the trigger rules. So the trigger rules downstream of branching tasks always need a little thought. A common trigger rule that people use in this case would be, for example, one success. In that case, the task runs as soon as one upstream task has been successful, and it doesn't matter if the other ones get skipped or even fail. The other thing I wanted to mention really quickly in just one slide are deferrable operators, because if you are creating generative AI pipelines, it's likely that you have tasks that run for a long time. So the prime example is model training and fine tuning. That might take a while, or you are waiting for an event to occur in an external system with a sensor. You're probably familiar with sensors. A lot of deferrable operators are deferrable versions of sensors. If you are waiting for a long time, you don't want that worker slot to be taken up while waiting in your production environment, you want to release the worker slot and for that worker to be able to run a different task in parallel. So what you're doing is you define it. You use a deferrable operator which will go to a whole different airflow component called the trigger component, and held that component to start an asynchronous process waiting for something to happen. Meanwhile, the worker slot gets released and you can have the advantage of resource optimization so the worker can do a different task in the meantime. And as soon as the condition is happening that the async process is waiting for, the task will get picked up by a worker. Again, the best practice here is to use the variable operators whenever possible for longer running tasks. In the demo, there is one deferrable operator, and I will show you how that works in practice. Okay, those were the four features I wanted to cover a little bit. Now let's dive deeper into data set scheduling. This is my personal favorite way of scheduling DAX, and I hope it will become yours as well, because what you can do with datasets is you can chain your dags based on the data that they are updating. So this is the data sets view in the airflow environment. This is available as of Airflow 24. So if you do not have that tab, you might need to upgrade your airflow. And what we have here is we have one upstream dac which is called my ETL Dac. And there is a task in this DAC where I as the DAC offer know this task updates some data. So this task updates the data CSV file in my s three bucket. It doesn't actually matter if that update happens. Task could do something completely different or it could be an empty task. I, as the DAG offer tell the task, hey, when you finish successfully, you produce an update to that data set. I always imagine it as the task waving a little flak at as soon as it's done and then it updates this data set and you can have as many data sets as you want and then you can have downstream dags. So my ML DAC here is a different Dag and you can have these dags scheduled based on updates to datasets. So I can say, well, whenever that data CSV file is updated, I want to run the my ML DAc in this case to show you how this can be chained. There is another task inside of this mYml dag which produces an update to a new data set called Snowflake Mytable. A few notes here. The data sets are defined in a URI syntax. That is just a best practice. You could theoretically call the data set, my model, or something else. There are a few strings that are not allowed, but generally it's very possible to use different strings as soon as they would evaluate to a valid Uri. But it is, as I said, a best practice to use uris, especially waiting for potential future airflow features. And the other question that always comes up is airflow is currently not aware of the underlying data. So it doesn't matter whether that snowflake table exists, whether that s three bucket exists. You as the DAG offer name these datasets and you determine which tasks actually produce updates to these datasets. This is what it looks like in the UI if you are running Airflow 29. So this is a brand new 2.9 feature. You can see this is the consumer DAC that we had in the previous slide, and we have one task in this DAG called my task. But you can also see the data set that this DAg is scheduled on, that's to the very left of the DaG graph. And then you can see we have the new producer task and the next dataset that this task is producing too. So now you can see all of your data sets in your DaG graph as well, which I personally find really nice. But that was not the only update to dataset scheduling in Airflow 29. We also have these three new additions which open up a ton of new use cases for dataset scheduling. The first one is you can now schedule on logical dataset expressions. So previously you could schedule either on one dataset or having an update to a whole list of datasets with and logic. But now you can use any combination of and and or logic. So you can say, I want the stack to run as soon as this data set or this other data set has been updated, and you can do that with as many data sets as you want, and you can build complex logical structures like that. The other new update is that you can also have a DaG now that is running on both time and data set. The prime use case is that you have a Dag that updates a table and you want this dag to run once per day, but also whenever a certain data set is updated. And you can now do that with the data set or time schedule. And the last one which I'm really excited about because it opens up airflow to the world in a way. We now have a rest API endpoint to update datasets. So previously, before Airflow 29, you could only have an update to a data set from a task within the same airflow deployment, but now you can send that update from anywhere. This could be used for cross deployment dependencies. So you have two Airflow deployments and they can now talk to each other very easily using this feature, or it could also be from anywhere. So you could have a button on a website and then you run some JavaScript code and that creates the update to the dataset. And that kicks off a whole set of downstream dags. So you can really make these updates now from anywhere. I will show how to do that and also how to do the two other features that are new in the demo. So the demo shows all three of these new advanced dataset scheduling features. The other airflow feature that I want to dive in a little bit more deeply is dynamic task mapping, because dynamic task mapping is really a core airflow feature now that you should be aware of, especially if you're using airflow for generative AI. What dynamic task mapping can do is you can have kind of like a template task, and you can say, well, at runtime I want airflow to figure out how many copies we need of that task to run in parallel. And this can be based on inputs that occurs at runtime. At the bottom of the slide you can see a very simple dag that has two tasks. We have one task that's called getfilepaths. So we are getting a number of different file paths and then the downstream task is processing files. And I want to have one task for each of the file paths that I'm getting, but I don't know how many file paths I will get every day. Maybe these are files that are dropped into an object storage by a domain expert on my topic and I never know to how many files they will get that day. Maybe one day it's two, like in this example, and the other day they are dropping ten files. And this way my pipeline is the dynamic, so it will automatically create one parallel task instance for each of the file piles that it is given. The important thing to remember is that you have two ways of defining parameters. In a dynamically mapped task, you can have parameters that stay the same in between these copies. These go into partial and parameters that change in between the dynamically mapped task instances. These go into existence. Expand or expand quarks the best practice here, use dynamic tasks whenever possible, especially over dynamic Dax. Dynamic Dax, that's still a valid use case. Use case for them in many cases. But there are some cases where people use dynamic DAX that would be really possible to address with dynamic tasks which are much more observable and easier to use. The other best practice is to customize the map index. This is an airflow 29 feature. So this is fairly new, but it makes it much easier to find the logs of a specific dynamically mapped task. Now, how do you do that in practice? So we have the basics, which is we have the partial method, and as I said before, all the parameters that you put into partial will stay the same for each dynamically mapped task instance. So in this case I'm saying the parameter a is always equal to two. The second method that you put on your call of a task is dot expand. So here I have another parameter called b, and I'm saying, well, b is the parameter that I want to change in between the dynamically mapped task instances. So to b I'm giving a list of inputs, in this case zero and one. This will create two dynamically mapped task instances, the first one using zero for the parameter b, and the second one using one for the parameter b. Of course, here I hard coded a list. This could be the output of an upstream task, and that's how you can create these dynamics pipelines. One thing to note, it's important that you name the keyword argument here you have to say b equals. You cannot forget that, otherwise you will get a dag input error, but the error tells you specifically what you did wrong in this case. And lastly, the other basic if you are running Airflow 29, you want to create a map index template. This is a parameter that goes directly into either the partial method in a traditional operator or into the task decorator if you're using the task flow API, and then you can customize the map index. If you're already familiar with dynamic task mapping or a more advanced airflow user, you will probably be interested to learn about the more advanced methods. There's expand quarks. This allows you to map over sets of keyword arguments. I highly recommend looking into that because it also opens up a lot of use cases and one of my favorite little airflow things that not that many people know about, there is a function called dot map that allows you to transform the output of an upstream task before you map over it. So without having an extra task, you can change the output of any upstream task if it's not in the right format for your mapping. I use this a lot, especially if I have an upstream traditional operator and the output isn't exactly what I need. And it's a very neat feature to use in dynamic task mapping. If you want to learn more about dynamic tasks, especially about the more advanced use cases, I highly recommend our guide on dynamic task mapping. Now, there are several examples of dynamic task mapping in the demo, but since there's always a lot of code in the demo, I wanted to show you just the very basics of what is necessary so you can use this today in your airflow pipelines. All right, so here we have two tasks to find. The first one, line 44, we have the task decorator, and then we have the function on line 45. So the first task is get file paths, and the second task is on line 51, called process file, also defined with a taskflow API decorator. Little side note, I'm using the task flow API a lot here, but you can also dynamically map traditional operators. And in the demo there are traditional operators that are dynamically mapped. So I can show you what this looks like with a traditional operator in a second in the demo. So we had three components for basic dynamic task mapping. The first one was the elements that stay the same in between the dynamically mapped tasks. So this is what goes into that partial. So we have our function process file and we call that function with the method partial. And here is where we enter the elements that we want to stay the same in between the copies of the task. I very creatively named this parameter constantly. Course this could be named anything. And I'm saying the constant is always 42. All right, so no matter how many map task instances we get, we could get two. Like in the example here, we could get 100. The constant will always be 42. Now, what is the element that changes we from the upstream task, get a list of file paths. In this case it's hard coded. We will get two file paths, but this is the task that would, in our example return maybe two file paths one day and 100 the next day. So this is what we put into the expand method. We have our keyword argument called file. We set that equal to that list of inputs. And in this case I don't hard code the list in the method call, but I get the list as the output from my upstream task. On line 60 you can see file paths equals get file paths. So this is the output. Whatever my upstream task returns, whatever that list is, gets parsed to file. And then I get one dynamically mapped task instance. So one copy per element in that list. So in this case I will get two copies of this task, one for the first file and one for the second file. Lastly, let's say we are on Airflow 29. I want to customize the map index template. So on line 50 you can see I add a parameter into my task decorator. I say the map index template is equal to this jinja template, and this is a ginger template that doesn't exist yet. You could use an existing one, but I want to create my new one specifically for this task. And the way to create that inside of your task is, is at line 57 you can see I get the current airflow context and then I add this variable to the context on line 58. In this case it's equal to a string that uses both the name of the file and the constant. And the cool thing now here is because I can define this parameter value inside of the task, I could set this to outputs that I'm generating in the task. So the prime example here would be if I'm testing a machine learning model and I get a test accuracy back. I could use the test accuracy in my custom map index template by defining the custom map index at the very bottom of this task. So when I hop over into the airflow UI, you can see here the map index. I clicked on the mapped task tab and then I can see all my dynamically map task indices. The custom map index is not zero and one anymore as in previous airflow versions. But now I have this string that I defined. So what you could do, for example, with model testing, you could have your model testing parameter like the accuracy or your r squared value displayed on your map index. And then you can very easily find the one mapped task index for the model that had the best value. Okay, that was all of the airflow features. I hope this gives you a good basis to understand the demo code better. And let's hop into the demo. So we are talking about content creation today, and I wanted to have a demo repository that creates LinkedIn posts for me about Apache Airflow. So that's useful for me personally. But you can very easily adapt this repository to create any type of content for any type of topic. So let's say you are really into metal music and you are running a blog post about metal. You could provide this application with specific information about the music you want to write about, like the newest albums. And you could also provide it with examples of your previous blog posts. And then tell the app instead of posts about Airflow, I want you to create whole blog posts about the newest metal bands so you can really adapt this to anything. Again, the code is in the repository and at a high level. This airflow project has two distinct paths. The first one on top here in blue is a traditional RG pipeline. So we take some knowledge, some proprietary knowledge that in my case is about airflow. I'm using some of our own airflow guides and we ingest that and embed it into a vector database. In this case, I'm using viviate. Little side note, because Airflow is too liagnostic, you could very easily swap out any of the elements here. Like you could, for example, use a different vector database or a different large language model if you wanted to. The second part of the demo pipeline is the fine tuning. So not only do I want more information to be available to the large language model, I want to fine tune the language model to use a similar tone that I want and similar length of posts. So I have some very short examples of LinkedIn posts about airflow, both training and validation examples. Then I make sure that the format is correct for fine tuning. If you've fine tuned GPT before you know that you have a very specific format that the examples need to be in. And I also added a little step that makes sure that the fine tuning will be in budget. So I'm actually counting the tokens and estimating how much the fine tuning will cost. Then I run the actual fine tuning, this is run in a deferrable way because it takes a while. And lastly, I have the champion versus challenger Dag that I showed in the slides earlier when I talked about branching. That makes sure that we pick the actual better model. So either the current champion model or the new challenger model. So if you keep training this with different examples, it will always pick the best accurate model. Then on the front end, I just use the streamlit app. So this is a very simple frame, but you could connect this to any front end that you wanted to connect to. And this is where the user can put in a prompt. It gets augmented with the newest knowledge provided by the retrieval augmented generation pipeline. And then it uses the current champion GPT model to create a response. I also have a second part of the app where the response gets sent to Dali to create a picture that is mostly for fun because I find the pictures quite amusing that it creates about airflow. All right, that's the general pipeline. Let's look. Let's hop into the code and see what actually happens. So, as I said, this is the repository. It's all open source. You're very free to fork it and to use it. And this is what you get if you fork this repository and clone it locally. This is the whole repository. And you can see. See, there are a lot of dags here. Not only are there dags tagged with use case, those are the dags that are actually pertaining to the use case that I'm going to show. There are also a lot of dags tagged with simple dags. And you can see by the title here that these are about the features that I talked about in the slides. So I was thinking you might want to have more code examples for these features. So with these dags, you can check out more simpler code examples about a specific feature. But in our case, we want to look at the use case. So let's only filter for use case and zoom out a little. So these are all the dags that are about the use case that we're going to talk about. You can see it already ran a few times, and here we have two dags that are currently paused. They are tagged with helper. Those are helper tags that help you when you're developing. So we have one dag that can start the whole fine tuning pipeline. And we have one dag that deletes everything that's currently in vv eight. Because when you're developing, you might want to iterate over the content that you put into VB eight. So we can ignore these two dags for now and let's just focus on the other ones. And the first stack that I want to talk about is the one for the full first part of the pipeline. So this blue part of the pipeline, the rig pipeline, is all happening in one dag. So let's click on this dag. It's called the ingest knowledge base stack. And when we zoom into the graph view, you can see the stack is scheduled based on both time. So we have a data set or time schedule, one of the new airflow to nine features, or on either of these data sets receiving an update. Then what happens next? And we can actually see some of the scheduled runs. This is one of the runs that happened recently. And you can see I make sure that my class exists in the VV eight database, so I interact with viviate. I have a branching task here. If the task already exists, then I do nothing, just an empty operator if it doesn't exist. If it's the first run of the DaG, I actually will create the class. Then we start with ingesting the document sources. And here you can see that I have two arms that are being created to ingest from different sources. This is just one way that you can structure an IG Dag. And this is actually structured in a very modular, best practice way that you can write airflow data so that you can very easily add another one of these branches here if you had a new ingesting source. So if you had a different type of file that you want to ingest, maybe CSV files. And what happens with each of the sources is that we extract the information from the source. I mentioned that we use airflow guides, so extract from guides. We extract from text files. We use Lang chain to split up the guides and text files and then they are being prepared for the ingestion. So they are being put into the exact right format that we need to ingest into viviate. And this all happens with the task flow API. You can see add task here. But for the last task I'm using an operator because the viviate ingest operator makes it a lot easier to interact with VB eight. And I don't need to think about the call to the VB eight API at all. In this case I just provide the data and then it will be ingested into the local VV eight instance. This is the general dag. Let's look at the code that was used to create this dag. So let's hop over. This is the versus code environment where I'm running this airflow project. Of course you can use any editor that you like. And if we look at the dags here, ingest knowledge base, then we can see all of the code that went into creating the dag that I've just shown. And you can see here we have some description. We have all of our imports. This is where we import functions that are being modularized. So one way that you can make your airflow easier to use and to reuse certain parts of your code is by modularizing functions. So extract, split and prep for ingest are functions that I've modularized here. I'm importing some environment variables and then I define my different document sources. Again, this is just the way that the Dag has been optimized. You could also just have the functions directly in your dag code and only use one source here. In this case I'm saying, well, I have a source that's called guides. And when we are using the guides, I want to use this specific extract function. So this is the extract function that can extract from markdown files and I want to use this specific folder path. The other source is I have text files and then I want to use a different extraction function because, because I have to handle text a little bit differently than markdown and I will use this other folder path. Now next we define the actual dag. And here you can see several features that I mentioned in the slides earlier. So I said that we are using a data set or time schedule. So I want this dag to run every day at midnight to update, but also if I send a specific update to one of the datasets. So what I did here is I'm using the data set or time schedule. You can scroll up, you can see this is being imported from airflow timetables, datasets and to the schedule I have to provide two arguments. I provide a timetable. In this case I'm using the cron trigger timetable and this just says every day as midnight. And I have to provide some data sets. I here modularize this so it will actually use a list of data sets and then it will create a list that says, well use update on any of these data sets. But you could also just put any data set expression here. So any logical data set expression directly if you don't want to make it dynamic like this. Alright, so now this is running both every day at midnight and whenever any of the datasets that have a URI in this environment variable get updated. The next feature that I covered in the slides is retries. In this case, I'm setting my retries at the DAG level. So I'm saying all of the tasks in this dagger should have at least three retries, should have free retries, and I want to wait five minutes in between retrying my tasks. This is the default for all the tasks in the dag, but you can override this at the task level. Two more things I wanted to mention, because maybe some of you are thinking, are thinking, wait a second, how do you have emojis in your dag names? That's possible. Also, as of Airflow 29, you can have a dag display name. So the Dag id is still ingest knowledge base, but you can have now a dag display name that includes emojis or special characters. And the other thing which is still considered experimental. But I think a great best practice tool going forward is, I say if the stack fails five times in a row, I want it to stop. If I go on vacation and this dag runs every day at midnight and it has failed for five days, then there's probably something wrong that I need to fix. So I don't want it to incur more costs by running more times. And you can very easily set that now by using Max consecutive failed Dag runs. I set that to five here and in the UI, you can also, if you hover here, you can see this dagger auto pauses itself after it failed for five times. Very neat, very small feature that is new. And that I think gives you a great way to make your dags stop if they fail too many times. All right, we have our branching. So this is in essence the same as I've showed in the slides. We have the task branch decorator, and then we return a string based. This is modularized. I put the string into the task call, but we return the string of the downstream tasks that we want to run. If there is current, if our current, the class that we want to have in our VV eight database doesn't exist yet, then we use the VV eight hook from the provider in order to interact with viviate and create that class. And then lastly, we want to ingest our sources. So we have a loop to create one of the branches for each of the sources. And I can say the first task, which is the extraction task, this one should use my extraction function. So what this creates is it creates both of these tasks here, one in the first run of the loop and the other one in the next run. Again, very modularized. You could also have your regular at task decorated tasks defined in here. Then I'm splitting up my text. I use the splittext function, which is also modularized. Maybe in this case I can show you what the function looks like. If we go here, split, you can see this is all of the code that is used to chunk the text into sizable elements that can be used and ingested into viviate. And because I didn't want all of this code to be in my dag, I modularized it out and just imported it because at the end airflow is all just python code. So you can do things like that. And the texts are the ones that were extracted by the first task and I can just input it into my second task. Next I'm preparing the embedding for import. And this is a task that, that is dynamically mapped. So you can see here, last time I had 14 chunks of airflow guides. And if I click here you can actually see the map index. So I say which is the guide that I'm ingesting? I'm ingesting the one about dynamic tasks very fittingly. And which chunk am I ingesting? And this is what happens here. I have this task flow decorated task that I call with the function that is modularized. And then I say, well, the VV eight class is always the same, so that's my default, that's goes into partial, but I want to create one dynamically mapped task instance per chunk, so per element that is being returned from this upstream task. And lastly we want to ingest this information into viviate. And in this case I'm using, using a traditional operator, the VV eight ingest operator. And I'm also dynamically mapping this operator. So I'm saying I want to have one ingestion click from here. So we also have 14 dynamically mapped task instances here. I say I want to have one instance of this VB eight ingest operator for each of the chunks that I'm giving it. And here you can see they are named in the same way. And this is done by using the partial on most of the arguments. Like most of the arguments stay the same. I always use the same connection to bv eight. I use the same trigger rule, but I want to have one dynamically mapped task instance per element in the list that was outputted by the upstream task. So per embed option object. And this has a length of 14, had that in the last run. So that's why I get 14 dynamically mapped task instances here, the map index template, if you're using a traditional operator can be set with direct jinja templates. You can see here, I'm saying I'm using of this task, the input data parameter and then I'm indexing into that and I'm using the URI that is being returned from this upstream task. And that's what gives me the Uri of this or the path in this case of this guide. All right, lastly, I set some dependencies. I chained this all together down here as well. And that gives me this highly adaptable, highly dynamic rag dag. So this is already all that is necessary to do this first part of the demo. So to get the up to date knowledge and get it into my VBA database now, next I want to actually fine tune a model. And all of the other dags are about the fine tuning. And since I modularized this pipeline into different dags, but I want them to always happen in the same order, I always need the examples first. Then I check the examples. Alright, then I fine tune. Then I have champion versus challenger competition. Extremely great case for using datasets. So let's look at the datasets view of this project. So these up here are just data sets from the simpler DAX, but these are the data sets from the actual use case. And what you can see here is that we have all of these local file uris that I've scheduled these dags on. Of course, in a production case you're usually not using local files and this is likely to refer to files in an object storage. And what you can see here, for example, on this site is we have some training files and as soon as any of these training files are updated, our ingest train examples starts. Because if the training examples are new, I say okay, this means I want to kick off a new fine tuning run. This ends with the formatted examples. So I'm getting the examples, putting them into the right format, I have a new data set, then I evaluate the cost and also the format of the examples with the next tag. This produces a new file with the evaluated format and then I can fine tune my DaX. So everything is happening in succession based on dataset scheduling. Once the fine tuning is done, I have a new results file that gives me an accuracy. And this kicks off the champion challenger dag. Okay, so there are different ways to kick this off when you are using this repository locally, and one is to just run this dag. That's just for convenience. But there was one more new data set feature that I wanted to show. So let's kick this off using the API. So if I select this here, this is the script. It's a very, very basic API script. You could run this from anywhere. This could be run from a web application, this could be run from a curl command. But in my case just want to run Python three API scripts and then start fine tuning pipeline. So let's run this example. And you can see I print out the response here and it says it updated this one data set, and this is one of the data sets where my ingest train example stack is being scheduled on. So this stack just started because I made this call to the airflow API, and this will kick off this whole pipeline. The stack itself is very simple. It just gets the example folders, it creates the JSON L that GPT needs for fine tuning, and then it makes sure that we update the next dataset. We say that now this JSON L file exists that has just finished, and once this has finished, the evaluate tag is very. So we didn't see this running, but the evaluation dag, this is the dag that just ran, you can see is very structured Dag that uses a lot of branching. So this is an example that you can look at in your own time, where the branch will have four possible outcomes and decides which parts are running. Because if we have no validation examples or no training examples, for whatever reason, we do not need to kick off the fine tuning. It won't work. So we stop the pipeline here in its tracks. But if we have both, we want to evaluate the formatting of both. So both the validation examples and the training examples get evaluated. And for the training examples, I actually want to know. I want to count the tokens and calculate the expected cost here. Decide if that's within budget and only if it's within budget. I will actually continue and this will later kick off the fine tuning task. If this is over budget, I currently put the budget, I think at $20. Then the DAC will stop as well. So this is one way that you can use branching to protect yourself from accidentally incurring a lot of cost. All right, so the fine tuning DAC is currently running. So let's look at this. And you can see here the fine tuning is also scheduled on some datasets. And we make sure we have the right validations file and examples file. We upload those files to OpenAI because I'm using OpenAI to fine tune. And then we have our fine tuning task. And this uses a deferrable operator. You can see currently this task is deferred because it's waiting for the fine tuning to finish. If we go into the details of this stack and the details of this task here, this uses the deferrable finetune openair operator. We actually have an extra link here. So if I click on this link, it takes me directly to my finetune. So this just started. It's not very smart yet, but this is starting my fine tuning and I can directly look at the job and I could copy the ID or I could also cancel it from here. But this will take a while to fine tune a new model. But yeah, let's look at the operator in the meantime, because this is a deferrable operator, which actually currently doesn't exist yet in the provider. So let's hop over into the code again, and this is our fine tuning dag. So let's clear this and look at the fine tuning dag. So again we have imports, we get some environment variables, we get this operator, and because it's a custom operator, we can get it from a local file in the include folder. And this fine tune operator is then used in this dag down here. So here you can see all of the code that I need to write in order to fine tune an OpenAI model. If I use this fine tune operator, I give it a task id, I give it my fine tuning file, give it a validation file, I tell it which model to use, I create a suffix for it. I'm using a parameter here, but you could also hard code this. And then I have two options to make this operator more efficient. So the first option is wait for completion. By default, this is false and the operator will just kick off the fine tuning job and then be successful and the pipeline continues. But in a lot of cases you only want to continue your pipeline if you know the fine tuning has been successful and has happened. So that's when you turn wait for completion to true, and then the operator will wait. In that case it still uses up a worker slot. So if you're saying, well, I have a trigger or component, I want you to actually release the worker and be deferable. Then you also set the parameter deferable to true. This is a pattern that is also, for example, used in the trigger diagram operator with the exact same two parameters. We can have a quick look at the code of this custom operator. Let's meet here. Custom operators GPT fine tune you can use this operator as a template for your own custom operators, especially if you want to fine tune a model, or if you want to tune a model for service where there is no provider yet. You're very welcome to use this. You can see that's the link that I defined. So that's how to define an operator extra link. And this is the trigger. So this is the process that's actually being handed to the trigger, a component to do the async call. And this is where the asynchronous waiting is happening. And this is using a function to wait for the OpenAI API to return that the fine tuning status is succeeded or failed or canceled. So this is what's happening currently in the trigger component running asynchronously. Once this has completed, the operator will actually complete and then it will give me all of the information that I want to have about the fine tune. So if we scroll down in the operator code, you can see down here is all of the information that it will log to the logs about which model. I fine tuned the model id, how many training tokens, all those things. That's the fine tuning duration. And then it will also push the model name to XCom because I can use that in a downstream dag. I know this was quick, just scrolling through the code. As I said, feel free to use it. Feel free to look at it in more detail. And it's just one of the ways that you can standardize how you interact with generative AI. Okay, it has finished, so we can look at the logs. And here you can see the output of all of these print statements. And you can see this is my fine tuned model. And it took about five minutes in this case, and I have a result file. So the next stack that then automatically runs and already finished because it's also a very fast stack, is the dag that actually pulls this results file and then makes sure it compares the champion and the challenger. And in this case, you can see I have a champion and I have a challenger. I've ran this before. So it ran the middle part and did all of the comparison. And when it compared the accuracies, the new model was better. So it decided to switch to champion. And that's all that is happening on the airflow side. And if you're running this in production, this is just running without you needing to do anything. You can run that on any schedule or on specific triggers like with the data set, and you don't have to worry about any individual tasks that are happening. Let's see the results. So this is the frontend app, this is the streamlit app. And let's say, actually, let's say I want a LinkedIn post about time and dataset schedules. All of my examples are very short, so I'm expecting a very short post also to be able to show this better. So it's thinking for a little bit. This makes a call to my fine tuned model. So this call did not go to regular chat GPT or to the regular GPT model, but it went to this fine tuned iteration, the last one. And yeah, you can see. Did you know you combine data set based schedule with time based scheduling in Airflow 29? I did, but the current GPT 3.5 model does not because this is very new information. And that's exactly how you can be better than your competition, by providing your own data to your genai applications. It even gives me the exact name of the class, which is cool. And it gives me a fun fact about space. Also for fun we can generate a picture. This takes a second. This takes the post that was created and sends it to Dali. Okay, while this is running, let's talk about the steps that you need to take to run this repository by yourself today. Because it's not many steps. First you go to this GitHub repository. If you want to be nice, you can star it, but what you need to do is you fork it. Then you clone it locally so you get the local address. Then you need to make sure that you have the Astro CLI installed. This is an open source project that astronomer provides. That makes it very easy to run airflow in Docker. So even if you've never run airflow on your computer, you can do this. Today. There are install instructions in our documentation. But if you are on a Mac, close all of this. If you are on a Mac, you can just run brew, install Astro, and then you get the Astro Cli, you clone the repository, you go to the root of it, and then you need to make a few things. So one thing that I have here that you will not have is the env file. You just have the env example file. And what you need to do is you need need to copy paste the whole content of this file and create your own env file and copy the content in it. Because this is the file that airflow will use for all of the environment variables to configure in the DAG. What you need to do here, because we're using OpenAI, you need to put in your OpenAI API key once here in the environment variable and ones here in the vv eight connection. So with this you'll be able to able to use OpenAI both for your call to create the inference. So to create the post, that's the first one and also in your vv eight connection in order to create the embeddings. These are also created by using OpenAI. That's the only tool you need. You do not need a vv eight account because this project runs vv eight locally. Here you can see the docker compose override file and this creates a vv eight container as well as the streamlit container. So you do not need to have streamlit vv eight installed or anything. This will all happen automatically. Once you copy pasted the contents here and created the env file, you just run Astro def start and the first time you run this it takes a couple of minutes. And then at localhost 8080 you will have your airflow environment and at localhost 8501 you will have the finished app. Also, that's our dataset and time schedule image. That's actually very pretty, especially the second one. Then you just run this first stack or run the API script that I showed and all of the dags will be running. You also need to unpause them of course, but as soon as all of the dags have run, you can start creating your own posts about airflow. If you want to adapt this to your own data, to your own use case, for example to write blog posts, let's say blog posts about your favorite dog, then you need to make a few more changes in the include folder here, the knowledge base. This is where all of the knowledge is. So what you want to do is you switch out these markdown files and these text files with your own information. Let's say if you really like golden Retrievers and you run a dog about golden retrievers, you could put all of your golden retriever extra information into these files here, and that way the app will have access to this extra information. You will also want to have different fine tuning examples. So in the examples folder you will need to have different training examples here. These folders will not exist the first time you run this, and then you add your training examples here and you can also add validation examples. So with the examples you change the tone and the format of the response, and with the knowledge base you change the information that your application has access to. The last thing that you need to change if you adapt this to your own use case is if you go into the streamlit folder, include streamlit. This is the app code. So this is the code that creates the streamlit app down here in the getresponse function you can see the actual prompt so if I look bigger, you can see here is what I tell the large language model every time additionally to the user provided query. So I'm saying you are the social post generator Astra, you might want to change this to. You are the blog post generator and you don't want to have interesting fact post about airflow, but interesting fact about golden retrievers. Then it retrieves all of the articles, it gets the query. And lastly, this was my addition to add a space fact. You may want to change that to something else, but that's all that's needed. Important side note, you need to have at least something in your VBA database in order to be able to run the pipeline and this file in order to need to run the pipeline. But you do not need to run the fine tuning pipeline if you don't want to. If there's no fine tuned model, it will just use GPT 3.5 turbo by default. All right, so that was everything that I had to show you today. Again, in case you missed it. If you want to try out Astro, if you want to take Dax like these and run them in the cloud automatically and have that very easy, then you can use this QR code or the link on this slide and you will also get a free complimentary airflow fundamentals certification exam with that. The take home message of this whole talk. Other than use this code and play around with it, it's really both your data and your orchestration with airflow that sets you apart from your competitors when creating any genai applications. Because anyone can wrap around the OpenAI API, but only you have your data. And now you are one of the people who know how to best orchestrate the data pipelines and the ML pipelines with Apache Airflow with that. Thanks so much for watching. I hope this is helpful for you. I hope you fork the repository and have fun with it, create amazing things with it, and that you have a great rest of the conference.
...

Tamara Janina Fingerlin

Developer Advocate @ Astronomer

Tamara Janina Fingerlin's LinkedIn account Tamara Janina Fingerlin's twitter account



Join the community!

Learn for free, join the best tech learning community for a price of a pumpkin latte.

Annual
Monthly
Newsletter
$ 0 /mo

Event notifications, weekly newsletter

Delayed access to all content

Immediate access to Keynotes & Panels

Community
$ 8.34 /mo

Immediate access to all content

Courses, quizes & certificates

Community chats

Join the community (7 day free trial)