Transcript
This transcript was autogenerated. To make changes, submit a PR.
Hi guys. Today we are going to discuss performance tuning of Apache Spark. And you might ask, why would I care? Spark 3 is quite good, it's good enough, I don't really need to pay attention to that. But the reality is that problems still happen every now and then. So if you know how to debug them, if you know how to tune the performance, then you save your own time and you save your team's time, but that also means saving money you spend on the infrastructure, saving money you spend on your cloud bill. My name is Martin,
I'm CEO of Tantus Data. At Tantus Data we help our customers with everything data related, from infrastructure through data engineering up to machine learning in production. And we also help with training. Today I'll share our lessons learned from the multiple projects we have been helping with. Let me get started with a short history of Spark, because which Spark version you have been working with, which Spark version you have had experience with, very much determines what impressions and what experiences you've had. Spark version 1 was a very unstable construct, and you very often had to run to your admins, to your ops, for help with restarting some services.
Then Spark version 2 was much more stable. But still there was quite a big chance that you would run a SQL query, a query which is perfectly correct, but it doesn't work. It doesn't work because of the data distribution, it doesn't work because of the size of the data. And if it doesn't work and you don't know how to solve this kind of problem, all you could do was go around asking for help or tweaking some random parameters with some hope that it would help, but usually it would not help. And then with Spark
version 3, the situation improved significantly because many of the common problems have been resolved. You have adaptive query execution, which basically does lots of optimizations for you. But still there is a chance it will fail or be too slow and you have to do the manual tuning, and today I'll show you some of these examples, examples which are not fixed even in Spark 3. Is SQL good enough these days? In many cases, yes, in many cases you are good with SQL or with an understanding of the Spark API, but I'll be sharing the cases where SQL alone is just not enough.
And a short disclaimer about what kind of cases I'm showing: they are based on production use cases, but they are very much simplified. We are dealing with a very simplistic schema and very simplistic SQL, but the techniques we will be applying very much match what you would do in production. How you debug, how you find out what the problem is, how you fix it, is pretty much production like, so it's very practical. So let's get started with case number
one. In case number one, we will be processing a table with events. That table will have just three columns: user id, event id, and a timestamp. The whole point is that we are collecting some events, let's say because we would like to understand how our users are interacting with a mobile app, so we understand where the glitches are, what is confusing to the users, and so on. And we would like to understand how much time they spend on a specific event. So we would like to calculate the current event timestamp and the next event timestamp, so we understand which action is taking lots of time and is potentially a confusing one. We are searching for potential places to improve. So we are searching for the
next event. We have some users and timestamps; here we are looking at just a single user, and I pre-sorted by the event timestamp. We want to fill in an extra column, which is the next timestamp. That is super simple on data which is already pre-sorted, because the next timestamp of a row is simply the timestamp of the following row: the first row takes the timestamp of the second row, the second row takes the timestamp of the third, and so on. So it is quite simple. And if you would like to implement the code, it doesn't look like rocket science. We are just using a lead function over a window partitioned by user id and ordered by event timestamp.
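A minimal sketch of what that query could look like in PySpark, assuming a DataFrame named events with user_id, event_id, and event_timestamp columns (the names here are illustrative, not taken from the original job):

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical events table: user_id, event_id, event_timestamp.
events = spark.table("events")

# Window per user, ordered by time: lead() pulls the timestamp of the following row.
w = Window.partitionBy("user_id").orderBy("event_timestamp")
with_next = events.withColumn("next_timestamp", F.lead("event_timestamp").over(w))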
Super simple code. I don't really want to dig into that more, but the point is the code is perfectly fine, the code is correct. And let's see what will happen when you go to the Spark UI.
If you go to the Spark UI, you will realize that the job is running. It has been running for 23 minutes. Nothing to be worried about yet; we don't know the size of the data yet, so it might be perfectly fine, it might not be. What's a bit worrisome is the number of tasks which are failing. We already have three failed tasks, but that doesn't necessarily mean something is wrong. We are talking about a distributed system. It could be a network glitch, and Spark is supposed to recover from random failures. That's the whole point of having a system like Spark: it will hide some of the complexities, some of the failures, from you if they were just random. But let's go a bit deeper. Let's go into
the job details. In the job details, we have a single active stage which has been running for 23 minutes, and we have 126 tasks completed. One task is still running. If we go into that stage's details, we can see extra information. First of all, the output size so far is quite small, just five gigs. But there are 150 gigs of memory spill and 80 gigs of disk spill, so it is quite a lot, and that already shows us that there is some inefficiency. Whether we should be optimizing that or not very much depends on what exactly we are optimizing for. It doesn't necessarily mean that if you see some spill you have to jump into optimization, because it could be quite hard. Let's see if we can actually spot anything else which is worrying.
If we have a look at the diagram, we can see lots of those green, very tiny tasks. So most of the tasks are completing in no time. Then there is this one task which has been running for quite a lot of time, and another one which has been running for some time and then failed again. It's attempt number one; the previous one was attempt number zero. So the same task keeps failing and being restarted, and it takes most of the job time. That's already something we should look closer into. What we have now is another attempt of the same task, and it's running for the fourth time. In the task overview, we can see all the tasks which have failed and the single one which is running. The memory spill is actually produced only by those tasks; there is no more memory spill elsewhere. We can see that they are taking some time, as we could see on the graph. And we also can see some errors, and they are not very descriptive, maybe except the disk space one. Overall, we can see errors which don't really give us a good clue why this is happening. Other than that, we can see the other tasks, which are completing within 10 seconds and producing like 40 megabytes of data each. Nothing really suspicious about that.
So let's try to dig into the problematic task, the task which keeps restarting. If we look here, we can see the view from the SQL tab in Spark, and that gives us pretty much an execution plan with lots of details. The first information we can see is that we are reading 100 GB of data, nothing very special. It's 1 billion records. It is a lot, but it's not something we shouldn't be able to process, right? We are dealing with a distributed system, we are dealing with very small records. It shouldn't be too bad. And if we scroll down, we can
see the first box we should always pay attention to, which is called Exchange. That pretty much means we are shuffling the data over the network, we are exchanging the data over the network. And this is something you need to pay attention to, first of all simply because shuffle is expensive in general, because it involves disk, network and so on. But other than that, it's good to pay attention to these kinds of boxes because there are many things which can go wrong when exchanging the data. In this code we are doing just a single shuffle. It's good to hover over the Exchange box and understand what it is caused by. Here we are hash partitioning based on user id, simply because in the SQL code we explicitly tell Spark that we want to partition the data based on user id. But if your code is much bigger, it's good to understand the correlation between what you see in the Spark UI and the corresponding part of the code, because usually you have multiple joins and you join by multiple different columns. This is a good hint to narrow down which part of the code is responsible for a specific box in the execution plan in the SQL tab.
Okay, but if we check how many bytes were written by the shuffle, we are talking about 40 gigs, and the top partition is 45 megabytes. That is small, nothing to be worried about. But if we look at how many bytes were read after the shuffle, it's a completely different story. The top partition is 67 gigs, and that's huge. That partition will be processed by just a single task, and that partition will be causing the problems, because the whole rule is that a single partition is processed by a single task. So the way it looks
by a single task. So the way it looks like
is that whenever you read the data here we have virtual
slide with hdfs. But that applies to s three,
that applies to delta. Whatever data source
you have, parc always have the
concept of partition in mind, and it reads the data in chunks.
So you read a chunk of the data within a task, you do some
processing on the fly and eventually you prepare the
data for shuffle. Here we are organizing the data based on the
user id, simply because we are partitioning by
user id. But it's the same with joins. If you join by a specific
column, the data will be organized by that column,
and then you have many tasks like that. And what will
happen during the shuffle? All the chunks
with specific user will go to the
same, and that's perfectly fine.
The second user will probably go to another task, and that's all perfectly fine as long as the data can fit into the executor, as long as the data can be processed by the executor. If your user base, your event base, is not balanced, if a single user is producing too many events, then you have a problem with maybe just a single task which cannot really process that data. These kinds of problems were very common in Spark version 2. With adaptive query execution things have improved, because if you have this problem with, for instance, a left join, it's automatically rebalanced. But we are looking at the window function example, where it is not rebalanced, and that's exactly the situation we are struggling with. So how do
we solve that? How do you fix data which is imbalanced? How do you fix a skew in your data? First of all, you really need to understand where the problem is coming from and that your Spark job is failing because of the skew. Once you understand that, then you can think: okay, maybe it is a bug in my data, and then you simply have to fix that. Maybe you can think of filtering out the problematic users, because in that given processing you don't really need them. If that doesn't help, then you need to figure out a way of completing the processing.
And before I show you how to do that in this specific use case, I just want to make one more note, which is: upgrade Spark if you are not on Spark version 3 yet. The problem I'm showing you is still something you have to fix manually, but in many cases Spark 3 allows you to avoid these kinds of manual fixes. For instance, if you are doing a left join, Spark will automatically fix it for you if you are on the newest version. So keep that in mind, but let's see how we can potentially solve this problem.
So the scenario I showed you is that we are calculating the next timestamp, pulling it from the next record, and so on, and all of user one's data is calculated in the same partition. But if this user is producing so many records, why don't we process that user, or all the users, day by day, so we don't have to process all the records at once? That's quite easy to implement. The only problem will be that the last record of each day will get a null next timestamp, unless we fix that. And coming up with a fix is really not rocket science.
So if we look into it: here we are calculating an extra column, which is a bit of math to tell which day it is, so we are not dealing only with the timestamp anymore, we also have the number of the day. We are defining a bunch of windows for the processing, because we will be processing the user not only based on user id, but on a window which is user id plus day. Then we calculate a couple of helper data frames: the events with nulls, then we fix the nulls, and then we do the union and merge them together. The whole point is not really how exactly this code works; a rough sketch of the idea is shown below.
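Roughly sketched in PySpark, assuming the same illustrative events DataFrame, unix-second timestamps bucketed into days, and a join against the next day's first event to fix the nulls (the exact fix in the original code may differ):

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema: user_id, event_id, event_timestamp (unix seconds).
events = spark.table("events")
events = events.withColumn("day", F.floor(F.col("event_timestamp") / F.lit(86400)))

# Window per (user, day): the skewed user is now split across many smaller partitions.
per_day = Window.partitionBy("user_id", "day").orderBy("event_timestamp")
with_next = events.withColumn("next_timestamp", F.lead("event_timestamp").over(per_day))

# Helper frame: one row per (user, day) with the first timestamp of the user's next
# day that has events. This aggregate is tiny, so a per-user window is harmless here.
per_user = Window.partitionBy("user_id").orderBy("day")
day_starts = (
    events.groupBy("user_id", "day")
    .agg(F.min("event_timestamp").alias("day_first_ts"))
    .withColumn("next_day_first_ts", F.lead("day_first_ts").over(per_user))
    .select("user_id", "day", "next_day_first_ts")
)

# Rows whose lead() stayed inside the day are already correct; the last row of each
# day got a null, which we patch with the first timestamp of the next day, then union.
ok_rows = with_next.where(F.col("next_timestamp").isNotNull())
fixed_rows = (
    with_next.where(F.col("next_timestamp").isNull())
    .drop("next_timestamp")
    .join(day_starts, on=["user_id", "day"], how="left")
    .withColumnRenamed("next_day_first_ts", "next_timestamp")
)
result = ok_rows.unionByName(fixed_rows.select(*ok_rows.columns))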
It's not rocket science. The whole point is that if we do this kind of processing, we explicitly tell Spark that we want to do day-by-day processing. And this is what we will see in the Spark UI: we no longer have this single task which is taking most of the time and eventually failing. We see the distribution being very even, and the top task is taking 210 seconds. You might argue that this is probably not optimal either, but the point is that the original job was running for 20 minutes and would never really complete; I just killed it. Now it runs for less than 1 minute, it completes and everything is fine.
If you do this kind of optimization, you really need to understand why you are optimizing. In our case, the job was simply failing, so you had to fix it. But if you do further optimizations, you need to think about what is worth your time. Maybe you want to just move on to another problem. Maybe you want to optimize for wall clock time because your customers are waiting for the data. Maybe you want to optimize for resource utilization because you want to reduce the cloud bill. And maybe you want to avoid the optimization because you don't want to complicate your code; you don't want to make your code more complex simply because of the maintenance burden.
All right, so let's have a look at case number two, which on the surface could look very similar to the previous one, but is slightly different. This time we are not looking into the code; we just try to very quickly figure out what's going on based only on the Spark UI. So we have a Spark job which is again running for 20 minutes. After an hour or so I killed it, because it could never really complete. What we can see is a very similar situation to the previous job: most tasks complete almost immediately, and there is this single problematic task which keeps running and would probably never complete. And you might say, okay, it looks like exactly the same situation. The difference shows up when we look into the active stages: if you look at the output and the shuffle read, it looks like we are processing almost no data. In the shuffle read phase we have just 20 megabytes. So it looks like we are processing almost no data, yet the job cannot complete.
In the SQL tab we can confirm that the input is small. After doing a bit of self joining, we can see that the sort merge join phase is processing record number 4 billion something, and the number is still growing. So even though we are reading not that many records, we are producing lots of records. I'll take a bit of a shortcut because of the time limitation, but the bottom line is that we have a products table with an order id, and we are joining the products table with itself based on the order id, because we would like to analyze which products are bought together. The problem with a Cartesian join is that for three input records the output size is nine, for ten you get a hundred in the output, and it grows really quickly: for 10,000 records you end up with 100 million records. This is nothing new, it's a problem which has been known for a long time; you just need to be aware that if you are doing a self join, the number of records will be huge. The real problem here is that all the records are processed within just a single executor,
simply because the order id is our join key and all the rows with the same order id end up in the same task, unless we explicitly do something about it. A very simple technique to avoid this kind of behavior is salting. In the left-hand-side product table, when we join it with itself, we add a random salt value; here I'm using a random value from the range one up to two so it can fit on the slide. In the right-hand-side table we generate every single possible combination of salt, so we duplicate the records. The benefit of doing that is that instead of all nine output records being produced in one task, six of them are produced in one task and the remaining three in another. So instead of processing all nine records in the same task, they are processed by two separate tasks, and this way we distribute the load.
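A minimal sketch of that salting technique in PySpark, assuming a products DataFrame with order_id and product_id columns and a configurable salt count (all names here are illustrative):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical products table with order_id and product_id columns.
products = spark.table("products")
NUM_SALTS = 180  # tune for your data; the talk mentions using roughly this many

# Left side: tag every row with one random salt in [0, NUM_SALTS).
left = products.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Right side: replicate every row once per possible salt value.
salts = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
right = products.crossJoin(salts)

# Joining on (order_id, salt) spreads each hot order_id over up to NUM_SALTS tasks
# while still producing exactly the same product pairs as the plain self-join.
pairs = (
    left.alias("a")
    .join(
        right.alias("b"),
        on=[F.col("a.order_id") == F.col("b.order_id"), F.col("a.salt") == F.col("b.salt")],
    )
    .select(F.col("a.product_id").alias("product_a"), F.col("b.product_id").alias("product_b"))
)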
Just so we are on the same page: by salting a Cartesian join we are not limiting the number of records. The number of records will still be the same, you still produce lots of records, but what you are doing is parallelizing the execution, making it better distributed. So at the end of the day, we can see many tasks which are all doing something, which are all running for quite some time, and the top task is running for eleven minutes. You could argue we could split it even further, since the tasks are still processing quite a lot of data, but the point is we won't be waiting forever; this might be good enough. You might want to tune the salt value; in this case I believe we used 180 or something like that, so a bit more than on the slide. Alright, so let's have
a look at case number three and case number four. They are grouped together because they are both related to lazy evaluation. Lazy evaluation is something which is taught at the very beginning of every single Spark course, yet it is something which is very easy to forget about. The whole concept is that if you read some data frame and then you do some complex transformation, that complex transformation does not happen immediately. Spark just remembers what it has to do, and it does it when it's absolutely necessary. It's an optimization concept, so Spark doesn't have to do every single action immediately. If you for some reason decide to do a while loop and run this kind of complex transformation in a loop, you need to be aware that the intermediate results will not be memorized anywhere; Spark doesn't have any cache for that happening automatically. Spark will just remember more and more operations to be done, and the complexity of every single iteration will grow. The first iteration takes 23 seconds and a few jobs to complete. Then after some more iterations, it takes 1 minute and several jobs to complete. And then it takes nine minutes, and it's growing really fast. The same goes for the complexity: the initial job is quite simple, then it's a bit more complex, and then we scroll down even more and we keep repeating the same operations, but we do them from scratch in every iteration, unless we do something about it. So what is the conclusion here? First of all,
loops are suspicious, because it's somewhat like doing batching within a batch system; Spark has its own operators, which are batch operators, but we use loops simply because we can. Sometimes that's okay, but in some cases it's really suspicious, and in some cases it really leads to growing complexity of the execution plan. You can grow the execution plan even without a loop, though. So if your execution plan is growing out of hand, if you can see Spark slowing down because of how complex it is, how many operations it has to do, then it's worth considering checkpointing or materializing the results along the way. On one hand it's a good hack, or let's say a good workaround, for a very big execution plan. On the other hand, it could be a good idea to split very complex code, a very complex DAG, into modules, and that could help even with the code maintenance. So it could serve two purposes; a sketch of the checkpointing idea is shown below.
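A rough sketch of that checkpointing idea, assuming a hypothetical iterative job over an events DataFrame with a numeric score column:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # any shared path works

df = spark.table("events")  # hypothetical starting DataFrame with a "score" column

for i in range(20):
    # Some iterative transformation; without the checkpoint below, the plan for
    # iteration i contains all previous iterations and every action replays them.
    df = df.withColumn("score", F.col("score") * 0.9 + F.lit(i))
    if i % 5 == 0:
        # Materialize the intermediate result and truncate the lineage so the
        # execution plan stops growing. Writing to a table and reading it back
        # achieves a similar effect.
        df = df.checkpoint()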
Other than that, something which is very much related to lazy execution is whether our code is deterministic or not. If it's not deterministic, so for instance we randomly generate values because we want a unique identifier, it becomes very tricky. It becomes very tricky because, let's say, we want to split our data frame into two data frames for a machine learning train/test split. If we write code like that, we generate a column with a random value and we derive two different data frames from it, and we hope that the split based on that random value gives us two data frames which are not overlapping. Then you might get surprised, because when you write data frame one and then you write data frame two, the random generation happens again from the very beginning, from scratch. And that's a real problem I have actually seen: it will fail, it will create overlaps, it will create lost records. This is a LinkedIn message I received from a friend. He was fuming because of it; he spent lots of time on it. It was a really significant bug, because it was introducing an overfitting effect in their machine learning pipeline. So whenever you deal with random values, first of all check whether the mechanism you need, like splitting data frames, is already in the standard library, because that one you do have. But if you do something a bit more custom, then make sure you materialize the result, because that is the safest option. It might not be the most time efficient, but it's the safest; you don't risk that someone else will come and do some modification in your code without knowing what is happening.
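A minimal sketch of the risky pattern and of materializing the result first, assuming a hypothetical training_data table; the built-in randomSplit method covers the standard-library case mentioned above:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.table("training_data")  # hypothetical input

# Risky pattern: rand() is re-evaluated for every action, so writing train and
# test separately can produce overlapping or lost records between the two writes.
tagged = df.withColumn("r", F.rand())
train = tagged.where(F.col("r") < 0.8)
test = tagged.where(F.col("r") >= 0.8)

# Safer: materialize the random column first, then derive both splits from the
# stored data so the random values can no longer change between actions.
tagged.write.mode("overwrite").saveAsTable("tagged_training_data")
materialized = spark.table("tagged_training_data")
train = materialized.where(F.col("r") < 0.8)
test = materialized.where(F.col("r") >= 0.8)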
So what is the conclusion from all the examples I showed you? First of all, you really need to know what you are optimizing for, whether it's your time, the compute time or the wall clock time, and it really helps if you understand where the bottleneck is. If these kinds of cases are new to you, I would suggest you invest a bit of time in experimenting and learning a bit more about how Spark works under the hood. But if you feel that you don't have these kinds of problems on a daily basis, you should at least know when to ask for help and who to ask. Maybe you are a data scientist and you want to focus on the data science, not on Spark debugging; you don't want a PhD in Spark. Then at least know when you are blocked and make sure you know where to ask for help, for instance a data engineer who is good at these kinds of things. Feel free to reach out to me on LinkedIn and I will be very happy to talk to you.