From 330530f8025d2d30eb97d784d8f999b9d6ab0a26 Mon Sep 17 00:00:00 2001 From: aksharauke <126752897+aksharauke@users.noreply.github.com> Date: Wed, 10 Jan 2024 14:44:30 +0530 Subject: [PATCH] User guide and runner script for reverse replication (#744) * Reverse Replication User Guide and Runner * added skip file logic * added skip file logic * fixed failing formatter test * fixed sample commands * fixed customization section * review comments incorporated * custom template path sample command --- .../ReverseReplicationUserGuide.md | 171 +++--- .../RunnigReverseReplication.md | 200 +++++-- reverse_replication/launcher.go | 486 ----------------- .../reverse-replication-runner.go | 491 ++++++++++++++++++ 4 files changed, 740 insertions(+), 608 deletions(-) delete mode 100644 reverse_replication/launcher.go create mode 100644 reverse_replication/reverse-replication-runner.go diff --git a/docs/reverse-replication/ReverseReplicationUserGuide.md b/docs/reverse-replication/ReverseReplicationUserGuide.md index 87f90d9b6..df9d0927d 100644 --- a/docs/reverse-replication/ReverseReplicationUserGuide.md +++ b/docs/reverse-replication/ReverseReplicationUserGuide.md @@ -29,18 +29,16 @@ Reverse replication could also be used to replicate the Cloud Spanner writes to Reverse replication flow involves below steps: 1. Reading the changes that happened on Cloud Spanner using [Cloud Spanner change streams](https://cloud.google.com/spanner/docs/change-streams) -2. Removing forward migrated changes +2. Removing forward migrated changes ( if configured to filter ) 3. Cloud Spanner being distributed database, the changes captured must be temporally ordered before writing to a single source database 4. Transforming Cloud Spanner data to source database schema 5. Writing to source database -These steps are achieved by two Dataflow jobs, along with an interim buffer which holds the ordered changes. 
+These steps are achieved by two Dataflow jobs, along with an interim buffer which holds the change stream records. -![Architecture](https://services.google.com/fh/files/misc/reversereploverview.png) +![Architecture](https://services.google.com/fh/files/misc/reversereplicationgcsgeneric.png) -*Note that the buffer used is the [Cloud Pub/Sub](https://cloud.google.com/pubsub/docs/overview). Kafka is experimentally supported and requires manual setup, which is not discussed in this guide. [Contact us](#contact-us) for using Kafka.* - ## Before you begin A few prerequisites must be considered before starting with reverse replication. @@ -51,13 +49,12 @@ A few prerequisites must be considered before starting with reverse replication. - Check that the MySQL server is up. - The MySQL user configured in the [source shards file](./RunnigReverseReplication.md#sample-sourceshards-file) should have [INSERT](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_insert), [UPDATE](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_update) and [DELETE](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_delete) privileges on the database. 2. Ensure that Dataflow permissions are present.[Basic permissions](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#before_you_begin:~:text=Grant%20roles%20to%20your%20Compute%20Engine%20default%20service%20account.%20Run%20the%20following%20command%20once%20for%20each%20of%20the%20following%20IAM%20roles%3A%20roles/dataflow.admin%2C%20roles/dataflow.worker%2C%20roles/bigquery.dataEditor%2C%20roles/pubsub.editor%2C%20roles/storage.objectAdmin%2C%20and%20roles/artifactregistry.reader) and [Flex template permissions](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates#permissions). -3. Ensure the compute engine service account has the following permissions: - - roles/pubsub.subscriber - - roles/pubsub.publisher +3. 
Ensure the compute engine service account has the following permissions:
 - roles/spanner.databaseUser
+ - roles/secretmanager.secretAccessor
+ - roles/secretmanager.viewer
4. Ensure the authenticated user launching reverse replication has the following permissions: (this is the user account authenticated for the Spanner Migration Tool and not the service account)
 - roles/spanner.databaseUser
- - roles/pubsub.editor
 - roles/dataflow.developer
5. Ensure that [golang](https://go.dev/dl/) (version 1.18 and above) is set up on the machine from which reverse replication flow will be launched.
6. Ensure that gcloud authentication is done, refer [here](./RunnigReverseReplication.md#before-you-begin).
@@ -66,6 +63,7 @@ A few prerequisites must be considered before starting with reverse replication.
9. [Source shards file](./RunnigReverseReplication.md#sample-sourceshards-file) already uploaded to GCS.
10. Resources needed for reverse replication incur cost. Make sure to read [cost](#cost).
11. Reverse replication uses a shard identifier column per table to route the Spanner records to a given source shard. The column identified as the sharding column needs to be selected via Spanner Migration Tool when performing migration. The value of this column should be the logicalShardId value specified in the [source shard file](./RunnigReverseReplication.md#sample-sourceshards-file). In the event that the shard identifier column is not an existing column, the application code needs to be changed to populate this shard identifier column when writing to Spanner.
+12. The reverse replication pipelines use GCS as a data buffer; this GCS bucket needs to be created before starting the reverse replication flows.
 
 ## Launching reverse replication
 
@@ -82,26 +80,37 @@ There are various progress points in the pipeline. Below sections detail how to
 
 Unless there is change stream data to stream from Spanner, nothing will be reverse replicated. The first step is to verify that the change stream has data. 
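As a quick sanity check, the change stream can be queried via its generated read function. A sketch (the stream name `reverseReplicationStream` is the runner default; the timestamps are placeholders and must fall within the retention window):

```
SELECT ChangeRecord
FROM READ_reverseReplicationStream (
  start_timestamp => "2024-01-10T09:00:00Z",
  end_timestamp => "2024-01-10T09:05:00Z",
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);
```

A non-empty result confirms that change records are being generated for that interval.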
Refer [here](https://cloud.google.com/spanner/docs/change-streams/details#query) on how to check this.
 
-#### Metrics for Dataflow job that writes from Spanner to Sink
+#### Metrics for Dataflow job that writes from Spanner to GCS
 
 The progress of the Dataflow jobs can be tracked via the Dataflow UI. The last step gives an approximation of where the step is currently - the Data Watermark gives an indication of the Spanner commit timestamp that is guaranteed to be processed. On the Dataflow UI, click on JobGraph and scroll to the last step, as shown below. Click on the last step and the metrics should be visible on the right pane.
 
-![Metrics](https://services.google.com/fh/files/misc/sourcetosinkmetrics.png)
+![Metrics](https://services.google.com/fh/files/misc/readermetrics.png)
 
+In addition, the following application metrics are exposed by the job:
 
-#### Pub/Sub metrics
+| Metric Name | Description |
+|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
+| data_record_count | The number of change stream records read |
+| num_files_written_\<logicalShardId\> | Number of files successfully written for the shard |
 
-Track the Pub/Sub topic [metrics](https://cloud.google.com/pubsub/docs/monitor-topic) to verify that there is inflow of messages by checking the 'Published Messages' metric.
+The progress of files created per shard is also captured in the shard_file_create_progress table, which gets created in the metadata database specified when starting the job.
 
-Note that subscription [metrics](https://cloud.google.com/pubsub/docs/monitor-subscription) can be verified to check if the dataflow job that writes to source database is reading from Pub/Sub as expected.
 
 #### Metrics for Dataflow job that writes to source database
 
-The Dataflow job that writes to source database exposes per shard metric like so, which should be visible on the right pane titled 'Job Info'. 
+The Dataflow job that writes to source database exposes the following per shard metrics:
 
+| Metric Name | Description |
+|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------|
+| file_read_\<logicalShardId\> | Number of files successfully read for the shard |
+| records_read_from_gcs_\<logicalShardId\> | Number of records read from GCS for the shard |
+| records_processed_\<logicalShardId\> | Number of records successfully written for the shard |
+| replication_lag_in_seconds_\<logicalShardId\> | Replication lag min, max and count value for the shard |
+| metadata_file_create_lag_retry_\<logicalShardId\> | Count of file lookup retries done when the job that writes to GCS is lagging |
+| mySQL_retry_\<logicalShardId\> | Number of retries done when MySQL is not reachable |
 
 These can be used to track the pipeline progress. However, there is a limit of 100 on the total number of metrics per project. So if this limit is exhausted, the Dataflow job will give a message like so:
 
@@ -127,6 +136,8 @@ mean(value.user_counter)]
 
 Metrics visible on Dataflow UI can also be queried via REST, official documentation [here](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/getMetrics?apix_params=%7B%22projectId%22%3A%22span-cloud-testing%22%2C%22location%22%3A%22us-east1%22%2C%22jobId%22%3A%222023-06-06_05_20_27-10999367971891038895%22%7D).
 
+The progress of files processed per shard is also captured in the shard_file_process_progress table, which gets created in the metadata database specified when starting the job.
+
 #### Verifying the data in the source database
 
 To confirm that the records have indeed been written to the source database, the best approach is to check that the record count on the source database matches the expected value. Note that verifying data takes more than just record count matching. 
The suggested tool for the same is [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator).
 
@@ -138,7 +149,7 @@ Following are some scenarios and how to handle them.
 
 #### Dataflow job does not start
 
 1. Check that the permissions listed in the [prerequisites](#before-you-begin) section are present.
-2. Check the DataFlow logs since they are an excellent way to understand if something is not working as expected.
+2. Check the DataFlow logs, since they are an excellent way to understand if something is not working as expected.
 
 If you observe that the pipeline is not making expected progress, check the Dataflow logs for any errors. For Dataflow related errors, please refer [here](https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline) for troubleshooting. Note that sometimes logs are not visible in Dataflow; in such cases, follow these suggestions.
 
 ![DataflowLog](https://services.google.com/fh/files/misc/dataflowlog.png)
 
@@ -149,24 +160,20 @@ If you observe that the pipeline is not making expected progress, check the Data
 
 In this case, check if you observe the following:
 
-- ***The watermark of the Spanner to Sink pipeline does not advance***
-
-  This happens when the job is hit with a huge backlog, that leads to infinite loop. The recovery steps are covered [here](#recovery-steps-for-the-infinte-loop).
-
-- ***PubSub message count is not decreasing and the same data is being written back to source repeatedly***
- 
- -- ***There is data in change stream yet not present in Pub/Sub*** +- ***There is data in change stream yet not present in GCS*** Records of below nature are dropped from reverse replication. Check the Dataflow logs to see if they are dropped. 1. Records which are forward migrated. 2. Shard Id based routing could not be performed since the shard id value could not be determined. 3. The record was deleted on Cloud Spanner and the deleted record was removed from Cloud Spanner due to lapse of retention period by the time the record was to be reverse replicated. + 4. Check the data_seen and shard_file_create_progress tables created in the metadata database. An entry in the data_seen table means that change record for read for the given interval for the given shard. If no change record was generated for the interval, then no file is generated. The shard_file_create_progress table indicates the maximum interval until which the files have been generated for the shard at that point. If the file creation interval is lesser than the expected interval, then wait for the pipeline to process the change records. + 5. Check for issues in the dataflow job. This can include scaling issues, CPU utilization being more than 70% consistently. This can be checked via [CPU utilization](https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf#cpu-use) section on the Dataflow job UI.Check for any errors in the jobor worker logs which could indicate restarts. Sometimes a worker might restart causing a delay in record processing. The CPU utlization would show multiple workers during the restart period. The number of workers could also be viewed via [here](https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf#autoscaling). + 6. When working with session file based shard identification logic, if the table of the change record does not exist in the session file, such records are written to skip directory and not reverse replicated. 
-- ***There is data in Pub/Sub yet not present in source database***
+- ***There is data in GCS yet not present in source database***
 
-  Check worker logs to ensure that records are being read from PubSub. Filter the logs based on logical shard id of the shard you want to check. It should have messages like below, which indicate records are being read from Pub/Sub.
+  Check worker logs to ensure that records are being read from GCS. Filter the logs based on logical shard id of the shard you want to check. It should have messages like below, which indicate records are being read from GCS.
 
   ![DataflowLog](https://services.google.com/fh/files/misc/recordsreadfrompubsub.png)
 
@@ -174,13 +181,15 @@ In this case, check if you observe the following:
 
   Check for logs to see if there are any Connection exception warnings like below. This means that the source database is not reachable and the job keeps retrying to connect, hence nothing gets written to the source database. In such case, please ensure that the [prerequisite](#before-you-begin) of connectivity between source database and Dataflow workers is met.
 
   ![DataflowLog](https://services.google.com/fh/files/misc/connectionretry.png)
-
+ Note that in case of connection exceptions, the mySQL_retry_\<logicalShardId\> metric would keep incrementing to indicate that the connection is being retried.
 
   Check the Dataflow logs to see if records are being dropped. This can happen for records for which primary key cannot be determined on the source database. This can happen when:
   1. The source database table does not have a primary key
   2. The primary key value was not present in the change stream data
-  3. The record was deleted on Cloud Spanner and the deleted record was removed from Cloud Spanner due to lapse of retention period by the time the record was to be reverse replicated.
+  3. Check the shard_skipped_files table created in the metadata database. 
It contains the intervals for which a file was found in GCS for the cases when no change record was generated for the interval. If a file is present in the shard_skipped_files table and also exists in GCS - this indicates a data loss scenario - please raise a bug.
+  4. Check the shard_file_process_progress table in the metadata database. If it is lagging, then wait for the pipeline to catch up so that data gets reverse replicated.
+
 
 #### There is higher load than the expected QPS on spanner instance post cutover
 
@@ -189,9 +198,11 @@ In this case, check if you observe the following:
 
 2. If the forward migration is still running post cutover, the incoming writes on Spanner that are reverse-replicated to the source get forward migrated again. This can cause the load on Spanner to be almost double the expected QPS, as each write will get reflected twice. Also, it could lead to transient inconsistencies in data under certain cases. To avoid this, stop/delete the forward migration pipeline post cutover. If the forward pipeline is required, add custom filtration rules and build a custom forward migration dataflow template.
 
-### Retry
+### Resuming from failures
+
+The reader Dataflow job stops when any error is encountered. The writer Dataflow job halts processing a shard if an error is encountered for that shard; this is to ensure the ordering of writes does not break.
 
-For both the Dataflow jobs, once an error is encountered for a given shard, then procesing is stopped for that shard to preserve ordering.To recover,rerun the job.The jobs are idempotent and it's safe to rerun them.
+The metadata tables keep track of progress made by the Dataflow templates. This helps to start the Dataflow jobs from where they left off. The command to run the Dataflow jobs is available when launching the Dataflow jobs via the launcher script. The arguments are similar to what was passed in the launcher [script](./RunnigReverseReplication.md#arguments). 
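To see how far the writer has progressed per shard, the progress table can be queried directly. A sketch (the instance and database names are placeholders; `shard_file_process_progress` is the table name used elsewhere in this guide, and its exact schema may differ by template version):

```shell
# Hypothetical metadata instance/database names; use the values that were
# passed when launching the jobs. Lists per-shard writer progress rows.
gcloud spanner databases execute-sql rev_repl_metadata \
  --instance=my-metadata-instance \
  --sql="SELECT * FROM shard_file_process_progress"
```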
@@ -209,58 +220,35 @@ When providing subnetwork, give the option like so: --subnetwork=https://www.googleapis.com/compute/v1/projects//regions//subnetworks/ ``` -Example command for the Spanner to Sink job +#### Retry of Reverse Replication jobs -```code -gcloud dataflow flex-template run ordering-fromspanner \ - --project \ - --region \ - --template-file-gcs-location gs://dataflow-templates/2023-07-18-00_RC00/flex/Spanner_Change_Streams_to_Sink \ ---additional-experiments=use_runner_v2,use_network_tags=,use_network_tags_for_flex_templates= \ - --parameters "changeStreamName=" \ - --parameters "instanceId=" \ - --parameters "databaseId=" \ - --parameters "spannerProjectId=" \ - --parameters "metadataInstance=" \ - --parameters "metadataDatabase=" \ - --parameters "sinkType=pubsub" \ - --parameters "pubSubDataTopicId=projects//topics/" \ - --parameters "pubSubErrorTopicId=projects//topics/" \ - --parameters "pubSubEndpoint=:443" \ ---parameters "sessionFilePath=" +In order to resume the reader job, it should be started with run mode as **resume** and the runIdentifier should be same as that of the original job. -``` +In order to resume the writer job for all shards the run mode should be **resumeAll** and the runIdentifier should be same as that of the original job. -Example command for the writing to source database job +Example command for resume is [here](RunnigReverseReplication.md#resuming-jobs). 
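Assuming the flag names from the runner's argument list (all values below are placeholders), a resume of the reader job via the runner script might look like:

```shell
# Sketch: relaunch only the reader job in resume mode after a failure.
# Keep the values identical to the original run so it picks up where it left off.
go run reverse-replication-runner.go \
  -projectId=my-project -dataflowRegion=us-east1 \
  -instanceId=my-instance -dbName=mydb \
  -sourceShardsFilePath=gs://bucket-name/shards.json \
  -sessionFilePath=gs://bucket-name/session.json \
  -gcsPath=gs://bucket-name/ \
  -jobsToLaunch=reader \
  -readerRunMode=resume
```

For the writer job, the analogous flags would be `-jobsToLaunch=writer` and `-writerRunMode=resumeAll`.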
-```code -gcloud dataflow flex-template run writes-tosql \ ---project= \ ---region= \ ---template-file-gcs-location=gs://dataflow-templates/2023-07-18-00_RC00/flex/Ordered_Changestream_Buffer_to_Sourcedb \ ---additional-experiments=use_runner_v2,use_network_tags=,use_network_tags_for_flex_templates= \ ---parameters "sourceShardsFilePath=" \ ---parameters "sessionFilePath=" \ ---parameters "pubSubProjectId=" +In order to process only the failed shards in the writer job, the run mode should be **resumeFailed** and the runIdentifier should be same as that of the original job. -``` +Example command for the same is [here](RunnigReverseReplication.md#reprocessing-error-shards) -## Reverse Replication Limitations -The following sections list the known limitations that exist currently with the Reverse Replication flows: +In order to process only certain failed shards, update the status as REPROCESS in the shard_file_process_progress table for those shards and launch writer job, the run mode should be **reprocess** and the runIdentifier should be same as that of the original job. + +In order to resume processing of only the successful shards in the writer job, the run mode should be **resumeSuccess** and the runIdentifier should be same as that of the original job. + -### Dataflow job of Spanner to Sink getting stuck in infinte loop -The Dataflow job that reads the change streams and writes to PubSub gets stuck in infinite loop retrying the same set of records during certain scenarios.These scenario can arise when there are a lot of changestream records to be read in a short interval of time, which occurs in the following situations: -1. There is an unexpected spike on Spanner -2. The pipeline is started with a date in past ( due to issues that required downtime such as bug fix ) -Currently, there is no way to revert this within the same job. More details and recovery steps below. 
+Note: Additional optional parameters for the reader job are [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-change-streams-to-sharded-file-sink/README_Spanner_Change_Streams_to_Sharded_File_Sink.md#optional-parameters). -#### Recovery steps for the infinte loop -1. The user must track the last handled window that was successfully processed - this can be obtained from the Dataflow logs or by checking the watermark of the job stage. -2. Specify a different partition metadata database or drop the previous metadata tables - since watermarks for a given partition are persisted and hence will be read from that point onwards. So the pipeline must begin with a clean partition metadata database.Note that the metadata tables can be dropped by giving DROP TABLE\ statements in the Cloud UI. -3. The current pipeline must be updated - the user must specify the start time of change stream query as the last successfully processed timestamp and the end time of the change stream as the time that would result in ~1GB writes. Example, if average record size is 1KB, then 10,00,000 records should be processed in a window and if the TPS during that window was 20K, then the window must end at 50 second. Detailed steps to update the Dataflow job are given here: https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline#gcloud-cli -4. Once this window is processed, the pipeline must be updated with next set of start and end times, until the pipeline catches up and the finally the end timestamp need not be passed. +Note: Additional optional parameters for the writer job are [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/gcs-to-sourcedb/README_GCS_to_Sourcedb.md#optional-parameters). + +## Reverse Replication Limitations + + The following sections list the known limitations that exist currently with the Reverse Replication flows: + + 1. Currently only MySQL source database is supported. + 2. 
Certain transformations are not supported; the below section lists those:
 
 ### Reverse transformations
 Reverse transformation can not be supported for the following scenarios out of the box:
 
@@ -279,11 +267,26 @@ In the above cases, custom code will need to be written to perform reverse trans
 
 ## Best practices
 
-1. Avoid backlog build up of Spanner writes before starting the reverse replication. Start the reverse replication pipeline just before cutover of the first shard.
+1. Set the change stream retention period to the maximum value of 7 days to avoid any data loss.
+
+2. The change records get written to GCS in plain text; ensure that appropriate [access control](https://cloud.google.com/storage/docs/access-control) exists on GCS to avoid inadvertent data access.
+
+3. The Spanner TPS and [windowDuration](RunnigReverseReplication.md#arguments) decide how large a batch will be when writing to the source. Perform benchmarks on expected production workloads and acceptable replication lag to fine-tune the windowDuration.
+
+4. The metrics give a good indication of the progress of the pipeline; it is good to set up [dashboards](https://cloud.google.com/monitoring/charts/dashboards) to monitor the progress.
 
-2. Set the chagne stream retention period to maximum value of 7 days to avoid any data loss.
+5. Create the GCS bucket with a [lifecycle](https://cloud.google.com/storage/docs/lifecycle) policy to handle auto deletion of the objects.
 
-3. Use the launcher script to create the necessary GCP resources and avoid creating them manually.
+6. Use a different database for the metadata tables than the Spanner database to avoid load.
+7. The default change stream monitors all the tables; if only a subset of tables needs reverse replication, create the change stream manually before launching the script. 
When creating a change stream manually, use the NEW_ROW option, sample command below:
```
CREATE CHANGE STREAM allstream
FOR ALL OPTIONS (
retention_period = '7d',
value_capture_type = 'NEW_ROW'
);
```
 
 ## Customize
 
@@ -296,16 +299,26 @@ Some use cases could be:
 
 To customize, check out the open source template, add the custom logic, build and launch the open source template.
 
-Refer to [Spanner Change Streams to Sink template](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/spanner-change-streams-to-sink#readme) on how to build and customize this.
+Refer to [Spanner Change Streams to Sharded File Sink template](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/spanner-change-streams-to-sharded-file-sink) on how to build and customize this.
+
+Refer to [GCS to Sourcedb](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/gcs-to-sourcedb) on how to build and customize this.
+
+### Shard routing customization
+
+In order to make it easier for users to customize the shard routing logic, the [Spanner Change Streams to Sharded File Sink template](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/spanner-change-streams-to-sharded-file-sink) accepts a GCS path that points to a custom jar and another input parameter that accepts the custom class name, which are used to invoke custom logic to perform shard identification.
+
+Steps to perform customization:
+1. Write custom shard id fetcher logic [CustomShardIdFetcher.java](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-custom-shard/src/main/java/com/custom/CustomShardIdFetcher.java). Details of the ShardIdRequest class can be found [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-migrations-sdk/src/main/java/com/google/cloud/teleport/v2/spanner/utils/ShardIdRequest.java).
+2. 
Build the [JAR](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/spanner-custom-shard) and upload the jar to GCS.
3. Invoke the reverse replication flow by passing the [custom jar path and custom class path](RunnigReverseReplication.md#custom-jar).
 
-Refer to [Ordered Changestream Buffer to Sourcedb](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/ordered-changestream-buffer-to-sourcedb#readme) on how to build and customize this.
 
 ## Cost
 
 1. Cloud Spanner change streams incur an additional storage requirement, refer [here](https://cloud.google.com/spanner/docs/change-streams#data-retention).
 2. For Dataflow pricing, refer [here](https://cloud.google.com/dataflow/pricing).
-3. For Pub/Sub pricing, refer [here](https://cloud.google.com/pubsub/pricing).
+3. For GCS pricing, refer [here](https://cloud.google.com/storage/pricing).
 
 ## Contact us
 
diff --git a/docs/reverse-replication/RunnigReverseReplication.md b/docs/reverse-replication/RunnigReverseReplication.md
index bd4f6e2bc..e1b12bd3a 100644
--- a/docs/reverse-replication/RunnigReverseReplication.md
+++ b/docs/reverse-replication/RunnigReverseReplication.md
@@ -9,7 +9,7 @@ nav_order: 2
{: .no_toc }
 
 Spanner migration tool currently does not support reverse replication out-of-the-box.
-The launcher.go script can be used instead to setup the resources required for a
+The reverse-replication-runner.go script can be used instead to set up the resources required for a
reverse replication pipeline.
@@ -22,46 +22,62 @@ reverse replication pipeline.
## Resources
-The pipeline requires a few GCP resources to be setup. The launcher script creates these resources for you, skipping creation if they already exist. The resources are:
+The pipeline requires a few GCP resources to be set up. The runner script creates these resources for you, skipping creation if they already exist. The resources are:
 
 - `Change Stream`: The target spanner database should have a changestream setup with value_capture_type = 'NEW_ROW'. This helps stream CDC events from Spanner.
-- `Ordering Dataflow Job`: This dataflow job reads from Spanner CDC, orders the data and pushes it to a PubSub topic.
-- `PubSub Topic & Subscriptions`: The topic that the ordering job pushes to needs to be created beforehand. For each shard, a subscription needs to be created, with the subscription name as the corresponding logicalShardId. These names are fetched from the source shards file mentioned later.
-- `Writer Dataflow Job`: This reads messages from the PubSub subscriptions, translates them to SQL and writes to the source shards.
+- `Metadata Database`: This is the metadata database that holds the pipeline-related metadata.
+- `Reader Dataflow Job`: This dataflow job reads change records from Spanner CDC and writes them to GCS.
+- `Writer Dataflow Job`: This reads the GCS files, orders the records, translates them to SQL and writes to the source shards.
 
 ## Arguments
 
 The script takes in multiple arguments to orchestrate the pipeline. They are:
-- `projectId`: project id of the Spanner instance.
-- `dataflowRegion`: region for Dataflow jobs.
-- `jobNamePrefix`: job name prefix for the Dataflow jobs, defaults to `reverse-rep`. Automatically converted to lower case due to Dataflow name constraints.
-- `changeStreamName`: change stream name to be used. Defaults to `reverseReplicationStream`.
-- `instanceId`: spanner instance id.
-- `dbName`: spanner database name.
+
+- `changeStreamName`: Change stream name to be used. Defaults to `reverseReplicationStream`. 
+- `dataflowRegion`: Region for Dataflow jobs.
+- `dbName`: Spanner database name.
+- `filtrationMode`: The flag to decide whether or not to filter the forward migrated data. Defaults to forward_migration.
+- `gcsPath`: The GCS directory where the change stream data resides. The GCS directory should be pre-created.
+- `instanceId`: Spanner instance id.
+- `jobNamePrefix`: Job name prefix for the Dataflow jobs, defaults to `smt-reverse-replication`. Automatically converted to lower case due to Dataflow name constraints.
+- `jobsToLaunch`: whether to launch the spanner reader job or the source writer job or both. Default is both. Supported values are both, reader, writer.
+- `machineType`: dataflow worker machine type, defaults to n2-standard-4.
+- `metadataDatabase`: Spanner database name to store changestream metadata, defaults to `rev_repl_metadata`.
- `metadataInstance`: Spanner instance name to store changestream metadata. Defaults to target spanner instance id.
-- `metadataDatabase`: Spanner database name to store changestream metadata, defaults to `change-stream-metadata`.
-- `startTimestamp`: timestamp from which the changestream should start reading changes in RFC 3339 format, defaults to empty string which is equivalent to the current timestamp.
-- `pubSubDataTopicId`: pub/sub data topic id. DO NOT INCLUDE the prefix 'projects//topics/'. Defaults to 'reverse-replication'.
-- `pubSubEndpoint`: Pub/Sub endpoint, defaults to same endpoint as the Dataflow region.
-- `sourceShardsFilePath`: GCS file path for file containing shard info. Details on structure mentioned later.
+- `metadataTableSuffix`: The suffix to apply when creating metadata tables. Helpful in case of multiple runs. Default is no suffix.
+- `networkTags`: network tags added to the Dataflow jobs worker and launcher VMs.
+- `projectId`: Project id of the Spanner instance.
- `sessionFilePath`: GCS file path for session file generated via Spanner migration tool. 
-- `machineType`: dataflow worker machine type, defaults to n2-standard-4.
-- `orderingWorkers`: number of workers for ordering job. Defaults to 5.
+- `serviceAccountEmail`: the email address of the service account to run the job as.
+- `skipChangeStreamCreation`: whether to skip the change stream creation. Default is false.
+- `skipMetadataDatabaseCreation`: whether to skip Metadata database creation. Default is false.
+- `sourceDbTimezoneOffset`: the timezone offset with respect to UTC for the source database. Defaults to +00:00.
+- `sourceShardsFilePath`: GCS file path for file containing shard info. Details on structure mentioned later.
+- `sourceWriterTemplateLocation`: the dataflow template location for the Source writer job.
+- `spannerReaderTemplateLocation`: the dataflow template location for the Spanner reader job.
+- `startTimestamp`: Timestamp from which the changestream should start reading changes in RFC 3339 format, defaults to empty string which is equivalent to the current timestamp.
+- `readerShardingCustomClassName`: the fully qualified custom class name for sharding logic.
+- `readerShardingCustomJarPath`: the GCS path to custom jar for sharding logic.
+- `readerSkipDirectoryName`: Records skipped from reverse replication are written to this directory. Defaults to: skip.
+- `readerRunMode`: whether the reader from Spanner job runs in regular or resume mode. Default is regular.
+- `readerWorkers`: number of workers for the reader job. Defaults to 5.
+- `windowDuration`: The window duration/size in which change stream data will be written to Cloud Storage. Defaults to 10 seconds.
+- `writerRunMode`: whether the writer to source job runs in regular, reprocess, resumeFailed, resumeSuccess or resumeAll mode. Default is regular.
- `writerWorkers`: number of workers for writer job. Defaults to 5.
+- `vpcHostProjectId`: project ID hosting the subnetwork. If unspecified, the 'projectId' parameter value will be used for subnetwork. 
- `vpcNetwork`: name of the VPC network to be used for the dataflow jobs
- `vpcSubnetwork`: name of the VPC subnetwork to be used for the dataflow jobs. Subnet should exist in the same region as the 'dataflowRegion' parameter.
-- `vpcHostProjectId`: project ID hosting the subnetwork. If unspecified, the 'projectId' parameter value will be used for subnetwork..
-- `serviceAccountEmail`: the email address of the service account to run the job as.
-- `networkTags`: network tags addded to the Dataflow jobs worker and launcher VMs.
-- `filtrationMode`: Whether to filter forward migrated data or not. Supported values are forward_migration and none, defaults to 'forward_migration'.
+
## Pre-requisites
Before running the command, ensure you have the:
1) Target Spanner instance ready
2) Session file already uploaded to GCS
3) Source shards file (more details below) already uploaded to GCS
+4) GCS path for buffering the data created
## Sample sourceShards File
This file contains meta data regarding the source MYSQL shards, which is used to connect to them. This should be present even if there is a single source database shard.
+The database user password should be kept in [Secret Manager](https://cloud.google.com/security/products/secret-manager) and its URI needs to be specified in the file.
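Since a malformed shards file only surfaces later as a failure in the Dataflow jobs, it can be worth sanity-checking its structure before uploading it to GCS. The snippet below is a minimal illustrative sketch, not part of the migration tooling; the helper name `validate_shards` and the required-field set are assumptions based on the sample file in this guide:

```python
import json

# Fields expected in each shard entry, per the sample sourceShards file in this guide.
REQUIRED_FIELDS = {"logicalShardId", "host", "user", "secretManagerUri", "port", "dbName"}

def validate_shards(text):
    """Parse the shards file content; raise ValueError if any entry is malformed."""
    shards = json.loads(text)
    if not isinstance(shards, list) or not shards:
        raise ValueError("shards file must be a non-empty JSON list")
    for i, shard in enumerate(shards):
        missing = REQUIRED_FIELDS - shard.keys()
        if missing:
            raise ValueError(f"shard entry {i} is missing fields: {sorted(missing)}")
    return shards

# A one-shard example mirroring the sample file format below.
sample = """[
  {
    "logicalShardId": "shard1",
    "host": "10.11.12.13",
    "user": "root",
    "secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard1/versions/latest",
    "port": "3306",
    "dbName": "db1"
  }
]"""
print(len(validate_shards(sample)), "shard(s) look valid")
```

Running this against the real file contents prints the shard count on success and raises a `ValueError` naming the missing fields otherwise.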
The file should be a list of JSONs as:
```
[
@@ -69,7 +85,7 @@ The file should be a list of JSONs as:
    "logicalShardId": "shard1",
    "host": "10.11.12.13",
    "user": "root",
-    "password": "mypwd",
+    "secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard1/versions/latest",
    "port": "3306",
    "dbName": "db1"
},
@@ -77,42 +93,140 @@ The file should be a list of JSONs as:
    "logicalShardId": "shard2",
    "host": "10.11.12.14",
    "user": "root",
-    "password": "mypwd",
+    "secretManagerUri": "projects/123/secrets/rev-cmek-cred-shard2/versions/latest",
    "port": "3306",
    "dbName": "db2"
}
]
```
-{: .note }
-The logicalShardId is expected to be a string that begins with a letter, is atleast 3 characters long and and contain only the following characters: letters, numbers, dashes (-), periods (.), underscores (_), tildes (~), percents (%) or plus signs (+). Cannot start with goog.
-
## Sample Commands
Check out the reverse replication folder from the root:
```
cd reverse_replication
```
-### Quickstart
-Run the launcher command via:
-```sh
-go run launcher.go -projectId=my-project -dataflowRegion=us-east1 -instanceId=my-instance -dbName=mydb -sourceShardsFilePath=gs://bucket-name/shards.json -sessionFilePath=gs://bucket-name/session.json
+### Quickstart reverse replication with all defaults
+
+```
+go run reverse-replication-runner.go -projectId= -dataflowRegion= -instanceId= -dbName= -sourceShardsFilePath=gs://bucket-name/shards.json -sessionFilePath=gs://bucket-name/session.json -gcsPath=gs://bucket-name/
+```
+
+The response looks something like the following; note the defaults used.
+The Dataflow job ids can be captured to trigger a manual shutdown later.
+The printed gcloud command can be saved to retrigger a specific job if needed.
+
+```
+Setting up reverse replication pipeline...
+metadataInstance not provided, defaulting to target spanner instance id: +changestream not found +Creating changestream +Successfully created changestream +Created metadata db projects//instances//databases/ + +GCLOUD CMD FOR READER JOB: +gcloud dataflow flex-template run smt-reverse-replication-reader-2024-01-05t10-33-56z --project= --region= --template-file-gcs-location=