Transcript
Today I am going to talk about Go concurrency powering a
gigabyte-scale real-world data pipeline. So let's get into
it. Before we get started, a quick thing about myself.
I'm Chinmay Naik. I go by chinmay185 on Twitter,
GitHub, LinkedIn, etcetera, so you can catch me there. I'm a founder
at One2N, where we help companies with backend and
reliability engineering. I also write stories on pragmatic
software engineering, based on the work we do, on
Twitter and LinkedIn, so follow me there. In
general, I like engineering, I love psychology, I play
percussion, and I'm a huge fan of a rather old computer game
called Age of Empires II. So hit me up there.
So today I'm going to talk about MongoDB to RDBMS
data migration, and how we achieved that using Go's concurrency
features. Fundamentally, we wanted to move data from
Mongo to Postgres. I'm not advocating for either
one of these two technologies as such; we just had a use case where
we had to move data from MongoDB to Postgres.
Now, this was two types of transfer. One is the ETL
(extract, transform, load) kind of transfer, where we were doing a one-time
bulk transfer; think of it like a snapshot transfer of data. And we
also had to worry about streaming data transfer: if
there are any ongoing updates to MongoDB, how do those get reflected
in Postgres? Before we go
ahead, let's think about how we map MongoDB documents to
tables and rows. Because these two are different technologies,
there is no one-to-one direct mapping between them.
So let's look at a student collection.
It's just a sample collection in MongoDB. Here we have
a sample student document where you see _id,
which is the primary key. There is name, which is a string field.
You've got roll number, which is a numeric field. There is a Boolean field for
is_graduated, and date of birth, which is another string
field. And in Mongo, what you can do is have a bunch
of nested documents. So you have address as a
nested array of objects, and you have a nested
object, which is phone number.
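To make the shape of that document concrete, here is a rough Go model of it. This is a minimal sketch: the field and type names (Line1, Zip, Personal, Work and the bson tags) are assumptions based on the example, not the talk's exact schema.

    // A rough Go model of the sample student document.
    type Address struct {
        Line1 string `bson:"line1"`
        Zip   string `bson:"zip"`
    }

    type Phone struct {
        Personal string `bson:"personal"`
        Work     string `bson:"work"`
    }

    type Student struct {
        ID          string    `bson:"_id"`
        Name        string    `bson:"name"`
        RollNo      int       `bson:"roll_no"`
        IsGraduated bool      `bson:"is_graduated"`
        DateOfBirth string    `bson:"date_of_birth"`
        Address     []Address `bson:"address"` // nested array of objects
        Phone       Phone     `bson:"phone"`   // nested object
    }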
How does that data get translated into Postgres? The student
record itself is pretty simple, right? You could just migrate the
keys one-to-one, and keys in the MongoDB document become columns
in Postgres. So, for example, _id becomes the primary key,
you have name as a string field, then roll number, is_graduated,
date of birth. Pretty simple. What about the nested
fields? We had address and we had phone.
How do those get translated? What we could do
is create a relationship between student, student address
and student phone. So we could create a student address
table where we have a primary key and
a foreign key of sorts, a logical foreign key from the
parent student table, along with the address line,
zip and other fields. Similarly, for phone number, we can create a primary key
and a foreign key, again a logical foreign key
based on the student id, plus other fields
like personal and work from the previous Mongo document. So essentially
we are migrating data in this fashion:
for a single Mongo document, we create
one record in the student table in Postgres; for the address,
which is a nested array of sub-documents, we create two records in
the student address table in Postgres, and of
course there will be a relationship between the student table and the student address table;
and there is a third table, the student phone table,
which will have one record from the same Mongo document. So that's
how we are going to migrate data from Mongo to Postgres.
If you think about how this translates from JSON
to SQL, this is how it looks. For the _id field
in Mongo, we are going to create an
insert record in the student table. The same _id
will then get replicated, as a foreign key of sorts,
as a student id column in the student address table
in Postgres, and the same for the
phone record. So how does MongoDB
JSON data map to SQL? We just saw that
one Mongo document can map to N SQL statements.
A nested array of JSON
objects, or further nested objects in Mongo,
gets translated into its own table in
Postgres, with primary key and
foreign key relationships maintained as logical constraints.
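As a rough sketch of that mapping, here is what a converter for the student document might look like in Go. The table and column names (test.student, student_address, student__id) and the newID helper are assumptions that mirror the example above, and real code would use parameterized queries rather than string formatting.

    // studentToSQL turns one Mongo student document into N SQL INSERT statements:
    // one for the student table, one per nested address, and one for the phone.
    func studentToSQL(s Student) []string {
        stmts := []string{fmt.Sprintf(
            "INSERT INTO test.student (_id, name, roll_no, is_graduated, date_of_birth) VALUES ('%s', '%s', %d, %t, '%s');",
            s.ID, s.Name, s.RollNo, s.IsGraduated, s.DateOfBirth)}
        for _, a := range s.Address {
            // student__id is the logical foreign key back to the parent student
            stmts = append(stmts, fmt.Sprintf(
                "INSERT INTO test.student_address (_id, student__id, line1, zip) VALUES ('%s', '%s', '%s', '%s');",
                newID(), s.ID, a.Line1, a.Zip))
        }
        stmts = append(stmts, fmt.Sprintf(
            "INSERT INTO test.student_phone (_id, student__id, personal, work) VALUES ('%s', '%s', '%s', '%s');",
            newID(), s.ID, s.Phone.Personal, s.Phone.Work))
        return stmts
    }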
Well, inserts are cool, but how do we do updates
and deletes? We also have updates to
records in Mongo, and deletes. For updates, there can
be two types. One is where some fields of a particular document
get updated, so some columns get updated in Postgres;
that's an UPDATE table SET column = value
kind of command in SQL. But because Mongo doesn't have
a fixed schema, while Postgres or any relational database does,
a new key added to a JSON
document translates into an ALTER TABLE statement
in Postgres, which is essentially a schema change.
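Just to make those two cases concrete, here is roughly what they could look like; both statements are hypothetical examples (the id placeholder and the nationality column are made up for illustration).

    // An update to an existing key becomes a plain UPDATE ...
    const updateSQL = "UPDATE test.student SET is_graduated = true WHERE _id = '<some-student-id>';"

    // ... while a brand-new key in the JSON document first needs a schema change.
    const alterSQL = "ALTER TABLE test.student ADD COLUMN nationality VARCHAR(255);"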
For delete, it's going to be a simple DELETE SQL statement
on that table. Then we have two choices for migrating
data. One is a bulk, one-time migration, where we
just move all the data from Mongo to Postgres in this fashion: we copy
data from Mongo, create the proper SQL schema,
create insert statements, and just copy the data over.
But we can't stop our production Mongo,
migrate everything, and then just switch over to Postgres.
We would also need some streaming mode, where
we migrate the initial data once, and after that
we rely on any updates,
new inserts or deletes in Mongo also being
translated to Postgres. That has to be done in a streaming
fashion. What if we want both options?
That's where we need a reliable way to track all the
updates to the Mongo database, so that we can
use that to migrate to Postgres. Any thoughts
come to mind? What kind of MongoDB feature could we use?
You guessed it: it's the MongoDB oplog, the operation
log. It's a capped collection in MongoDB
that lives in the local database. Kind of
think of it like a write-ahead log for MongoDB:
it tracks all the writes to the database,
whether inserts, updates or deletes, and you're able
to see everything that is happening to the database in
terms of inserts, updates and deletes.
So what we could do is use the oplog to
transfer data from Mongo to Postgres. Let's look
at how it looks in practice. What does an oplog record look
like? Here is a sample insert oplog. You've got
op as the operation; here "i" stands for insert.
The namespace is the database and collection
combination, so you've got test as the database name and student as
the collection name. Then you've got the actual
insert object, which contains the document being inserted into MongoDB.
Similarly, for
update, you've got the operation as update, you've got the namespace,
and you get a set of field-value pairs:
what is being updated. You also get a reference to the
document being updated. So imagine you want to update a document matched
by some key and set some new values;
that's what you get in the update oplog. And similarly, there is a delete oplog as well.
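Here is a minimal sketch of how you might model and tail these oplog entries in Go, assuming the official MongoDB Go driver (go.mongodb.org/mongo-driver); the struct fields follow the op, ns and o fields we just saw, and the connection string is a placeholder.

    package main

    import (
        "context"
        "fmt"
        "log"

        "go.mongodb.org/mongo-driver/bson"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
    )

    // Oplog mirrors the fields of an oplog entry that we care about.
    type Oplog struct {
        Operation string `bson:"op"` // "i" = insert, "u" = update, "d" = delete
        Namespace string `bson:"ns"` // "<database>.<collection>", e.g. "test.student"
        Object    bson.M `bson:"o"`  // the inserted document, or the update description
        Object2   bson.M `bson:"o2"` // for updates: identifies the document being updated
    }

    func main() {
        ctx := context.Background()
        client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
        if err != nil {
            log.Fatal(err)
        }
        defer client.Disconnect(ctx)

        // The oplog is a capped collection, so a tailable cursor keeps yielding new entries.
        oplog := client.Database("local").Collection("oplog.rs")
        cursor, err := oplog.Find(ctx, bson.D{}, options.Find().SetCursorType(options.TailableAwait))
        if err != nil {
            log.Fatal(err)
        }
        for cursor.Next(ctx) {
            var entry Oplog
            if err := cursor.Decode(&entry); err != nil {
                log.Fatal(err)
            }
            fmt.Println(entry.Operation, entry.Namespace)
        }
    }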
You might be wondering when we are really getting
to the concurrency and the Go part of it. Well, the wait is over.
Here is what the sequential pipeline looks like. Fundamentally, our
idea is to migrate data from MongoDB on one side to Postgres on
the other. The way we do this is we write
a utility, a program; let's call it oplog-to-SQL.
What that program does is read the oplogs from
MongoDB sequentially as they come in. There is
a goroutine that reads these oplogs
and puts them into a channel.
There is another goroutine that reads from this channel and
processes the oplogs into some sort of
SQL format; that's what we just saw, how we can convert a single
oplog into N SQL statements. So it processes and
converts an oplog into
a bunch of SQL statements. Once that's done, it puts them
into another channel. From this channel, another goroutine
picks up these SQL statements, and then it
just basically does raw inserts into Postgres.
So imagine we wrote this program with this
kind of sequential data pipeline. We would be able to migrate
data from Mongo to Postgres, both in streaming and in
bulk fashion, assuming we have access to the oplogs from
that point in time. So here you can consider
these boxes as goroutines, and these
pipes connecting them as channels.
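As a minimal sketch, the sequential pipeline boils down to three stages connected by channels. Here readOplogs, toSQL and executeSQL are hypothetical stand-ins for the real tailing, conversion and Postgres code, and the Oplog type is the one from the earlier sketch.

    // runPipeline wires up the sequential pipeline: read -> convert -> execute.
    func runPipeline(readOplogs func(chan<- Oplog), toSQL func(Oplog) []string, executeSQL func(string)) {
        oplogs := make(chan Oplog)
        sqls := make(chan string)

        go func() { // stage 1: read oplog entries from MongoDB
            defer close(oplogs)
            readOplogs(oplogs)
        }()

        go func() { // stage 2: convert each oplog into N SQL statements
            defer close(sqls)
            for op := range oplogs {
                for _, stmt := range toSQL(op) {
                    sqls <- stmt
                }
            }
        }()

        for stmt := range sqls { // stage 3: run the statements against Postgres
            executeSQL(stmt)
        }
    }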
So this is what our Mongo oplog looks like, and here is
what the corresponding Postgres table would look like.
The namespace translates into the
database schema and table name in Postgres,
and the data gets translated into rows in that table.
So to be able to move this data
from Mongo to Postgres, we have to create the schema,
we have to create the table, and then we
insert data into that table. Once we insert this
data, this is what it looks like: we have the CREATE SCHEMA, the CREATE TABLE
and the INSERT statements.
Well, imagine we have two insert oplogs
on the same Mongo collection. We can't go and
create a new schema and a new table every time;
we shouldn't, actually. So we're going to create the schema
and the table only once and perform n number of
inserts. That's what we see
here: one insert goes here, and then the second insert goes here.
So we also have to maintain some sort of state so that we don't
create the schema and table multiple times.
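A minimal sketch of that bookkeeping, assuming we key it by namespace; the column list here is a placeholder, since in reality the columns would be derived from the document's keys.

    // schemaTracker remembers which namespaces already have a schema and table,
    // so CREATE SCHEMA / CREATE TABLE are emitted only once per collection.
    type schemaTracker struct {
        created map[string]bool // "db.collection" -> already created?
    }

    func (t *schemaTracker) ddlFor(ns string) []string {
        if t.created[ns] {
            return nil // already created; only INSERTs are needed from now on
        }
        t.created[ns] = true
        db, table, _ := strings.Cut(ns, ".")
        return []string{
            fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;", db),
            // placeholder column list; the real columns come from the document
            fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (_id VARCHAR(255) PRIMARY KEY);", db, table),
        }
    }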
For now, given this is a talk related to
concurrency, we'll skip all the details and edge cases related to updates
and deletes. You can trust me that I've got it handled.
So here is what our pipeline looks like.
Let's say we run this pipeline. For about 3.6 million records,
running on my MacBook with fairly basic specs,
it took about 9 hours 20 minutes.
So let's think about how we can improve the performance of the system.
Typically, if you think about it, we can probably
add a worker pool to be able to speed things up.
But where? Which parts of the program
can be parallelized? Where can we add concurrency?
The "process and convert oplog to SQL" goroutine
seems like it's doing too much work, and
it could use some help. So let's
modify the program to add worker pools.
What we have done in this case is: we are reading the oplogs in a
single goroutine and pushing them into the oplogs
channel. From there, we create n worker goroutines
to process these oplogs concurrently,
and hopefully, if we have multiple CPU cores, we can
also run them in parallel. As each of these worker
goroutines runs its oplog processing
and conversion, it converts the oplogs into
SQL statements, and those SQL statements go into the SQL
statements channel. The execute-SQL
goroutine then picks up and runs
these SQL statements in Postgres.
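A minimal sketch of that worker pool, reusing the hypothetical Oplog type and toSQL helper from the earlier sketches: n workers drain the same oplogs channel and all write into one shared SQL-statements channel.

    // processConcurrently fans the oplogs channel out to `workers` goroutines and
    // fans their SQL statements back into a single output channel.
    func processConcurrently(oplogs <-chan Oplog, toSQL func(Oplog) []string, workers int) <-chan string {
        sqls := make(chan string)
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for op := range oplogs {
                    for _, stmt := range toSQL(op) {
                        sqls <- stmt
                    }
                }
            }()
        }
        go func() {
            wg.Wait()   // once every worker is done...
            close(sqls) // ...close the output so the executor can finish
        }()
        return sqls
    }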
So we've got one design, but can we do better? Can we add more worker pools?
Well, I think the execute-SQL goroutine can
also be parallelized and could also use some concurrency, right?
It could use some help too. So let's modify
the program to have more goroutines. Now, in
this case we have the same setup, but what we've done is: each goroutine
that handles an incoming oplog
creates a set of SQL statements and pushes
those SQL statements into its own channel, which another
goroutine can read from. This way we have
maximum concurrency. And by the way, I can tell you that no goroutines or
gophers were harmed in this exercise, so we are good.
So we ran this program. What happens
is that for the same number
of oplogs, 3.6 million, it runs
concurrently in about 2 hours 18 minutes.
That's actually a 4x performance
improvement over the sequential one. So we've already got a 4x
performance improvement. That's amazing.
But before we do anything more, we realize something
is wrong. So what could be wrong? Again,
remember we have this oplog channel where we are getting all the oplogs
one by one, and there are multiple goroutines
processing these oplogs concurrently. If you have used
goroutines before, you probably know that
you can't guarantee ordering across independent goroutines;
you can't really have that kind of synchronization across goroutines
for free. So what does that mean?
It means this insert and update in the output
can actually become update and insert; it could even become delete
and insert. This is a problem.
How does this cause a problem? Think about our pipeline:
we read the oplogs, we put them in a channel, and there is a bunch of
goroutine workers processing these oplogs.
Now imagine a single document that is inserted
and immediately afterwards updated: two sequential
oplog entries, one for the insert and one for the update of the same
document. What could happen with this setup is that
the SQL statement for the insert gets generated, and a similar
SQL statement for the update gets generated, but we can't guarantee
the ordering of these results. That means it
could end up that the update SQL runs first and then
the insert SQL runs after it. Well, that would result in a SQL
error. So we can't really just
throw worker pools at a problem. The data integrity
gets compromised and we have a correctness
issue. We need to write a correct program first; then we can make it performant.
So this is a program that
is performant, but it's not really correct. So
do we go back to the drawing board, or what do we do? And again,
imagine we have n databases in Mongo, and each database
has m collections. What we could do is
fan out per database and fan
in for the SQL. That's an option. So, for example,
we would again read the oplogs from MongoDB in
a single goroutine and push them into the
same oplogs channel. But on the other side
we have a single goroutine that fans
out these oplogs per database.
So essentially what we are doing is segregating oplogs per database:
if we have n databases, we are going to create n channels
on the other side of the oplogs channel, and for each database
we will have a goroutine that consumes its oplogs.
Remember, each goroutine consumes oplogs from only one database.
So, for example, this goroutine only processes DB1 oplogs,
that goroutine only processes DB2 oplogs, and so on.
So we have n goroutines, based
on the number of databases. And ultimately all these
goroutines fan in and send all the SQL
statements to a single channel, from which we
can then execute the SQL against Postgres.
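Here is a minimal sketch of that fan-out/fan-in, again reusing the hypothetical Oplog type and toSQL helper: a router goroutine creates one channel and one worker per database it sees, and all workers fan their SQL statements back into a single output channel. Because each database has exactly one worker, the oplog order for any given document is preserved.

    // fanOutByDatabase routes each oplog to a per-database worker, then fans the
    // resulting SQL statements back into one channel.
    func fanOutByDatabase(oplogs <-chan Oplog, toSQL func(Oplog) []string) <-chan string {
        sqls := make(chan string)
        go func() {
            perDB := make(map[string]chan Oplog)
            var wg sync.WaitGroup
            for op := range oplogs {
                db, _, _ := strings.Cut(op.Namespace, ".") // namespace is "db.collection"
                ch, ok := perDB[db]
                if !ok {
                    ch = make(chan Oplog)
                    perDB[db] = ch
                    wg.Add(1)
                    go func(in <-chan Oplog) { // one worker per database
                        defer wg.Done()
                        for o := range in {
                            for _, stmt := range toSQL(o) {
                                sqls <- stmt
                            }
                        }
                    }(ch)
                }
                ch <- op
            }
            for _, ch := range perDB {
                close(ch)
            }
            wg.Wait()
            close(sqls)
        }()
        return sqls
    }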
Well, you could theoretically say that the
single goroutine that is executing the SQL
on the database could become the bottleneck, and it could use some help.
So we could modify this design: we could
fan out for each database without fanning
all the SQL statements back in
to a single channel.
This is how that looks.
We still have those n databases and n goroutines,
but instead of fanning them all into a single channel,
we create a channel for each goroutine.
And similarly, to be able to insert records
into Postgres, we create n goroutines on
the database side as well. So we have one goroutine
per database for processing and one goroutine
per database for executing the SQL inserts, updates and so on. The total
number of goroutines we end up with is 2n, plus obviously
the two goroutines we already had, one for reading the oplog and one
for orchestrating the fan-out. Basically,
if we have n databases, we end up with
roughly twice that number of goroutines.
If we take this idea even further, we could say: why don't we create goroutines
for each database and collection combination?
Right now we were only creating
goroutines per database, but each database has
m collections, right? So what if we
create goroutines per database-collection combination?
That would be a truly concurrent and massively parallelized
solution. So this is what
we end up with: a concurrent data pipeline. We've got
MongoDB. The diagram was too wide to fit horizontally, so
I had to lay it out vertically. So we have MongoDB on top.
Then we have this oplog-to-SQL program, where
we read the oplogs in a single goroutine and push them into a channel.
On the receiving end we have an orchestrator goroutine that fans
out per database: it creates n channels,
and every time it encounters a new database it creates a new channel and
pushes all the oplogs for that
database into that channel. Then we
fan out the oplogs per collection. So we have those
n goroutines, one for each database, and as and when
they encounter an oplog for a different collection in their
database, they create up to m such
collection channels.
Each goroutine then processes oplogs
only for the database and collection
combination that its channel corresponds to.
So this goroutine works only on,
let's say, DB1 collection 1; the next goroutine
works on DB1 collection 2, DB1
collection 3, and so on. And then a new set of
goroutines works on DB2 collection 1, DB2 collection 2,
etcetera. So we've essentially got m times
n such goroutines. Each of them
creates SQL statements, and each of them pushes these
SQL statements into another channel,
for which we again have a SQL-execution goroutine that
executes all these SQL statements against Postgres.
So when we run this program,
we see that it runs in about 1 hour 36 minutes
and 30 seconds for the same number of oplogs.
We had about 3.6 million oplogs; the sequential
version ran in 9 hours 20 minutes, and this concurrent one
ran in 1 hour 36 minutes.
In terms of resource utilization, though, the problem is this:
imagine we have 16 databases and we have
128 collections per database.
That's on the higher side, but let's imagine it. The total
number of goroutines we'll have is: n
goroutines for the fan-out of oplogs per database;
then, for each database, we create m goroutines,
so there will be n multiplied by m of those
per-collection goroutines; and lastly, for executing
the SQL, one goroutine per database-collection combination,
which is n multiplied by m again. If we
just consider 16 databases and 128 collections, we are
going to end up with 2048 database connections,
because each of those goroutines opens its own connection to Postgres.
That's quite a number of connections; it's going to hog the
CPU and probably even cause
problems on the Postgres side with so many open connections.
So while this program is performant, it is not utilizing resources well:
it is over-utilizing them and overburdening Postgres.
What can we do? Can we do better? Right?
What we do in this case is keep most of
the pipeline the same, but instead of fanning
out for the Postgres writes, we fan
them in. Remember, we don't need to create n
multiplied by m connections to the database if we can
limit it to just n connections, one connection per database;
that's good enough, and that's what we do here. So
all the goroutines that process a single database's
records funnel their output back
into a single channel, and that channel has
one goroutine that executes all those SQL
updates, inserts, etcetera against Postgres. So essentially we create
n goroutines for handling database
connections to Postgres, and that's where we limit the
goroutine sprawl. So if you consider
16 databases and 128 collections per database: before,
we had a massive fan-out where we would
end up with 2048 database connections on
the right side; after the modification,
we are fanning them in again per database,
which means we have only 16
connections to the database. So we don't create
a massive number of database connections and cause the database to struggle and slow down;
we instead create only 16 connections to the database.
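The fan-in itself is the classic merge pattern. Here is a minimal sketch, where each input channel would carry the SQL statements coming from one collection's goroutine, and the merged output is consumed by the single per-database executor.

    // merge fans several SQL-statement channels into one, so that a single
    // goroutine (and a single Postgres connection) can serve a whole database.
    func merge(inputs ...<-chan string) <-chan string {
        out := make(chan string)
        var wg sync.WaitGroup
        wg.Add(len(inputs))
        for _, in := range inputs {
            go func(ch <-chan string) {
                defer wg.Done()
                for stmt := range ch {
                    out <- stmt
                }
            }(in)
        }
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }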
So if you then compare this improved version,
you will see that in my case it ran a
bit faster, probably because it's utilizing the database and the overall
machine properly. So it was definitely better than the
earlier concurrent implementation, though not
dramatically so in terms of wall-clock time; in terms of resource utilization, though, it was
massively better. This is what our final concurrent
data pipeline looks like. We read
the oplogs from MongoDB. We fan out into
n goroutines, one for each database.
For each of them, we fan out again into m channels
and m goroutines, one for each database-collection combination.
Ultimately we fan in again, so we don't have goroutine
sprawl: we fan in all the
SQL updates, inserts and deletes, all the SQL basically
for one database, into a single channel with a single goroutine,
and that goroutine performs the updates, inserts and
deletes against the Postgres database. That's what our final
concurrent pipeline looks like.
Think about what we just did overall as a solution. We understood
the problem domain first.
The solution only makes sense in the context of
the problem we are solving, and our problem was
MongoDB to Postgres data migration.
We built the working solution first, which is the sequential
data pipeline. Agreed, it ran very slowly,
but we always have a working solution first before we try to optimize
anything. Then we identified the possibly
parallel portions of the program. Not every program can
be parallelized, right? So you have to identify which
parts of the program can actually be improved. We also have
to avoid blindly applying concurrency patterns.
For example, we tried to apply worker pools without really thinking it
through, and it didn't work out.
The program was concurrent, and it performed well,
but we did not have correctness.
And without correctness, no optimization helps.
We also have to consider Amdahl's law: which parts of the
program can be parallelized, and which portion
is sequential-only and cannot be parallelized. No matter
how much hardware you throw at the parallelizable portion of the program,
if the sequential portion is a significant fraction of the
whole program, it's not going to help much.
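For reference, Amdahl's law makes this precise: if P is the fraction of the program that can be parallelized and N is the number of workers or cores, the best overall speedup is

    Speedup(N) = 1 / ((1 - P) + P / N)

so even as N grows without bound, the speedup is capped at 1 / (1 - P); a pipeline that is 90% parallelizable can never run more than 10x faster.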
So always consider Amdahl's law, and also consider its
variant and broader cousin, the universal
scalability law. Fundamentally, simplicity may
be more valuable: keeping things simple may matter more for
the maintenance and running of the program than premature optimization
and the marginal performance gains you
might squeeze out beyond a certain point.
So finally we ended up with a performant concurrent
implementation, with fan-in per database, to be able to limit
the goroutine sprawl and to limit the number of database
connections. So that's it. I hope you learned
something. Connect with me at chinmay185
and check out our website, One2N. Here is
a link to our playbook, and specifically a link to this particular
problem, where I've broken this problem down into a set of small stories
that you can implement yourself and try out various concurrency
features of Go, so that you can
work on them incrementally.
Thanks. Hope you have a nice conference.