Commit 287bec4

Display equivalent gcloud cli command for Dataflow for forward migrations (#641)
Deep1998 authored Sep 27, 2023
1 parent 0d51c31 commit 287bec4
Showing 14 changed files with 337 additions and 13 deletions.
111 changes: 111 additions & 0 deletions common/utils/dataflow_utils.go
@@ -0,0 +1,111 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package utils contains common helper functions used across multiple other packages.
// Utils should not import any Spanner migration tool packages.
package utils

import (
	"encoding/json"
	"fmt"
	"sort"
	"strings"

	"cloud.google.com/go/dataflow/apiv1beta3/dataflowpb"
	"golang.org/x/exp/maps"
)

// GetGcloudDataflowCommand generates the equivalent gcloud CLI command to launch a Dataflow job
// with the same parameters and environment flags as the input request body.
func GetGcloudDataflowCommand(req *dataflowpb.LaunchFlexTemplateRequest) string {
	lp := req.LaunchParameter
	templatePath := lp.Template.(*dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath).ContainerSpecGcsPath
	cmd := fmt.Sprintf("gcloud dataflow flex-template run %s --project=%s --region=%s --template-file-gcs-location=%s %s %s",
		lp.JobName, req.ProjectId, req.Location, templatePath, getEnvironmentFlags(lp.Environment), getParametersFlag(lp.Parameters))
	return strings.Trim(cmd, " ")
}

// getParametersFlag generates the equivalent --parameters flag string, returning the empty
// string if no parameters are specified. Keys are sorted to keep the output deterministic.
func getParametersFlag(parameters map[string]string) string {
	if len(parameters) == 0 {
		return ""
	}
	params := ""
	keys := maps.Keys(parameters)
	sort.Strings(keys)
	for _, k := range keys {
		params = params + k + "=" + parameters[k] + ","
	}
	params = strings.TrimSuffix(params, ",")
	return fmt.Sprintf("--parameters %s", params)
}

// getEnvironmentFlags generates the flags for the runtime environment. We don't populate all
// fields of the API here because certain flags (like AutoscalingAlgorithm, DumpHeapOnOom etc.)
// are not supported in gcloud.
func getEnvironmentFlags(environment *dataflowpb.FlexTemplateRuntimeEnvironment) string {
	flag := ""
	if environment.NumWorkers != 0 {
		flag += fmt.Sprintf("--num-workers %d ", environment.NumWorkers)
	}
	if environment.MaxWorkers != 0 {
		flag += fmt.Sprintf("--max-workers %d ", environment.MaxWorkers)
	}
	if environment.ServiceAccountEmail != "" {
		flag += fmt.Sprintf("--service-account-email %s ", environment.ServiceAccountEmail)
	}
	if environment.TempLocation != "" {
		flag += fmt.Sprintf("--temp-location %s ", environment.TempLocation)
	}
	if environment.MachineType != "" {
		flag += fmt.Sprintf("--worker-machine-type %s ", environment.MachineType)
	}
	if len(environment.AdditionalExperiments) > 0 {
		flag += fmt.Sprintf("--additional-experiments %s ", strings.Join(environment.AdditionalExperiments, ","))
	}
	if environment.Network != "" {
		flag += fmt.Sprintf("--network %s ", environment.Network)
	}
	if environment.Subnetwork != "" {
		flag += fmt.Sprintf("--subnetwork %s ", environment.Subnetwork)
	}
	if len(environment.AdditionalUserLabels) > 0 {
		jsonByteStr, err := json.Marshal(environment.AdditionalUserLabels)
		// If marshalling fails, omit this flag and move on; no error handling is needed here.
		if err == nil {
			flag += fmt.Sprintf("--additional-user-labels %s ", string(jsonByteStr))
		}
	}
	if environment.KmsKeyName != "" {
		flag += fmt.Sprintf("--dataflow-kms-key %s ", environment.KmsKeyName)
	}
	if environment.IpConfiguration == dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE {
		flag += "--disable-public-ips "
	}
	if environment.WorkerRegion != "" {
		flag += fmt.Sprintf("--worker-region %s ", environment.WorkerRegion)
	}
	if environment.WorkerZone != "" {
		flag += fmt.Sprintf("--worker-zone %s ", environment.WorkerZone)
	}
	if environment.EnableStreamingEngine {
		flag += "--enable-streaming-engine "
	}
	if environment.FlexrsGoal != dataflowpb.FlexResourceSchedulingGoal_FLEXRS_UNSPECIFIED {
		flag += fmt.Sprintf("--flexrs-goal %s ", environment.FlexrsGoal)
	}
	if environment.StagingLocation != "" {
		flag += fmt.Sprintf("--staging-location %s ", environment.StagingLocation)
	}
	return strings.Trim(flag, " ")
}
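
For illustration, a minimal sketch of a caller of this helper (the project, bucket, job, and stream names below are placeholders, not values from this change; the printed command follows from the format strings above):

package main

import (
	"fmt"

	"cloud.google.com/go/dataflow/apiv1beta3/dataflowpb"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
)

func main() {
	// Build a launch request the same way the migration tool does, then render it as a gcloud command.
	req := &dataflowpb.LaunchFlexTemplateRequest{
		ProjectId: "my-project",
		Location:  "us-central1",
		LaunchParameter: &dataflowpb.LaunchFlexTemplateParameter{
			JobName:     "smt-job",
			Template:    &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://my-bucket/Cloud_Datastream_to_Spanner"},
			Parameters:  map[string]string{"streamName": "my-stream"},
			Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{MaxWorkers: 50},
		},
	}
	fmt.Println(utils.GetGcloudDataflowCommand(req))
	// Prints, on one line:
	// gcloud dataflow flex-template run smt-job --project=my-project --region=us-central1
	//   --template-file-gcs-location=gs://my-bucket/Cloud_Datastream_to_Spanner --max-workers 50 --parameters streamName=my-stream
}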
2 changes: 1 addition & 1 deletion conversion/conversion.go
@@ -321,7 +321,7 @@ func dataFromDatabaseForDMSMigration() (*writer.BatchWriter, error) {
 func dataFromDatabaseForDataflowMigration(targetProfile profiles.TargetProfile, ctx context.Context, sourceProfile profiles.SourceProfile, conv *internal.Conv) (*writer.BatchWriter, error) {
 	updateShardsWithDataflowConfig(sourceProfile.Config.ShardConfigurationDataflow)
 	conv.Audit.StreamingStats.ShardToDataStreamNameMap = make(map[string]string)
-	conv.Audit.StreamingStats.ShardToDataflowJobMap = make(map[string]string)
+	conv.Audit.StreamingStats.ShardToDataflowInfoMap = make(map[string]internal.ShardedDataflowJobResources)
 	tableList, err := common.GetIncludedSrcTablesFromConv(conv)
 	if err != nil {
 		fmt.Printf("unable to determine tableList from schema, falling back to full database")
11 changes: 9 additions & 2 deletions internal/convert.go
@@ -185,6 +185,12 @@ type Audit struct {
 	SkipMetricsPopulation bool `json:"-"` // Flag to identify if outgoing metrics metadata needs to be skipped
 }
 
+// Stores the Dataflow job resources generated per shard.
+type ShardedDataflowJobResources struct {
+	JobId     string `json:"JobId"`
+	GcloudCmd string `json:"GcloudCmd"`
+}
+
 // Stores information related to the streaming migration process.
 type streamingStats struct {
 	Streaming bool // Flag for confirmation of streaming migration.
@@ -195,8 +201,9 @@ type streamingStats struct {
 	SampleBadWrites []string // Records that faced errors while writing to Cloud Spanner.
 	DataStreamName  string
 	DataflowJobId   string
+	DataflowGcloudCmd string
 	ShardToDataStreamNameMap map[string]string
-	ShardToDataflowJobMap    map[string]string
+	ShardToDataflowInfoMap   map[string]ShardedDataflowJobResources
 }
 
 // Stores information related to rules during schema conversion
@@ -426,7 +433,7 @@ func addMissingPrimaryKeyWarning(tableId string, colId string, conv *Conv) {
 		columnLevelIssues = tableIssues.ColumnLevelIssues
 	} else {
 		columnLevelIssues = make(map[string][]SchemaIssue)
-	}
+	}
 	issues := columnLevelIssues[colId]
 	issues = append(issues, MissingPrimaryKey)
 	columnLevelIssues[colId] = issues
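
Because the new struct carries explicit JSON tags, the per-shard audit entries serialize with stable field names. A small illustrative sketch (the shard and job IDs are made up, and the internal package is importable only from within this module):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
)

func main() {
	// Mirrors what storeGeneratedResources writes into ShardToDataflowInfoMap.
	m := map[string]internal.ShardedDataflowJobResources{
		"shard-1": {JobId: "2023-09-27_00_00_00-123", GcloudCmd: "gcloud dataflow flex-template run ..."},
	}
	b, _ := json.Marshal(m)
	fmt.Println(string(b))
	// {"shard-1":{"JobId":"2023-09-27_00_00_00-123","GcloudCmd":"gcloud dataflow flex-template run ..."}}
}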
13 changes: 9 additions & 4 deletions streaming/streaming.go
@@ -33,6 +33,7 @@ import (
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
+	"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
 )
 
@@ -369,7 +370,8 @@ func CleanUpStreamingJobs(ctx context.Context, conv *internal.Conv, projectID, r
 		CleanupDatastream(ctx, dsClient, conv.Audit.StreamingStats.DataStreamName, projectID, region)
 	}
 	// clean up jobs for sharded migrations (with error handling)
-	for _, dfId := range conv.Audit.StreamingStats.ShardToDataflowJobMap {
+	for _, resourceDetails := range conv.Audit.StreamingStats.ShardToDataflowInfoMap {
+		dfId := resourceDetails.JobId
 		err := CleanupDataflowJob(ctx, c, dfId, projectID, region)
 		if err != nil {
 			fmt.Printf("Cleanup of the dataflow job: %s was unsuccessful, please clean up the job manually", dfId)
@@ -527,18 +529,21 @@ func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile
 		fmt.Printf("flexTemplateRequest: %+v\n", req)
 		return fmt.Errorf("unable to launch template: %v", err)
 	}
-	storeGeneratedResources(conv, datastreamCfg, respDf, project, streamingCfg.DataShardId)
+	gcloudDfCmd := utils.GetGcloudDataflowCommand(req)
+	logger.Log.Debug(fmt.Sprintf("\nEquivalent gCloud command for job %s:\n%s\n\n", req.LaunchParameter.JobName, gcloudDfCmd))
+	storeGeneratedResources(conv, datastreamCfg, respDf, gcloudDfCmd, project, streamingCfg.DataShardId)
 	return nil
 }
 
-func storeGeneratedResources(conv *internal.Conv, datastreamCfg DatastreamCfg, respDf *dataflowpb.LaunchFlexTemplateResponse, project string, dataShardId string) {
+func storeGeneratedResources(conv *internal.Conv, datastreamCfg DatastreamCfg, respDf *dataflowpb.LaunchFlexTemplateResponse, gcloudDataflowCmd string, project string, dataShardId string) {
 	conv.Audit.StreamingStats.DataStreamName = datastreamCfg.StreamId
 	conv.Audit.StreamingStats.DataflowJobId = respDf.Job.Id
+	conv.Audit.StreamingStats.DataflowGcloudCmd = gcloudDataflowCmd
 	if dataShardId != "" {
 		var resourceMutex sync.Mutex
 		resourceMutex.Lock()
 		conv.Audit.StreamingStats.ShardToDataStreamNameMap[dataShardId] = datastreamCfg.StreamId
-		conv.Audit.StreamingStats.ShardToDataflowJobMap[dataShardId] = respDf.Job.Id
+		conv.Audit.StreamingStats.ShardToDataflowInfoMap[dataShardId] = internal.ShardedDataflowJobResources{JobId: respDf.Job.Id, GcloudCmd: gcloudDataflowCmd}
 		resourceMutex.Unlock()
 	}
 	fullStreamName := fmt.Sprintf("projects/%s/locations/%s/streams/%s", project, datastreamCfg.StreamLocation, datastreamCfg.StreamId)
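
One subtlety worth noting in storeGeneratedResources: resourceMutex is declared inside the function, so each call locks its own fresh mutex, which cannot serialize concurrent per-shard launches against each other; map writes in Go need a lock shared by all writers. A minimal sketch of the package-level alternative (storeShardResources and shardResourceMutex are hypothetical names, not part of this commit):

package streaming

import (
	"sync"

	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
)

// A single package-level mutex is shared by every caller, unlike a function-local one.
var shardResourceMutex sync.Mutex

func storeShardResources(conv *internal.Conv, dataShardId string, res internal.ShardedDataflowJobResources) {
	shardResourceMutex.Lock()
	defer shardResourceMutex.Unlock()
	conv.Audit.StreamingStats.ShardToDataflowInfoMap[dataShardId] = res
}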
117 changes: 117 additions & 0 deletions testing/common/utils/dataflow_utils_test.go
@@ -0,0 +1,117 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO: Refactor this file and other integration tests by moving all common code
// to remove redundancy.

package utils_test

import (
	"os"
	"testing"

	"cloud.google.com/go/dataflow/apiv1beta3/dataflowpb"
	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
	"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
	res := m.Run()
	os.Exit(res)
}

func getTemplateDfRequest() *dataflowpb.LaunchFlexTemplateRequest {
	launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
		JobName:  "test-job",
		Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: "gs://template/Cloud_Datastream_to_Spanner"},
		Parameters: map[string]string{
			"inputFilePattern":                "gs://inputFilePattern",
			"streamName":                      "my-stream",
			"instanceId":                      "my-instance",
			"databaseId":                      "my-dbName",
			"sessionFilePath":                 "gs://session.json",
			"deadLetterQueueDirectory":        "gs://dlq",
			"transformationContextFilePath":   "gs://transformationContext.json",
			"directoryWatchDurationInMinutes": "480", // Setting directory watch timeout to 8 hours
		},
		Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
			MaxWorkers:            50,
			NumWorkers:            10,
			ServiceAccountEmail:   "[email protected]",
			TempLocation:          "gs://temp-location",
			MachineType:           "n2-standard-16",
			AdditionalExperiments: []string{"use_runner_V2", "test-experiment"},
			Network:               "my-network",
			Subnetwork:            "my-subnetwork",
			AdditionalUserLabels:  map[string]string{"name": "wrench", "count": "3"},
			KmsKeyName:            "sample-kms-key",
			IpConfiguration:       dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE,
			WorkerRegion:          "test-worker-region",
			WorkerZone:            "test-worker-zone",
			EnableStreamingEngine: true,
			FlexrsGoal:            1, // FLEXRS_SPEED_OPTIMIZED
			StagingLocation:       "gs://staging-location",
		},
	}
	req := &dataflowpb.LaunchFlexTemplateRequest{
		ProjectId:       "test-project",
		LaunchParameter: launchParameters,
		Location:        "us-central1",
	}
	return req
}

func TestGcloudCmdWithAllParams(t *testing.T) {
	req := getTemplateDfRequest()
	expectedCmd := "gcloud dataflow flex-template run test-job " +
		"--project=test-project --region=us-central1 " +
		"--template-file-gcs-location=gs://template/Cloud_Datastream_to_Spanner " +
		"--num-workers 10 --max-workers 50 --service-account-email [email protected] " +
		"--temp-location gs://temp-location --worker-machine-type n2-standard-16 " +
		"--additional-experiments use_runner_V2,test-experiment --network my-network " +
		"--subnetwork my-subnetwork --additional-user-labels {\"count\":\"3\",\"name\":\"wrench\"} " +
		"--dataflow-kms-key sample-kms-key --disable-public-ips --worker-region test-worker-region " +
		"--worker-zone test-worker-zone --enable-streaming-engine " +
		"--flexrs-goal FLEXRS_SPEED_OPTIMIZED --staging-location gs://staging-location " +
		"--parameters databaseId=my-dbName,deadLetterQueueDirectory=gs://dlq," +
		"directoryWatchDurationInMinutes=480,inputFilePattern=gs://inputFilePattern," +
		"instanceId=my-instance,sessionFilePath=gs://session.json,streamName=my-stream," +
		"transformationContextFilePath=gs://transformationContext.json"
	assert.Equal(t, expectedCmd, utils.GetGcloudDataflowCommand(req))
}

func TestGcloudCmdWithPartialParams(t *testing.T) {
	req := getTemplateDfRequest()
	// Zero out optional fields to verify their flags are omitted from the generated command.
	req.LaunchParameter.Parameters = make(map[string]string)
	req.LaunchParameter.Environment.FlexrsGoal = 0
	req.LaunchParameter.Environment.IpConfiguration = 0
	req.LaunchParameter.Environment.EnableStreamingEngine = false
	req.LaunchParameter.Environment.AdditionalExperiments = []string{}
	req.LaunchParameter.Environment.AdditionalUserLabels = make(map[string]string)
	req.LaunchParameter.Environment.WorkerRegion = ""
	req.LaunchParameter.Environment.NumWorkers = 0

	expectedCmd := "gcloud dataflow flex-template run test-job " +
		"--project=test-project --region=us-central1 " +
		"--template-file-gcs-location=gs://template/Cloud_Datastream_to_Spanner " +
		"--max-workers 50 --service-account-email [email protected] " +
		"--temp-location gs://temp-location --worker-machine-type n2-standard-16 " +
		"--network my-network --subnetwork my-subnetwork " +
		"--dataflow-kms-key sample-kms-key " +
		"--worker-zone test-worker-zone " +
		"--staging-location gs://staging-location"
	assert.Equal(t, expectedCmd, utils.GetGcloudDataflowCommand(req))
}
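
Since these are pure unit tests with no GCP dependency, a plain go test run scoped to this package (for example, go test ./testing/common/utils/... from the repository root) should be enough to exercise them.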
4 changes: 3 additions & 1 deletion ui/src/app/app.module.ts
@@ -49,7 +49,8 @@ import { ShardedBulkSourceDetailsFormComponent } from './components/sharded-bulk
 import { ShardedDataflowMigrationDetailsFormComponent } from './components/sharded-dataflow-migration-details-form/sharded-dataflow-migration-details-form.component';
 import { BulkDropRestoreTableDialogComponent } from './components/bulk-drop-restore-table-dialog/bulk-drop-restore-table-dialog.component'
 import { AddNewColumnComponent } from './components/add-new-column/add-new-column.component';
-import { AddShardIdPrimaryKeyComponent } from './components/add-shard-id-primary-key/add-shard-id-primary-key.component'
+import { AddShardIdPrimaryKeyComponent } from './components/add-shard-id-primary-key/add-shard-id-primary-key.component';
+import { EquivalentGcloudCommandComponent } from './components/equivalent-gcloud-command/equivalent-gcloud-command.component';
 
 @NgModule({
   declarations: [
@@ -92,6 +93,7 @@ import { AddShardIdPrimaryKeyComponent } from './components/add-shard-id-primary
     BulkDropRestoreTableDialogComponent,
     AddNewColumnComponent,
     AddShardIdPrimaryKeyComponent,
+    EquivalentGcloudCommandComponent,
   ],
   imports: [
     BrowserModule,
12 changes: 12 additions & 0 deletions ui/src/app/components/equivalent-gcloud-command/equivalent-gcloud-command.component.html
@@ -0,0 +1,12 @@
<div mat-dialog-content>
    <h2>Equivalent gcloud command line<mat-icon class="configure"
        matTooltip='This is the gcloud command to launch the Dataflow job with the same parameters. It can be used to re-run a Dataflow job manually in case of failure.'>
        info</mat-icon></h2>

    <span class="left-text">{{ gcloudCmd }}</span>

    <div mat-dialog-actions class="buttons-container">
        <button mat-button color="primary" mat-dialog-close>Close</button>
        <button mat-button color="primary" [cdkCopyToClipboard]="gcloudCmd">Copy</button>
    </div>
</div>
0 changes: 0 additions & 0 deletions ui/src/app/components/equivalent-gcloud-command/equivalent-gcloud-command.component.scss (empty file)
25 changes: 25 additions & 0 deletions ui/src/app/components/equivalent-gcloud-command/equivalent-gcloud-command.component.spec.ts
@@ -0,0 +1,25 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';

import { EquivalentGcloudCommandComponent } from './equivalent-gcloud-command.component';

describe('EquivalentGcloudCommandComponent', () => {
  let component: EquivalentGcloudCommandComponent;
  let fixture: ComponentFixture<EquivalentGcloudCommandComponent>;

  beforeEach(async () => {
    await TestBed.configureTestingModule({
      declarations: [EquivalentGcloudCommandComponent]
    }).compileComponents();
  });

  beforeEach(() => {
    fixture = TestBed.createComponent(EquivalentGcloudCommandComponent);
    component = fixture.componentInstance;
    fixture.detectChanges();
  });

  it('should create', () => {
    expect(component).toBeTruthy();
  });
});
21 changes: 21 additions & 0 deletions ui/src/app/components/equivalent-gcloud-command/equivalent-gcloud-command.component.ts
@@ -0,0 +1,21 @@
import { Component, Inject, OnInit } from '@angular/core';
import { MatDialogRef, MAT_DIALOG_DATA } from '@angular/material/dialog';

@Component({
  selector: 'app-equivalent-gcloud-command',
  templateUrl: './equivalent-gcloud-command.component.html',
  styleUrls: ['./equivalent-gcloud-command.component.scss']
})
export class EquivalentGcloudCommandComponent implements OnInit {
  gcloudCmd: string

  constructor(
    @Inject(MAT_DIALOG_DATA) public data: string,
    private dialogRef: MatDialogRef<EquivalentGcloudCommandComponent>
  ) {
    this.gcloudCmd = data
  }

  ngOnInit(): void {
  }
}