- When we need to move data from BigQuery to Spanner using Apache Spark in Scala
- Use of Spanner Mutations instead of SQL
  - Spanner limits the number of mutations per transaction, which constrains how efficiently upserts can be written through Mutations.
- Dynamic batch size based on the table schema and the size of the data in a row
  - The code computes the mutation batch size for each transaction so that writes stay under Spanner's mutation limits.
- Dynamic type mapping between Spark types and Spanner types
  - The mapping is driven by the schema obtained from BigQuery: data types are converted and a schema compatible with Cloud Spanner is built.
- Dynamic assignment when building mutations for each column of a given table
  - The assignment is based on the columns received from the source and adapts them to the sink (i.e., Cloud Spanner).
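
The type-mapping idea above can be sketched without any Spark or Spanner dependency. The example below is a minimal illustration, not the repository's actual mapping: it assumes the input is a Spark SQL type name in the form returned by `DataType.typeName` (for example `long` or `decimal(38,9)`), and the object and function names are hypothetical.

```scala
object TypeMappingSketch {
  // Maps a Spark SQL type name (as returned by DataType.typeName) to a Spanner column type.
  // The mapping table is an illustrative assumption, not the repository's actual table.
  def toSpannerType(sparkTypeName: String): String =
    sparkTypeName.toLowerCase match {
      case "string"                              => "STRING(MAX)"
      case "byte" | "short" | "integer" | "long" => "INT64"
      case "float" | "double"                    => "FLOAT64"
      case "boolean"                             => "BOOL"
      case "timestamp"                           => "TIMESTAMP"
      case "date"                                => "DATE"
      case "binary"                              => "BYTES(MAX)"
      case d if d.startsWith("decimal")          => "NUMERIC"
      case other =>
        throw new IllegalArgumentException(s"No Spanner mapping for Spark type: $other")
    }

  // Converts (column name, Spark type name) pairs into (column name, Spanner type) pairs,
  // preserving the column order of the source schema.
  def toSpannerSchema(columns: Seq[(String, String)]): Seq[(String, String)] =
    columns.map { case (colName, sparkType) => colName -> toSpannerType(sparkType) }
}
```

With a real DataFrame, the input pairs could be derived from `df.schema.fields` by taking each field's name and its type's `typeName`. Unsupported types fail loudly here; a real job might prefer to skip or cast such columns.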
The repository contains Spark Scala code crafted to retrieve data from BigQuery and then transfer it to Spanner through Mutations. These operations encompass both Inserts and Updates.
- Mutation Building: The `buildMutation` function constructs a single `Mutation` object for each row of data extracted from the BigQuery table.
- Mutation Batching: The code uses an `ArrayBuffer` to accumulate mutations.
- Get Column Count: It retrieves the number of columns in the DataFrame (`df.schema.fields.length`).
- Minimum Batch Size: `Math.max(..., 1)` ensures that the batch size is never less than 1, which guards against a batch size of zero when the table has a very large number of columns.
- Mutations per Transaction: The code batches the number of mutations calculated above into a single transaction when writing to Spanner.
- Accumulator: `spark.sparkContext.longAccumulator("Total Mutations")` creates a long accumulator named "Total Mutations".
- Increment Accumulator: Inside the `foreach` loop that builds mutations, `totalMutations.add(1)` increments the accumulator for each row processed.
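- Global Batch Size Check: The `if (totalMutations.value >= batchSize)` condition checks the accumulator value to decide when a batch is written. Note that Spark documents accumulator values as readable only by the driver, so a value read inside a task is best treated as that task's running count rather than a total across all workers.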
- Reset Accumulator: After writing a batch to Spanner, `totalMutations.reset()` sets the accumulator back to 0 for the next batch.
- Final Write: Any remaining mutations in the buffer are written after processing all rows in the partition.
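
The batching steps above can be sketched in plain Scala. This is a simplified illustration under stated assumptions, not the repository's code: the original formula inside `Math.max(..., 1)` is not shown, so dividing a caller-supplied mutation limit by the column count is an assumption, and the flush decision here uses the local buffer size rather than a Spark accumulator. `computeBatchSize`, `writeInBatches`, and the `flush` callback are hypothetical names; in a real job `flush` would commit the buffered mutations to Spanner in one transaction.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchingSketch {
  // Rows per transaction such that rows * columns stays within the mutation limit.
  // mutationLimit is supplied by the caller (assumption): check the current Spanner quota.
  def computeBatchSize(mutationLimit: Int, columnCount: Int): Int = {
    require(columnCount > 0, "columnCount must be positive")
    math.max(mutationLimit / columnCount, 1) // never less than 1
  }

  // Buffers rows and calls flush whenever the buffer reaches batchSize,
  // then flushes any remainder once the iterator is exhausted.
  // Returns the number of flush calls made.
  def writeInBatches[A](rows: Iterator[A], batchSize: Int)(flush: Seq[A] => Unit): Int = {
    require(batchSize > 0, "batchSize must be positive")
    val buffer = ArrayBuffer.empty[A]
    var flushes = 0
    rows.foreach { row =>
      buffer += row
      if (buffer.size >= batchSize) {
        flush(buffer.toList) // pass an immutable copy before clearing
        buffer.clear()
        flushes += 1
      }
    }
    if (buffer.nonEmpty) { // final write for the leftover rows
      flush(buffer.toList)
      flushes += 1
    }
    flushes
  }
}
```

For instance, with a hypothetical limit of 20,000 mutations and a 10-column table, `computeBatchSize(20000, 10)` yields 2,000 rows per transaction. Inside `foreachPartition`, `writeInBatches` would receive the partition's row iterator, which keeps the count local to each task.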
- Java 11 is required. Dataproc 2.2 images all use Java 11.
- Scala (v2.12.18), if not already installed locally:

      cs install scala:2.12.18 scalac:2.12.18
- Build a JAR:

      sbt package
- The JAR is created in the default location, under the project's `target` folder. For example:

      test-lbg_2.12-0.1.0-SNAPSHOT.jar
- Create some environment variables:

      export PROJECT_ID=$(gcloud config list core/project --format="value(core.project)")
      export PROJECT_NUM=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
      export GEO_REGION="europe-west2"
      export GCS_BUCKET="gs://test-lbg-cloudera"
      export GCS_BUCKET_JARS="${GCS_BUCKET}/jars"
      export BQ_DATASET="lbg"
      export BQ_TABLE="test"
      export SPANNER_INSTANCE="test-instance"
      export SPANNER_DB="example-db"
      export SPANNER_TABLE="test"
      export CLUSTER_NAME="lbg-cluster"
      export APP_JAR_NAME="test-lbg_2.12-0.1.0-SNAPSHOT.jar"
- Upload the required JARs to a Google Cloud Storage bucket:
  - google-cloud-spanner-jdbc-2.17.1-single-jar-with-dependencies.jar
  - google-cloud-spanner-6.45.1.jar
  - test-lbg_2.12-0.1.0-SNAPSHOT.jar (the Scala code built using sbt)
- Launch the Scala Apache Spark job on a Dataproc cluster (assuming a cluster is already available in the GCP project):

      gcloud dataproc jobs submit spark --cluster ${CLUSTER_NAME} \
        --region=${GEO_REGION} \
        --jar=${GCS_BUCKET_JARS}/${APP_JAR_NAME} \
        --jars=${GCS_BUCKET_JARS}/google-cloud-spanner-6.45.1.jar,${GCS_BUCKET_JARS}/google-cloud-spanner-jdbc-2.17.1-single-jar-with-dependencies.jar \
        -- ${PROJECT_ID} ${BQ_DATASET} ${BQ_TABLE} ${SPANNER_INSTANCE} ${SPANNER_DB} ${SPANNER_TABLE}
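
The six values after `--` in the submit command reach the job's `main` method as positional arguments. A minimal sketch of reading them is shown here; the `JobArgs` case class and `parse` function are hypothetical names used for illustration, and only the argument order is taken from the command.

```scala
object JobArgsSketch {
  // Hypothetical container for the six positional arguments, in the order
  // they appear after `--` in the gcloud command.
  final case class JobArgs(
      projectId: String,
      bqDataset: String,
      bqTable: String,
      spannerInstance: String,
      spannerDb: String,
      spannerTable: String)

  // Returns Right(JobArgs) when exactly six arguments are supplied,
  // otherwise Left with a usage message.
  def parse(args: Array[String]): Either[String, JobArgs] = args match {
    case Array(project, dataset, table, instance, db, spannerTable) =>
      Right(JobArgs(project, dataset, table, instance, db, spannerTable))
    case _ =>
      Left(
        "Expected 6 arguments: <project> <bq_dataset> <bq_table> " +
          s"<spanner_instance> <spanner_db> <spanner_table>, got ${args.length}")
  }
}
```

Failing fast on a wrong argument count keeps a misconfigured submission from starting a BigQuery read that cannot be written anywhere.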
The local development environment needs to match the environment created by the Dataproc image:
attribute | Dataproc | Local Dev |
---|---|---|
Dataproc image | 2.2-debian12 | n/a |
Apache Spark | 3.5.0 | n/a |
BigQuery connector | 0.34.0 | n/a |
GCS connector | 3.0.0 | n/a |
Java | 11 | zulu-11 (java version "11.0.20") |
Scala | 2.12.18 | 2.12.18 |
IDE | n/a | IntelliJ IDEA (2022.3.3) |
build system | n/a | sbt |
sbt | n/a | 1.9.9 |
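
As a rough illustration of how the versions in the table could be pinned locally, here is a hedged `build.sbt` sketch. The project name and version are inferred from the JAR file name `test-lbg_2.12-0.1.0-SNAPSHOT.jar`; the dependency list and the `provided` scopes are assumptions, not the repository's actual build definition.

```scala
// build.sbt (sketch): versions come from the table above; the rest is assumed.
// The sbt version itself (1.9.9) belongs in project/build.properties as sbt.version=1.9.9.
name := "test-lbg"
version := "0.1.0-SNAPSHOT"
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  // Supplied by the Dataproc image at runtime, so only needed for compilation.
  "org.apache.spark" %% "spark-sql" % "3.5.0" % "provided",
  "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.34.0" % "provided",
  // Passed to the job through --jars at submit time.
  "com.google.cloud" % "google-cloud-spanner" % "6.45.1" % "provided"
)
```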