From 1d591695404d3cb389b5976e7d15b9ca354e77d6 Mon Sep 17 00:00:00 2001 From: Jeremy Yang Date: Thu, 19 Jan 2023 09:31:13 -0800 Subject: [PATCH] roachtest/awsdms: add no pk full load test case This commit adds a new test case that will attempt a full load on a table with no primary key. We will assert that there is a table error with the full load. Fixes: #95328 Release note: None --- pkg/cmd/roachtest/tests/awsdms.go | 231 +++++++++++++++++++----------- 1 file changed, 151 insertions(+), 80 deletions(-) diff --git a/pkg/cmd/roachtest/tests/awsdms.go b/pkg/cmd/roachtest/tests/awsdms.go index 6eeca678206c..6d526e5a0c8a 100644 --- a/pkg/cmd/roachtest/tests/awsdms.go +++ b/pkg/cmd/roachtest/tests/awsdms.go @@ -12,6 +12,7 @@ package tests import ( "context" + gosql "database/sql" "fmt" "math/rand" "os" @@ -46,6 +47,37 @@ const ( awsdmsNumInitialRows = 100000 ) +type dmsTask struct { + tableName string + tableMappings string + replicationSettings *string +} + +const tableRules = `{ + "rules": [ + { + "rule-type": "selection", + "rule-id": "1", + "rule-name": "1", + "object-locator": { + "schema-name": "%%", + "table-name": "%s" + }, + "rule-action": "include" + } + ] +}` + +var dmsTasks = []dmsTask{ + {"test_table", tableRules, nil}, + {"test_table_no_pk", tableRules, proto.String(`{ + "FullLoadSettings":{ + "TargetTablePrepMode":"TRUNCATE_BEFORE_LOAD" + } + }`), + }, +} + func awsdmsVerString(v *version.Version) string { if ciBranch := os.Getenv("TC_BUILD_BRANCH"); ciBranch != "" { ciBranch = strings.ReplaceAll(ciBranch, ".", "-") @@ -66,8 +98,8 @@ func awsdmsRoachtestDMSParameterGroup(v *version.Version) string { return "roachtest-awsdms-param-group-" + awsdmsVerString(v) } -func awsdmsRoachtestDMSTaskName(v *version.Version) string { - return "roachtest-awsdms-dms-task-" + awsdmsVerString(v) +func awsdmsRoachtestDMSTaskName(v *version.Version, tableName string) string { + return fmt.Sprintf("roachtest-awsdms-dms-task-%s-%s", strings.ReplaceAll(tableName, "_", "-"), awsdmsVerString(v)) } func awsdmsRoachtestDMSReplicationInstanceName(v *version.Version) string { @@ -107,12 +139,14 @@ func dmsDescribeInstancesInput(v *version.Version) *dms.DescribeReplicationInsta } } -func dmsDescribeTasksInput(v *version.Version) *dms.DescribeReplicationTasksInput { +func dmsDescribeTasksInput( + v *version.Version, tableName string, +) *dms.DescribeReplicationTasksInput { return &dms.DescribeReplicationTasksInput{ Filters: []dmstypes.Filter{ { Name: proto.String("replication-task-id"), - Values: []string{awsdmsRoachtestDMSTaskName(v)}, + Values: []string{awsdmsRoachtestDMSTaskName(v, tableName)}, }, }, } @@ -175,6 +209,14 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) { } targetPGConn := c.Conn(ctx, t.L(), 1) + checkDMSReplicated(ctx, t, sourcePGConn, targetPGConn) + checkDMSNoPKTableError(ctx, t, dmsCli) + t.L().Printf("testing complete") +} + +func checkDMSReplicated( + ctx context.Context, t test.Test, sourcePGConn *pgx.Conn, targetPGConn *gosql.DB, +) { waitForReplicationRetryOpts := retry.Options{ MaxBackoff: time.Second, MaxRetries: 90, @@ -267,8 +309,40 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) { }(); err != nil { t.Fatal(err) } +} - t.L().Printf("testing complete") +func checkDMSNoPKTableError(ctx context.Context, t test.Test, dmsCli *dms.Client) { + waitForTableError := retry.Options{ + MaxBackoff: time.Second, + MaxRetries: 90, + } + t.L().Printf("testing no pk table has a table error") + if err := func() error { + for r := retry.StartWithCtx(ctx, waitForTableError); r.Next(); { + err := func() error { + dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), "test_table_no_pk")) + if err != nil { + if !isDMSResourceNotFound(err) { + return err + } + } + for _, task := range dmsTasks.ReplicationTasks { + if task.ReplicationTaskStats.TablesErrored == 1 { + t.L().Printf("table error was found") + return nil + } + } + return errors.New("no table error found yet") + }() + if err == nil { + return nil + } + t.L().Printf("table error not found, retrying: %+v", err) + } + return errors.Newf("failed to find table error") + }(); err != nil { + t.Fatal(err) + } } // setupAWSDMS sets up an RDS instance and a DMS instance which sets up a @@ -452,6 +526,11 @@ func setupRDSCluster( `INSERT INTO test_table(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`, awsdmsNumInitialRows, ), + `CREATE TABLE test_table_no_pk(id integer, t TEXT)`, + fmt.Sprintf( + `INSERT INTO test_table_no_pk(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`, + awsdmsNumInitialRows, + ), } { if _, err := pgConn.Exec( ctx, @@ -579,56 +658,46 @@ func setupDMSEndpointsAndTask( } } - t.L().Printf("creating replication task") - replTaskOut, err := dmsCli.CreateReplicationTask( - ctx, - &dms.CreateReplicationTaskInput{ - MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc, - ReplicationInstanceArn: proto.String(replicationARN), - ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion())), - SourceEndpointArn: proto.String(sourceARN), - TargetEndpointArn: proto.String(targetARN), - // TODO(#migrations): when AWS API supports EnableValidation, add it here. - TableMappings: proto.String(`{ - "rules": [ - { - "rule-type": "selection", - "rule-id": "1", - "rule-name": "1", - "object-locator": { - "schema-name": "%", - "table-name": "%" - }, - "rule-action": "include" - } - ] -}`), - }, - ) - if err != nil { - return err - } - t.L().Printf("waiting for replication task to be ready") - if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil { - return err - } - t.L().Printf("starting replication task") - if _, err := dmsCli.StartReplicationTask( - ctx, - &dms.StartReplicationTaskInput{ - ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn, - StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget, - }, - ); err != nil { - return err - } - t.L().Printf("waiting for replication task to be running") - if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait( - ctx, - dmsDescribeTasksInput(t.BuildVersion()), - awsdmsWaitTimeLimit, - ); err != nil { - return err + for _, task := range dmsTasks { + t.L().Printf(fmt.Sprintf("creating replication task for %s", task.tableName)) + replTaskOut, err := dmsCli.CreateReplicationTask( + ctx, + &dms.CreateReplicationTaskInput{ + MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc, + ReplicationInstanceArn: proto.String(replicationARN), + ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion(), task.tableName)), + SourceEndpointArn: proto.String(sourceARN), + TargetEndpointArn: proto.String(targetARN), + // TODO(#migrations): when AWS API supports EnableValidation, add it here. + TableMappings: proto.String(fmt.Sprintf(task.tableMappings, task.tableName)), + ReplicationTaskSettings: task.replicationSettings, + }, + ) + if err != nil { + return err + } + t.L().Printf("waiting for replication task to be ready") + if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil { + return err + } + t.L().Printf("starting replication task") + if _, err := dmsCli.StartReplicationTask( + ctx, + &dms.StartReplicationTaskInput{ + ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn, + StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget, + }, + ); err != nil { + return err + } + t.L().Printf("waiting for replication task to be running") + if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait( + ctx, + dmsDescribeTasksInput(t.BuildVersion(), task.tableName), + awsdmsWaitTimeLimit, + ); err != nil { + return err + } } return nil } @@ -662,38 +731,40 @@ func tearDownAWSDMS( // tearDownDMSTasks tears down the DMS task, endpoints and replication instance // that may have been created. func tearDownDMSTasks(ctx context.Context, t test.Test, dmsCli *dms.Client) error { - dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion())) - if err != nil { - if !isDMSResourceNotFound(err) { - return err - } - } else { - wasRunning := false - for _, task := range dmsTasks.ReplicationTasks { - if *task.Status == "running" { - t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn) - if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil { + for _, task := range dmsTasks { + dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName)) + if err != nil { + if !isDMSResourceNotFound(err) { + return err + } + } else { + wasRunning := false + for _, task := range dmsTasks.ReplicationTasks { + if *task.Status == "running" { + t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn) + if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil { + return err + } + wasRunning = true + } + } + if wasRunning { + t.L().Printf("waiting for task to be stopped") + if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil { return err } - wasRunning = true } - } - if wasRunning { - t.L().Printf("waiting for task to be stopped") - if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil { - return err + for _, task := range dmsTasks.ReplicationTasks { + t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn) + if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil { + return err + } } - } - for _, task := range dmsTasks.ReplicationTasks { - t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn) - if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil { + t.L().Printf("waiting for task to be deleted") + if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil { return err } } - t.L().Printf("waiting for task to be deleted") - if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil { - return err - } } return nil }