Transcript
This presentation will describe how we're using Argo Workflows within Spectrum Labs
to build and deploy scalable data pipelines.
In particular, it will show how we have built a framework that allows
you to deploy an elastically scaling Spark cluster
using only a couple of lines of configuration.
So, a quick overview of the structure of the presentation: I'm going to give an introduction to Argo Workflows and a demo. I will then give a walkthrough of one of the pipelines that we use. I'll describe the SDK that we've developed to simplify the development, deployment and invocation of our Argo workflows. I will give an overview of Argo Events and a demo. And lastly, we'll talk about some of the challenges that we've had using Argo and Spark together.
Spectrum Labs builds B2B trust and safety technology.
Our customers, which are gaming apps,
dating apps, social media platforms, et cetera,
send us text conversations. Our deep learning
models can then identify any toxic behaviors in those conversations
and trigger an action. An action may involve a human moderator
making an intervention, or it may involve an automated action
that can be configured via our Guardian moderation software.
So to support the development of these deep learning models,
our team of data scientists require data pipelines that
can run on large data sets. This is a very simplified version
of the lifecycle, but we may have processing steps
that involve cleaning data. We have the training of our
models. We may run eval sets to validate that our
models are meeting a certain quality threshold before we release
them to production. For all of these, our data scientists require the
ability to easily kick off workflows, to be able to monitor them,
to be able to troubleshoot them. So we have
chosen Argo workflows as our tool of choice to facilitate this process.
Argo Workflows is a container-native workflow engine for running jobs on Kubernetes. It's open source and it is supported by the Cloud Native Computing Foundation. I think a key advantage is that it's native to Kubernetes, so if you're already deploying your services and your systems on Kubernetes, Argo Workflows is a great fit. It can be easily installed via a Helm chart; it obviously involves some configuration, but it's fairly straightforward to get up and running. The Argo suite consists of four components: Argo Workflows and Argo Events, which we're going to go through in this presentation; Argo CD, which handles continuous deployment; and lastly Argo Rollouts, which is a blue-green type deployment framework.
So at its core, Argo Workflows provides you with the ability to define your pipelines via YAML files. Each pipeline can then be packaged up via Helm and deployed and shared like any other Helm chart. Each task runs on its own pod. A task contains a template, and a template defines a Docker image and some instructions to that image, which then executes as a container within the pod. So you can size and scale your tasks and your pods independently of each other. When it comes to message passing between tasks, you can define string parameters, which Argo will pass from container to container, or you can also define a file artifact. A file artifact may be the output from a previous task, which can then be referenced further downstream. Behind the scenes, Argo will package that up into its artifact repository and place it on any pod or task where it's defined.
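To make those concepts concrete, here is a minimal sketch of a workflow along those lines. This is not the actual demo chart; the template names, image and S3 path are placeholders.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-demo-          # placeholder name
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: s3-path                   # string parameter supplied at submit time
  templates:
    - name: main
      dag:
        tasks:
          - name: download
            template: download-file
          - name: show-head
            dependencies: [download]
            template: head-file
            arguments:
              artifacts:
                - name: data
                  # reference the file artifact produced by the previous task
                  from: "{{tasks.download.outputs.artifacts.data}}"

    # Each template runs as a container in its own pod
    - name: download-file
      container:
        image: amazon/aws-cli:latest    # any image with the S3 CLI would do
        command: [sh, -c]
        args: ["aws s3 cp '{{workflow.parameters.s3-path}}' /tmp/output.txt"]
      outputs:
        artifacts:
          - name: data                  # register the local file in the artifact repository
            path: /tmp/output.txt

    - name: head-file
      inputs:
        artifacts:
          - name: data
            path: /tmp/input.txt        # Argo places the artifact here before the container starts
      container:
        image: alpine:3.19
        command: [sh, -c]
        args: ["head -n 100 /tmp/input.txt"]
```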
So I'd like to dig into some of the core concepts
of Argo Workflows now via a simple demo. So I'm going to switch over here to my IDE and walk through a simple enough Argo workflow. This is packaged up as a Helm chart. I've defined a main YAML file which consists of a DAG. The input to the Argo workflow is a string path which references an S3 path. So the first task in the workflow will reference that string path, and it will download the file from S3 and register it as an Argo file artifact.
The next task refers to that file artifact from above. So what it's saying here is that the input artifact to this particular task is the output artifact from the task above, and then it invokes this template, which will display some of the contents of the file on the screen. Further up here we've defined the templates inline. Now, typically you would probably define each template within its own YAML file, but for the purpose of this demo we keep things simple. So this particular template consists of an image and some instructions to that image, which will then execute as a container on its own pod. What's happening here is we are using the S3 CLI to download the file from S3 and save it to this local temp directory. We then unzip it and write the output to this temp output text file. Then the output from this particular task is to register that local file as an artifact within the artifact repository. So the next task has a reference to this particular artifact, and what we're saying here is: download that artifact to this particular local location.
Then within this particular task, again, we've defined our image and we've defined some instructions, which are basically to take the first number of lines from this particular file and display those on the screen. We are using our own custom image here, but this particular workflow could probably work with any image that contains the S3 CLI and can use bash. So I want to just highlight this particular notation here. This is Helm templating that allows you to plug in values from your values.yaml file. In this case we've defined headCount as 100, so when we deploy this particular chart, Helm will plug in 100 here as the value. This gives a lot of power, which we'll show later on in more complex workflows. It gives us the power of parameterizing our workflows.
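As a rough sketch of that Helm templating, assuming a values file with a headCount entry (the exact key name and paths here are illustrative):

```yaml
# values.yaml -- deploy-time configuration
headCount: 100

# templates/main.yaml (excerpt) -- Helm substitutes the value when the chart is deployed
args: ["head -n {{ .Values.headCount }} /tmp/input.txt"]
```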
So I'll now show you how we execute
this and we will view it then in the Argo UI. So now we're going to deploy and run this Argo workflow. The first thing we do is run the Helm command that deploys this particular workflow. This is the same YAML file that I showed you previously, except we've plugged in the values from the values.yaml file.
When we do the deployment,
we then execute the workflow via a
curl statement. Typically you're not going to execute your Argo workflows using this approach. Argo has its own CLI that you can use, and within Spectrum we've created our own CLI to simplify the execution of these workflows, but for the purposes of showing the mechanics here, I'm going to run the curl statement. So what's happening here is Argo hosts a web server where you can basically post a request to run an Argo workflow. In this case, all we're doing is defining the workflow that we want to run, which is our demo main workflow, and defining the path, the string path to the S3 file that we wish to download and display.
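For reference, the kind of payload that curl posts looks roughly like the following, assuming the standard argo-server submit endpoint and an illustrative template name and bucket; the actual request is sent as JSON, shown here as YAML for readability.

```yaml
# POST /api/v1/workflows/<namespace>/submit on the argo-server
resourceKind: WorkflowTemplate
resourceName: demo-main-workflow                    # illustrative name for the deployed template
submitOptions:
  parameters:
    - "s3-path=s3://example-bucket/some-file.zip"   # placeholder bucket and key
```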
So Argo has returned back a 200 response,
which means it has now begun executing that
workflow. So I'm going to switch over here to my localhost UI. This is a read-only UI,
which allows you to view all the workflows that have
executed in your cluster,
and it allows you to click on a particular workflow and
view its progress. So it contains
the ability to read some metadata about your Argo workflow. It also has the ability to resubmit it, stop it, terminate it, et cetera. So while this first task is executing, you can click on a pod here and it gives you some useful metadata about that particular task. It also shows you the container that ran, the image, the arguments to that image, et cetera. So this has executed successfully. I'm just going to click on the logs here to show the output. As you can see here, it's done a head on the first 100 records of that particular file. So this workflow has executed successfully. A few other things about the UI: you can view all the workflow templates, which are the Helm charts that have been deployed to your cluster, here as well. You can look at cron jobs, and if you have Argo Events set up, you can look at the Argo events here too.
So to give a little bit more context on the kind of pipelines that we have within Spectrum Labs, I'm going to explain this IRO pipeline. This pipeline is used as an input into the training of our models. It will typically be executed against labeled feature sets that would consist of tens of millions of human-labeled records. The pipeline will involve some preprocessing. We will then perform some language detection using our own in-house framework. The data is then cleaned and tokenized, and lastly we will download embeddings for each token. Embeddings describe the relationships between words and consist of large vectors. So typically an input file could be, like I said, tens of millions of records, and the output file could be over 100 GB once we include the embeddings. To achieve this kind of scale, we need to use a distributed framework, so we are using Apache Spark within Spectrum to run this. Within Argo, we're using the Kubernetes Spark operator. We can install this as a Kubernetes custom resource, and it then gives us the ability to submit Spark jobs; it basically orchestrates the creation and the management of the Spark job.
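For context, a SparkApplication manifest for the operator looks roughly like this. The image, class names and sizing below are placeholders rather than our actual template.

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: iro-example                       # placeholder name
spec:
  type: Scala
  mode: cluster
  image: example-registry/spark:3.4.0     # image containing the Spark binaries
  mainClass: com.example.TextCleanerJob   # illustrative main class
  mainApplicationFile: s3a://example-bucket/jobs/text-cleaner.jar   # location of the job jar
  sparkVersion: "3.4.0"
  driver:
    cores: 1
    memory: 4g
  executor:
    instances: 5                          # in our framework this is computed at runtime
    cores: 15
    memory: 16g
```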
So now I'm going to walk through a demo of the IRO workflow, to show how we use Spark within Argo and the framework that we've created. So, to show how we are running Spark within Argo, I'm going to walk through our IRO pipeline, which I described above.
Our IRO workflow consists of a couple of YAML files, and the main YAML file references our standard Spark workflow template. Then we've also got a Spark job YAML file which references our Spark job template. What we've done here is we've created these templates that can be used within Spectrum for any particular Spark job. So you can configure a Spark job or Spark pipeline with a couple of lines of YAML and a couple of lines of configuration in your values.yaml file, and when this pipeline is then deployed, you get a fully scalable Spark pipeline.
So to explain how that works, I will first show you the Spark job template. This Spark job template is an Argo template like I described earlier on; it's obviously a lot more complicated. In this case you see all the various parameters that are provided at runtime: the number of executors, how much memory, et cetera, et cetera. Down here we've defined the manifest, which is used to configure and submit the Spark application to the Kubernetes Spark operator. So within this we define what image to use; in this case it contains the Spark binaries that we're going to use. It then contains configuration like the location of the spark-submit job and the main class to run. We've got a lot of standard configuration here that a developer doesn't need to worry about. Further down, you have the ability to plug in any custom configuration that you wish that is particular to your job. Similarly, you can define your driver, how much memory you want, and other custom configurations that you want to plug in, and your executor as well. Some of these are provided at runtime and others are provided at deploy time.
So anything that is defined in your values.yaml file is plugged in at deploy time. These are typically static configuration values. So we're defining here the job name, the path to the S3 bucket, the main class to run, how much memory to use, and then some other configuration like the version of Java to use and some environment variables. The key parameter here is this process record time in milliseconds. This will become a little bit clearer later on, but what it does is define how long it takes to process an individual record in your Spark job. Using this, our framework can then elastically scale out how many executors you need and how big each of those should be, based on the number of lines that you're looking to process and this particular value.
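To give a feel for it, a values file for one of these pipelines ends up looking something like this; the key names are illustrative rather than our exact schema.

```yaml
# Illustrative values.yaml for a Spark pipeline built on the shared templates
jobName: text-cleaner
sparkSubmitJobPath: s3://example-bucket/jobs/text-cleaner.jar   # placeholder path to the job jar
mainClass: com.example.TextCleanerJob
driverMemory: 8g
executorMemory: 16g
javaVersion: "11"
env:
  EXAMPLE_SETTING: "value"
# how long one record takes to process; this drives the elastic sizing of the cluster
processRecordTimeMillis: 5
```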
To run this particular IRO workflow, we first need to deploy it. So we run the Helm command similar to before, and we provide the values file that we wish to configure it with. This renders all the YAML files that were defined in our templates and amalgamates the configuration values in our values file with those templates. If I just go to the very end, I will show you the Spark template. In this case we have plugged in all of the configuration items that were in the values file. So you can see here we have defined the path where the spark-submit job is on S3, and we've defined the main class. Further down, you can see we've plugged in how much memory we want the driver to have. We have also then provided some configuration for this third-party service that the Spark application uses. To run this, within Spectrum we've created our own CLI which takes in a YAML file.
So when I showed the previous demo, you saw that Argo's API is a web service that allows you to pass up a string parameter. What happens here is we want to provide instructions to a pipeline, so we define a YAML file. In this particular YAML file we're specifying to use a text cleaner job, which maps to IRO. We specify the input location, which is a file that we want to perform processing on that contains a couple of hundred thousand records, the output location, and the embeddings to use. So what happens behind the scenes when Argonaut submits this? It will convert this into a gRPC message, or, sorry, a protobuf message. It will then serialize that as a hexadecimal string and pass that up as a string parameter to the workflow, and when the workflow runs, the first step it will do is convert that string back into a protobuf message, which is then passed as a file artifact between all the tasks in the workflow.
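The instruction file passed to the Argonaut CLI is along these lines; the field names and paths here are illustrative, not our exact schema.

```yaml
# Illustrative Argonaut instruction file. Argonaut converts this to a protobuf message,
# hex-encodes it, and passes it up as a string parameter to the workflow.
job: text-cleaner                                            # maps to the IRO pipeline
inputLocation: s3://example-bucket/datasets/sample.parquet   # placeholder input path
outputLocation: s3://example-bucket/output/sample-iro/       # placeholder output path
embeddings: example-embeddings-v1                            # which embedding set to use
```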
So to execute this particular workflow, I'm going to
submit the job to a dev environment.
And this workflow has now been created.
So as you can see here, the IRO
workflow has kicked off.
So I'll show you one that completed earlier on.
So as you can see, the pipeline contains quite a few tasks. The first step here is the one that I mentioned, whereby we convert the hexadecimal string into a file artifact, which is then passed down through all the tasks. We do some preprocessing first. This particular pod is the actual Spark job. You can see here that it's using five executor instances, each one with 15 cores, and it also specifies the recommended partition count. We didn't define any of these in our values file; these are calculated at runtime in these particular preprocessing steps. So these two particular tasks here will look at the process record time in milliseconds and the size of your file. It will look up our internal catalog to figure out how big this file is, and then, based on that configuration item, it will figure out how big your cluster should be, and it will size it accordingly. We also have some other processing tasks. For example, there's one here that figures out the optimal number of partitions, and we also have one that figures out what node type to use. We have defined different types of nodes: we have spot instances and we have on-demand instances. So depending on the size of your job, we will pick a suitably sized Kubernetes node to run the Spark job on. So as you can see here, this particular job took nine minutes to run on five executors, to process a data set that's a couple of hundred thousand records in size.
So just to reiterate some of the items I covered in the workflow there: to support elastic scalability within our framework, you define how long it will take to process an individual record, in milliseconds; you configure that in your values file. From there, our preprocessing steps, which run prior to the Spark job, will figure out how many records are in the file that you're looking to process. It'll first query our catalog to see if we've come across this file before. If it's not there, then we open up the file and we estimate how many records are in the data set. If the files are too big and they exceed a certain number of gigabytes, then we do an estimate and we take a guess at how many records are in that data set. From there, we figure out how long in total it would take to process this data set, and we then size our cluster accordingly. So this allows us to elastically scale out our Spark cluster depending on the size of your data set and the type of job that you're executing. Within Spectrum,
we've created our own pipeline framework to simplify the
execution, the monitoring, and the development of
our pipelines. So as I showed earlier on, we have the Argonaut
CLI, which allows you to specify your
instructions to your workflow in a
user-friendly format, whether that's a YAML file or a JSON file, which is then converted into a hexadecimal string and pushed up as a string parameter to the Argo workflow. Within Spectrum, we're primarily a Scala shop and we use protobuf as our message format. We have created some libraries that can be used by any task in a workflow to simplify the conversion of a local file into a protobuf message and also, likewise, converting a protobuf message back out into a file so that it's then referenced as an Argo artifact. We've also created cron cleanup jobs that can be used to clean up any headless Spark applications, or also any long-running workflows. On each pod, we have deployed a Datadog agent, which will push up metrics and logs to Datadog, so we have a centralized location for troubleshooting our Argo workflows. Lastly, we have the ability to run all this locally via minikube. Minikube allows us to run Kubernetes locally, and it greatly simplifies the development, deployment and testing of these pipelines, particularly when it comes to dealing with YAML files.
For the examples I've shown so far, our data scientists will
typically kick off an Argo workflow manually via the Argonaut CLI. However, in some cases, and as we bring in more scale, we will want to trigger these workflows based on automated events. Argo Events is a framework that allows you to achieve that: it allows you to register events with Argo and then trigger some actions based on those events. The classic example here is a file being dropped into an S3 bucket and then triggering an event based on that. So in our case, what I'm going to demo next is an Argo workflow being triggered based on that event. You can also register other types of events, whether those are kind of message queues; Kafka is another one that we use. So what I'm going to do here is I'm going to remove a file from the S3 bucket, and I am then going to upload that file to the bucket.
So, to show you how that works, we have set up Argo Events here. It doesn't work with S3 directly for some reason; what we have to do instead with S3 is configure the S3 event to trigger an SQS message, which you can then listen to events for. So, in this case, we have defined this template, which will be triggered when an SQS message is put on the particular queue. What happens here is this piece of code will be invoked when the SQS message is received, and we will translate that message into a protobuf message and then invoke the IRO pipeline.
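A sensor for that kind of SQS-triggered workflow looks roughly like the following; the queue, event names and workflow template reference are placeholders, and the real setup also passes the bucket and key from the event into the workflow.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: s3-upload-sensor                  # placeholder name
spec:
  dependencies:
    - name: s3-upload
      eventSourceName: aws-sqs            # an SQS EventSource configured separately
      eventName: file-dropped
  triggers:
    - template:
        name: start-iro
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: iro-from-event-
              spec:
                workflowTemplateRef:
                  name: iro-workflow      # illustrative workflow template name
```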
So, as you can see here, the Argo Events orchestrator has consumed that event. If I just click on this, we get some details: you can see here the JSON from the SQS message, and down here you will see the bucket name and the file that was placed in the S3 bucket. That in turn has kicked off the IRO workflow.
So within Spectrum we find Argo Workflows and Argo Events to be a great fit for our data pipelines, but there are some challenges that we encountered along the way. I think the big one is that the Spark operator is not a native Argo component. So, for example, sometimes when you kill an Argo workflow, it may not always kill the underlying Spark job. In that case we have created our own cron jobs which will kill these kinds of headless Spark applications. The YAML definitions are cumbersome; with YAML, by its nature, it's easy to introduce typos, et cetera. So that's where the minikube setup is really important. We have the ability to run all our pipelines locally and deploy them locally, and that means we can get feedback on these kinds of errors in seconds rather than minutes. It's a newer technology. The documentation is quite good, particularly for Argo Workflows 3.x, but compared to something like Apache Airflow there are probably a lot fewer answers on the likes of Stack Overflow, et cetera. So it can be a little bit tricky sometimes when you're trying to troubleshoot issues, I think. Lastly, then, is the platform tuning for Spark. When you're running Spark on your own Kubernetes infrastructure instead of EMR, there's obviously a lot more configuration required, so we set stuff up around claims policies and various node types in particular. As well, we had custom Spark configuration to allow our jobs to run resiliently on spot instances. So there was quite a bit there in terms of getting Spark to run efficiently on our own Kubernetes infrastructure.
So that concludes my overview of Argo Workflows and Argo Events. I'd now like to invite any questions that people may have on the presentation.