Data pipeline: Migration

Sarah Anoke edited this page Feb 11, 2020 · 2 revisions

Migration involved reading a table from PostgreSQL into Spark, then writing that table to CockroachDB. This was not as easy as it sounds because...

1 - some tables were 100GB+

Spark has many tuning knobs, and finding settings that worked for my cluster architecture and datasets took a lot of trial and error. The most important of these settings were:

1a - spark.network.timeout and spark.executor.heartbeatInterval

Because individual tasks were taking a long time, I needed to tell Spark to wait longer before declaring an executor dead.
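A minimal sketch of how these two settings could be held and sanity-checked before building a Spark session. The values `800s` and `60s` are hypothetical placeholders, not the values used in this migration; the helper names are also made up for illustration. The check reflects the usual guidance that the heartbeat interval should be smaller than the network timeout.

```python
# Hypothetical values; tune them for your own cluster and task durations.
spark_conf = {
    "spark.network.timeout": "800s",
    "spark.executor.heartbeatInterval": "60s",
}


def to_seconds(value):
    """Convert a duration such as '800s' or '10m' to seconds (s/m/h suffixes only)."""
    units = {"s": 1, "m": 60, "h": 3600}
    return int(value[:-1]) * units[value[-1]]


def heartbeat_below_timeout(conf):
    """Return True if the heartbeat interval is shorter than the network timeout."""
    heartbeat = to_seconds(conf["spark.executor.heartbeatInterval"])
    timeout = to_seconds(conf["spark.network.timeout"])
    return heartbeat < timeout


# With PySpark installed, the settings could be applied roughly like this:
#   builder = SparkSession.builder
#   for key, value in spark_conf.items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```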

1b - repartition() versus numPartitions

I was reading in a 110GB dataset using spark.read and applying repartition(). My cluster would go down because Spark was reading all these data into the memory of one machine and that machine would die from the strain. I started instead using the numPartitions option, and my cluster began reading in my data and storing individual partitions in the memory of different workers, as I initially wanted.

What happened? My understanding is that repartition() initiates a partitioning process AFTER reading in the entire dataset; without partitioning options, a JDBC read runs as a single task on one executor. The numPartitions option creates partitions as the data is being read in.

The difficulty with numPartitions is that I also needed to specify a partitionColumn, lowerBound, and upperBound. Spark then takes (upperBound - lowerBound)/numPartitions as the stride, that is, the width of the partitionColumn value range covered by each partition, and uses each row's numeric partitionColumn value to make partition assignments. When partitionColumn is a gap-free row number, the stride is also the number of rows in each partition.
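To make the partition assignment concrete, here is a simplified sketch of how the per-partition WHERE clauses can be derived from the four options. It approximates Spark's behaviour for positive integer bounds and is not a copy of Spark's implementation; the function name is hypothetical.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clause Spark issues for each JDBC partition.

    Assumes non-negative integer bounds. The first partition also collects
    NULLs and the last partition is open-ended, so the bounds shape the
    stride but do not filter rows out.
    """
    if num_partitions <= 1:
        return []  # a single partition needs no predicate

    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower and upper:
            predicates.append(f"{lower} AND {upper}")
        elif upper:
            predicates.append(f"{upper} OR {column} IS NULL")
        else:
            predicates.append(lower)
    return predicates


for predicate in jdbc_partition_predicates("rno", 1, 101, 4):
    print(predicate)
```

Each predicate becomes its own query against the source database, which is why the read is spread across workers instead of landing on one machine.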

[Issue] I had no such numerical column in my dataset.

[Solution] I used SQL to create a column called rno, containing the row number:

ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS rno
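A sketch of how the rno expression could be wired into a partitioned JDBC read. The option keys (url, dbtable, partitionColumn, lowerBound, upperBound, numPartitions) are Spark's JDBC options; the function name, the alias src, and the example URL and table name are hypothetical. The table name is interpolated directly, so it is assumed to be trusted.

```python
def build_jdbc_read_options(url, table, total_rows, num_partitions):
    """Build options for a partitioned JDBC read keyed on a generated row number."""
    # Wrap the table in an aliased subquery that adds the rno column.
    subquery = (
        "(SELECT t.*, ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS rno "
        f"FROM {table} t) AS src"
    )
    return {
        "url": url,
        "dbtable": subquery,
        "partitionColumn": "rno",
        "lowerBound": "1",
        "upperBound": str(total_rows),
        "numPartitions": str(num_partitions),
    }


# Hypothetical connection details, for illustration only.
options = build_jdbc_read_options(
    "jdbc:postgresql://db-host:5432/mydb", "my_table", 400_000_000, 200
)

# With PySpark installed, the read could look like:
#   df = spark.read.format("jdbc").options(**options).load()
```

One caveat worth checking: each partition runs its own query, and a row number ordered by a constant has no guaranteed order, so the same row is not guaranteed to receive the same rno in every query.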

[Issue] My dataset was too big to determine the number of rows.

[Solution] If I couldn't read in the dataset, how could I apply count()? Thankfully, my data were coming from PostgreSQL and this database maintains metadata tables! I was able to pull the number of rows from information_schema.tables.
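A sketch of one way to get a row count from PostgreSQL metadata without scanning the table. As far as I know, stock PostgreSQL's information_schema.tables carries no row-count column, so this sketch reads the planner's estimate from pg_class.reltuples instead; that is an assumption about an equivalent route, not the exact query used here. The function name is hypothetical, and the schema and table names are assumed to be trusted because they are interpolated directly.

```python
def row_estimate_query(table, schema="public"):
    """Return SQL that reads PostgreSQL's estimated row count for a table.

    reltuples is an estimate maintained by VACUUM/ANALYZE, not an exact count.
    """
    return (
        "SELECT c.reltuples::bigint AS estimated_rows "
        "FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        f"WHERE n.nspname = '{schema}' AND c.relname = '{table}'"
    )


print(row_estimate_query("my_table"))
```

An estimate is enough for this purpose, since upperBound only shapes the partition stride and does not filter rows.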

2 - writes to my CockroachDB cluster were very slow

Even though my Spark cluster was writing to CockroachDB through a load balancer, I had to keep the write speed to < 4,000 rows/second... and I had about 400 million rows of data (~28 hours of writing).
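The ~28 hour figure follows directly from the two numbers above; a small check of the arithmetic (the function name is just for illustration):

```python
def write_duration_hours(total_rows, rows_per_second):
    """Hours needed to write total_rows at a steady rows_per_second."""
    return total_rows / rows_per_second / 3600


hours = write_duration_hours(400_000_000, 4_000)
print(f"{hours:.1f} hours")  # 27.8 hours
```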

The slow speed appears to be related to CockroachDB memory management: when the write speed is any faster, a node's memory is overloaded and it dies, taking down the whole cluster.

Controlling the write speed involved trial-and-error adjustment of the number of dataset partitions in Spark as well as the number of smaller DataFrames I split the larger DataFrame into. In the end, I settled on:

  • reading in the PostgreSQL tables as a DataFrame with 200 partitions
  • splitting the single DataFrame into smaller DataFrames with ~1 million rows each
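A sketch of how the ~1 million row splits could be computed. It assumes the split is done on the rno row-number column, which is a guess at the mechanism rather than a description of the code actually used; the function name is hypothetical.

```python
def chunk_ranges(total_rows, chunk_size=1_000_000):
    """Return inclusive (start, end) row-number ranges covering 1..total_rows."""
    ranges = []
    start = 1
    while start <= total_rows:
        end = min(start + chunk_size - 1, total_rows)
        ranges.append((start, end))
        start = end + 1
    return ranges


print(chunk_ranges(2_500_000))

# With PySpark, each range could select one smaller DataFrame to write in turn:
#   for start, end in chunk_ranges(total_rows):
#       chunk = df.filter((df.rno >= start) & (df.rno <= end))
#       chunk.write.jdbc(url=target_url, table=target_table, mode="append")
```

Writing the chunks one after another, rather than all at once, is what keeps the sustained write rate under control.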

I also want to note the write failure rate of my CockroachDB cluster, empirically determined to be 0.06%... this means that out of the 400 million rows I read from PostgreSQL, about 225,000 rows did not make it to CockroachDB. At the moment I don't have any built-in system for catching these failures, but it's an area of interest.
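The two figures above are consistent once rounding is taken into account: 225,000 missing rows out of 400 million is about 0.056%, which rounds to the quoted 0.06% (an exact 0.06% would be 240,000 rows). A quick check, using only the numbers stated here:

```python
rows_read = 400_000_000
rows_missing = 225_000

failure_rate_pct = rows_missing / rows_read * 100
print(f"{failure_rate_pct:.3f}%")  # 0.056%
```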