Conf42 Kube Native 2022 - Online

Building Scalable Data Pipelines With Argo Workflows


Abstract

At Spectrum Labs, our deep learning models are trained and validated against large data sets that can only be processed in a distributed manner. Data Scientists require the ability to invoke batch jobs as part of the model development lifecycle, and these jobs must run in a scalable, fault-tolerant manner.

Argo Workflows is chosen as our data pipeline framework as it provides a container-native workflow engine that allows us to build on our existing Kubernetes deployment. We use Apache Spark as our distributed processing engine, which we deploy and manage ourselves on Kubernetes. The integration of these two technologies provides us with a batch job framework to meet our Data Scientists' needs.

In this session, we will provide an overview of Argo Workflows and how we use it within Spectrum Labs to execute Spark jobs that process datasets that contain over 100 million records. We will demo our current pipelines and describe how we have built a framework that allows us to deploy a scalable Spark application with only a few lines of configuration. We will also provide an overview of Argo Events which allows us to orchestrate our pipelines in an automated manner. Lastly, we will discuss some of the advantages and challenges of using the Argo framework.

Summary

  • This presentation describes how we're using Argo Workflows within Spectrum Labs to build and deploy scalable data pipelines. In particular, it shows how we have built a framework that lets you deploy an elastically scaling Spark cluster using only a couple of lines of configuration.
  • Argo hosts a web server where you can post a request to run an Argo workflow. Argo also provides a read-only UI that allows you to view all the workflows that have executed in your cluster. I'll show how we execute a workflow and then view it in the Argo UI.
  • I'm going to walk you through a demo of the IRO workflow to show how we use Spark within Argo. You can configure a Spark job or Spark pipeline with a couple of lines of YAML, and when the pipeline is deployed you get a fully scalable Spark pipeline.
  • The Spark operator is not a native Argo component. The YAML definitions are cumbersome, and with YAML it's easy to introduce typos. We have the ability to run and deploy all our pipelines locally, which means we can get feedback on these kinds of errors in seconds rather than minutes.
  • When you're running Spark on your own Kubernetes infrastructure instead of EMR, there's obviously a lot more configuration required. That concludes my overview of Argo Workflows and Argo Events; I'd like now to invite any questions people may have on the presentation.

Transcript

This transcript was autogenerated. To make changes, submit a PR.
This presentation describes how we're using Argo Workflows within Spectrum Labs to build and deploy scalable data pipelines. In particular, it will show how we have built a framework that allows you to deploy an elastically scaling Spark cluster using only a couple of lines of configuration. A quick overview of the structure of the presentation: I'm going to give an introduction to Argo Workflows along with a demo. I will then give a walkthrough of one of the pipelines that we use, and describe the SDK that we've developed to simplify the development, deployment and invocation of our Argo workflows. I will give an overview of Argo Events with an automated demo, and lastly we'll talk about some of the challenges that we've had using Argo and Spark together.

Spectrum Labs builds B2B trust and safety technology. Our customers, which are gaming apps, dating apps, social media platforms, et cetera, send us text conversations. Our deep learning models can then identify any toxic behaviors in those conversations and trigger an action. An action may involve a human moderator making an intervention, or it may involve an automated action that can be configured via our Guardian moderation software. To support the development of these deep learning models, our team of data scientists require data pipelines that can run on large data sets. This is a very simplified version of the lifecycle, but we may have processing steps that involve cleaning data, we have the training of our models, and we may run eval sets to validate that our models are meeting a certain quality threshold before we release them to production. For all of these, our data scientists require the ability to easily kick off workflows, to monitor them, and to troubleshoot them. We have chosen Argo Workflows as our tool of choice to facilitate this process.

Argo Workflows is a container-native workflow engine for running jobs on Kubernetes. It's open source and it is supported by the Cloud Native Computing Foundation. I think a key advantage is that it's native to Kubernetes, so if you're already deploying your services and systems on Kubernetes, Argo Workflows is a great fit. It can be easily installed via a Helm chart; it obviously involves some configuration, but it's fairly straightforward to get up and running. The Argo suite consists of four components: Argo Workflows and Argo Events, which we're going to go through in this presentation; Argo CD, which handles continuous deployment; and lastly Argo Rollouts, which is a blue/green style deployment framework.

At its core, Argo Workflows provides you with the ability to define your pipelines via YAML files. Each pipeline can then be packaged up via Helm and deployed and shared like any other Helm chart. Each task runs on its own pod. A task contains a template; a template defines a Docker image and some instructions to that image, which then executes as a container within the pod. So you can size and scale your tasks and pods independently of each other. When it comes to message passing between tasks, you can define string parameters, which Argo will pass from container to container, or you can define a file artifact. A file artifact may be the output from a previous task, which can then be referenced further downstream. Behind the scenes, Argo will package that up into its artifact repository and place it on any pod or task where it's referenced.
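To make those concepts concrete before the demo, here is a minimal sketch of the kind of two-step workflow the walkthrough below describes. Everything in it is illustrative rather than our actual chart: the template names, the image, and the S3 path are placeholders.

```yaml
# Minimal sketch: task "download" fetches a file from S3 and registers it as an
# artifact; task "show-head" consumes that artifact and prints its first lines.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: demo-main
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: path                      # S3 path supplied when the workflow is submitted
  templates:
    - name: main
      dag:
        tasks:
          - name: download
            template: download-file
            arguments:
              parameters:
                - name: path
                  value: "{{workflow.parameters.path}}"
          - name: show-head
            dependencies: [download]
            template: head-file
            arguments:
              artifacts:
                - name: data
                  from: "{{tasks.download.outputs.artifacts.data}}"
    - name: download-file               # first pod: fetch the file, register it as an artifact
      inputs:
        parameters:
          - name: path
      container:
        image: example.registry/s3-tools:latest    # any image with the S3 CLI and a shell
        command: [sh, -c]
        args: ["aws s3 cp {{inputs.parameters.path}} /tmp/file.gz && gunzip -c /tmp/file.gz > /tmp/output.txt"]
      outputs:
        artifacts:
          - name: data
            path: /tmp/output.txt       # stored in the artifact repository
    - name: head-file                   # second pod: consume the artifact, print the first lines
      inputs:
        artifacts:
          - name: data
            path: /tmp/input.txt        # Argo places the artifact here before the container starts
      container:
        image: example.registry/s3-tools:latest
        command: [sh, -c]
        args: ["head -n 100 /tmp/input.txt"]   # "100" rendered by Helm from headCount in values.yaml
```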
I'd like to dig into some of the core concepts of Argo Workflows now via a simple demo. I'm going to switch over to my IDE and walk through a fairly simple Argo workflow. It's packaged up as a Helm chart. I've defined a main YAML file which consists of a DAG. The input to the workflow is a string path which references an S3 path. The first task in the workflow references that string path, downloads the file from S3, and registers it as an Argo file artifact. The next task refers to that file artifact from above: what it's saying is that the input artifact for this particular task is the output artifact from the task above, and it then invokes a template which displays some of the contents of the file on the screen.

Further up, we've defined the templates inline. Typically you would probably define each template within its own YAML file, but for the purposes of this demo we keep things simple. This particular template consists of an image and some instructions to that image, which then execute as a container on its own pod. What's happening here is that we use the S3 CLI to download the file from S3 and save it to a local temp directory. We then unzip it and write the output to a temp output file. The output from this particular task registers that local file as an artifact within the artifact repository. The next step in the workflow references this particular artifact, and what we're saying is: download that artifact to this particular local location. Within this task, again, we've defined our image and some instructions, which are basically to take the first number of lines from the file and display them on the screen. We are using our own custom image here, but this particular workflow would probably work with any image that contains the S3 CLI and can run bash.

I want to highlight this particular notation here. This is Helm templating that allows you to plug in values from your values.yaml file. In this case we've defined the head count as 100, so when we deploy this particular chart, Helm will plug in 100 here as the value. This gives a lot of power, which we'll show later on in more complex workflows: it gives us the ability to parameterize our workflows. I'll now show you how we execute this, and we will then view it in the Argo UI.

So now we're going to deploy and run this Argo workflow. The first thing we do is run the Helm command that deploys this particular workflow. This is the same YAML file that I showed you previously, except the value from the values.yaml file has been plugged in. Once the deployment is done, we execute the workflow via a curl statement. Typically you're not going to execute your Argo workflows using this approach: Argo has its own CLI that you can use, and within Spectrum we've created our own CLI to simplify the execution of these workflows, but for the purposes of showing the mechanics here, I'm going to run the curl statement. What's happening is that Argo hosts a web server where you can post a request to run an Argo workflow. In this case, all we're doing is defining the workflow that we want to run, which is our demo main workflow, and defining the string path to the S3 file that we wish to download and display.
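For reference, that curl call is just a POST to the Argo server's workflow submit endpoint. Below is a hedged sketch of the kind of request body involved; the endpoint path, template name, namespace and S3 path are illustrative and should be checked against your Argo Workflows version.

```yaml
# Illustrative body for POST /api/v1/workflows/<namespace>/submit on the Argo server.
# Template name and S3 path are placeholders.
resourceKind: WorkflowTemplate
resourceName: demo-main
submitOptions:
  parameters:
    - "path=s3://example-bucket/datasets/sample.txt.gz"
```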
Argo has returned a 200 response, which means it has now begun executing that workflow. I'm going to switch over to my localhost UI. This is a read-only UI which allows you to view all the workflows that have executed in your cluster, and it allows you to click on a particular workflow and view its progress. It gives you the ability to read some metadata about your Argo workflow, and it also has the ability to resubmit it, stop it, terminate it, et cetera. While this first task is executing, you can click on a pod here and it gives you some useful metadata about that particular task: it shows you the container that ran, the image, the arguments to that image, et cetera. This has executed successfully. I'm just going to click on the logs here to show the output; as you can see, it has displayed the first 100 records of that particular file, so this workflow has executed successfully. A few other things about the UI: you can view all the workflow templates, which are the Helm charts that have been deployed to your cluster, you can look at cron workflows, and if you have Argo Events set up you can look at the Argo events as well.

To give a little bit more context on the kind of pipelines that we have within Spectrum Labs, I'm going to explain this IRO pipeline. This pipeline is used as an input into the training of our models. It will typically be executed against labeled feature sets that consist of tens of millions of human-labeled records. The pipeline involves some preprocessing; we then perform some language detection using our own in-house framework; the data is then cleaned and tokenized; and lastly we download embeddings for each token. Embeddings describe the relationships between words and consist of large vectors, so typically an input file could be, like I said, tens of millions of records, and the output file could be over 100 GB once we include the embeddings. To achieve this kind of scale, we need to use a distributed framework, and we are using Apache Spark within Spectrum. To run this within Argo, we're using the Kubernetes Spark operator: we install it as a Kubernetes custom resource, and it then gives us the ability to submit Spark jobs and orchestrates the creation and management of each Spark job.

Now I'm going to walk through a demo of the IRO workflow to show how we use Spark within Argo and the framework that we've created. Our IRO workflow consists of a couple of YAML files: the main YAML file references our Spark standard workflow template, and we've also got a Spark job YAML file which references our Spark job template. What we've done here is create templates that can be used within Spectrum for any particular Spark job, so you can configure a Spark job or Spark pipeline with a couple of lines of YAML and a couple of lines of configuration in a values.yaml file, and when the pipeline is deployed you get a fully scalable Spark pipeline. To explain how that works, I will first show you the Spark job template. This Spark job template is an Argo template like I described earlier on; it's obviously a lot more complicated. In this case you can see all the various parameters that are provided at runtime: the number of executors, how much memory, et cetera.
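As a rough illustration of what the Spark operator consumes, here is a hedged sketch of a SparkApplication manifest (the spark-on-k8s-operator custom resource). The image, jar location, class name and sizes are placeholders; in our workflow templates most of these values are injected from Argo parameters and values.yaml rather than hard-coded as shown here.

```yaml
# Illustrative SparkApplication manifest for the Kubernetes Spark operator.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: text-cleaner
spec:
  type: Scala
  mode: cluster
  image: example.registry/spark:3.1.2          # image containing the Spark binaries
  mainClass: com.example.TextCleanerJob        # placeholder main class
  mainApplicationFile: s3a://example-bucket/jars/text-cleaner-assembly.jar
  sparkVersion: "3.1.2"
  sparkConf:
    "spark.sql.shuffle.partitions": "400"      # per-job custom configuration
  driver:
    cores: 1
    memory: 4g
  executor:
    instances: 5                               # in our case, calculated at runtime by the sizing steps
    cores: 15
    memory: 24g
```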
Down here we've defined the manifest which is used to configure and submit the Spark application to the Kubernetes Spark operator. Within this we define what image to use, in this case one that contains the Spark binaries we're going to use, then configuration like the location of the spark-submit job and the main class to run. We've got a lot of standard configuration here that a developer doesn't need to worry about, and further down you have the ability to plug in any custom configuration that is particular to your job. Similarly, you can define your driver, how much memory you want, and other custom configuration you want to plug in for your executor as well. Some of these are provided at runtime and others are provided at deploy time: anything that is defined in your values.yaml file is plugged in at deploy time, so these are typically static configuration values. We're defining here the job name, which contains the path to the S3 bucket, the main class to run, how much memory to use, and then some other configuration like the Java version to use and some environment variables. The key parameter here is the process record time in milliseconds. This will become a little bit clearer later on, but what it does is define how long it takes to process an individual record in your Spark job. Using this, our framework can elastically scale out how many executors you need and how big those should be, based on the file that you're looking to process and this particular value.

To run this particular IRO workflow, we first need to deploy it, so we run the Helm command, similar to before, and provide the values file that we wish to configure it with. This renders all the YAML files that were defined in our templates and amalgamates the configuration values in our values file with those templates. If I go to the very end, I can show you the Spark template: in this case we have plugged in all of the configuration items that were in the values file. You can see here we have defined the path on S3 where the spark-submit job is, and we've defined the main class. Further down, you can see we've plugged in how much memory we want the driver to have, and we have also provided some configuration for a third-party service that the Spark application uses.

To run this within Spectrum, we've created our own CLI, Argonaut, which takes in a YAML file. When I showed the previous demo, you saw that Argo's API is a web service that allows you to pass up a string parameter. What happens here is that we want to provide instructions to a pipeline, so we define a YAML file. In this particular YAML file we're specifying to use a text cleaner job, which maps to IRO; we specify the input location, which is a file that we want to perform processing on and which contains a couple of hundred thousand records; the output location; and the embeddings to use. What happens behind the scenes when Argonaut submits this is that it converts the YAML into a protobuf message, serializes that as a hexadecimal string, and passes that up as a string parameter to the workflow. When the workflow runs, the first step it does is convert that string back into a protobuf message, which is then passed as a file artifact between all the tasks in the workflow. So to execute this particular workflow, I'm going to submit the job to a dev environment.
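To give a feel for what those instructions look like, here is a purely hypothetical instruction file of the shape described above. Argonaut is our internal CLI, so the field names and paths are illustrative only, not its real schema.

```yaml
# Hypothetical Argonaut instruction file (illustrative field names and paths).
# Argonaut converts this into a protobuf message, hex-encodes it, and passes it
# to the Argo workflow as a string parameter.
job: text-cleaner                                         # maps to the IRO pipeline
input: s3://example-bucket/datasets/labeled-sample.parquet
output: s3://example-bucket/iro/output/
embeddings: s3://example-bucket/embeddings/en-300d/
```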
And this workflow has now been created. As you can see here, the IRO workflow has kicked off, so I'll show you one that completed earlier on. As you can see, the pipeline contains quite a few tasks. The first step here is the one that I mentioned, whereby we convert the hexadecimal string into a file artifact which is passed down through all the tasks. We do some preprocessing first, and this particular pod is the actual Spark job. You can see that it's using five executor instances, each one with 15 cores, and it also specifies the recommended partition count. We didn't define any of these in our values file; they are calculated at runtime in these particular preprocessing steps. These two tasks here look at the process record time in milliseconds and the size of your file: they look up our internal catalog to figure out how big the file is, and then, based on that configuration item, they figure out how big your cluster should be and size it accordingly. We also have some other preprocessing tasks; for example, there's one here that figures out the optimal number of partitions, and we also have one that figures out what node type to use. We have defined different types of nodes, both spot instances and on-demand instances, so depending on the size of your job we will pick a suitably sized Kubernetes node to run the Spark job on. As you can see here, this particular job took nine minutes to run on five executors to process a data set that's a couple of hundred thousand records in size.

Just to reiterate some of the items I covered in the workflow there: to support elastic scalability within our framework, you define how long it will take to process an individual record, in milliseconds, and you configure that in your values file. From there, our preprocessing steps, which run prior to the batch job, will figure out how many records are in the file that you're looking to process. They first query our catalog to see if we've processed this file before; if it's not there, we open up the file and estimate how many records are in the data set, and if the file is too big and exceeds a certain number of gigabytes, then we take a guess at how many records it contains. From there we figure out how long in total it would take to process the data set, and we size our cluster accordingly. This allows us to elastically scale out our Spark cluster depending on the size of your data set and the type of job that you're executing (a small worked example follows at the end of this section).

Within Spectrum, we've created our own pipeline framework to simplify the execution, monitoring and development of our pipelines. As I showed earlier on, we have the Argonaut CLI, which allows you to specify the instructions to your workflow in a user-friendly format, whether that's a YAML file or a JSON file; that's then converted into a hexadecimal string and pushed up as a string parameter to the Argo workflow. Within Spectrum we're primarily a Scala shop and we use protobuf as our message format, so we have created some libraries that can be used by any task in a workflow to simplify converting a local file into a protobuf message and, likewise, converting a protobuf message back out into a file so that it can be referenced as an Argo artifact. We've also created cron cleanup jobs that can be used to clean up any headless Spark applications or any long-running workflows.
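As a worked example of that sizing logic, here is a rough sketch; the field names and every number below are hypothetical, chosen only to show the arithmetic, not taken from our real configuration.

```yaml
# Hypothetical values.yaml entries plus the back-of-the-envelope sizing the
# preprocessing steps perform; all numbers are illustrative.
processRecordTimeMillis: 2          # assumed cost of processing one record
executorCores: 15
# Sizing sketch for a 50,000,000 record file:
#   50,000,000 records x 2 ms      = 100,000 s of single-core work
#   target wall-clock of ~25 min   = 1,500 s  ->  ~67 cores needed
#   with 15-core executors         -> ceil(67 / 15) = 5 executor instances
```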
On each pod we have deployed a Datadog agent, which pushes metrics and logs up to Datadog, so we have a centralized location for troubleshooting our Argo workflows. Lastly, we have the ability to run all of this locally via Minikube. Minikube allows us to run Kubernetes locally and greatly simplifies the development, deployment and testing of these pipelines, particularly when it comes to dealing with YAML files.

For the examples I've shown so far, our data scientists will typically kick off an Argo workflow manually via the Argonaut CLI. However, in some cases, and as we take on more scale, we will want to trigger these workflows based on automated events. Argo Events is a framework that allows you to achieve that: it allows you to register events with Argo and then trigger actions based on those events. The classic example is a file being dropped into an S3 bucket and an event being triggered based on that, so what I'm going to demo next is an Argo workflow being triggered from such an event. You can also register other types of events, such as message queues; Kafka is another one that we use. What I'm going to do here is remove a file from the S3 bucket, and I am then going to upload that file to the bucket again. To show you how that works, we have set up Argo Events here. It doesn't work with S3 directly for some reason; instead, you have to configure the S3 event to publish an SQS message, which you can then listen to for events. In this case, we have defined this template, which will be triggered when an SQS message arrives on the particular queue. What happens is that this piece of code is invoked when the SQS message is received: we translate that message into a protobuf message and then invoke the IRO pipeline (a rough sketch of this wiring is shown at the end of this section). As you can see here, the Argo Events orchestrator has consumed that event. If I click on this, we get some metadata: you can see the JSON from the SQS message, and down here you will see the bucket name and the file that was placed in the S3 bucket. That in turn has kicked off the IRO workflow.

Within Spectrum we find Argo Workflows and Argo Events to be a great fit for our data pipelines, but there are some challenges that we've encountered along the way. I think the big one is that the Spark operator is not a native Argo component. For example, sometimes when you kill an Argo workflow, it may not always kill the underlying Spark job, so we have created our own cron jobs which kill these kinds of headless Spark applications. The YAML definitions are cumbersome, and with YAML it's easy to introduce typos, et cetera; that's where the Minikube setup is really important. We have the ability to run and deploy all our pipelines locally, and that means we can get feedback on these kinds of errors in seconds rather than minutes. It's also a newer technology: the documentation is quite good, particularly for Argo Workflows 3.x, but compared to something like Apache Airflow there are probably a lot fewer answers on the likes of Stack Overflow, so it can be a little bit tricky sometimes when you're trying to troubleshoot issues.
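Returning to the Argo Events setup demoed above, the wiring is roughly an SQS EventSource (fed by the S3 bucket notification) plus a Sensor that submits the pipeline when a message arrives. The sketch below is hedged: names, queue, region and the workflow template are placeholders, credential configuration is omitted, and exact fields should be verified against your Argo Events version.

```yaml
# Rough sketch of an SQS-driven Argo Events trigger (illustrative only).
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: s3-drop
spec:
  sqs:
    file-landed:
      region: us-east-1
      queue: example-file-landed-queue   # S3 bucket notifications publish here
      waitTimeSeconds: 20
      jsonBody: true
---
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: s3-drop-sensor
spec:
  dependencies:
    - name: file-landed
      eventSourceName: s3-drop
      eventName: file-landed
  triggers:
    - template:
        name: run-iro
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: iro-
              spec:
                workflowTemplateRef:
                  name: iro-pipeline      # placeholder template name
                arguments:
                  parameters:
                    - name: instructions
                      value: ""           # filled from the event payload below
          parameters:
            - src:
                dependencyName: file-landed
                dataKey: body             # SQS message body (bucket + key)
              dest: spec.arguments.parameters.0.value
```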
The last challenge, then, is the platform tuning for Spark. When you're running Spark on your own Kubernetes infrastructure instead of EMR, there's obviously a lot more configuration required, so we had to set things up around claims policies and various node types in particular, and we also had custom Spark configuration to allow our jobs to run resiliently on spot instances. So there was quite a bit there in terms of getting Spark to run efficiently on our own Kubernetes infrastructure. That concludes my overview of Argo Workflows and Argo Events. I'd now like to invite any questions that people may have on the presentation.

David Joyce

Principal Data Engineer @ Spectrum Labs



