Skip to content

Commit

Permalink
roachtest/awsdms: add no pk full load test case
Browse files Browse the repository at this point in the history
This commit adds a new test case that will attempt a
full load on a table with no primary key. We will
assert that there is a table error with the full
load.

Fixes: #95328

Release note: None
  • Loading branch information
Jeremyyang920 committed Jan 19, 2023
1 parent c036635 commit 1d59169
Showing 1 changed file with 151 additions and 80 deletions.
231 changes: 151 additions & 80 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -46,6 +47,37 @@ const (
awsdmsNumInitialRows = 100000
)

type dmsTask struct {
tableName string
tableMappings string
replicationSettings *string
}

const tableRules = `{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "%%",
"table-name": "%s"
},
"rule-action": "include"
}
]
}`

var dmsTasks = []dmsTask{
{"test_table", tableRules, nil},
{"test_table_no_pk", tableRules, proto.String(`{
"FullLoadSettings":{
"TargetTablePrepMode":"TRUNCATE_BEFORE_LOAD"
}
}`),
},
}

func awsdmsVerString(v *version.Version) string {
if ciBranch := os.Getenv("TC_BUILD_BRANCH"); ciBranch != "" {
ciBranch = strings.ReplaceAll(ciBranch, ".", "-")
Expand All @@ -66,8 +98,8 @@ func awsdmsRoachtestDMSParameterGroup(v *version.Version) string {
return "roachtest-awsdms-param-group-" + awsdmsVerString(v)
}

func awsdmsRoachtestDMSTaskName(v *version.Version) string {
return "roachtest-awsdms-dms-task-" + awsdmsVerString(v)
func awsdmsRoachtestDMSTaskName(v *version.Version, tableName string) string {
return fmt.Sprintf("roachtest-awsdms-dms-task-%s-%s", strings.ReplaceAll(tableName, "_", "-"), awsdmsVerString(v))
}

func awsdmsRoachtestDMSReplicationInstanceName(v *version.Version) string {
Expand Down Expand Up @@ -107,12 +139,14 @@ func dmsDescribeInstancesInput(v *version.Version) *dms.DescribeReplicationInsta
}
}

func dmsDescribeTasksInput(v *version.Version) *dms.DescribeReplicationTasksInput {
func dmsDescribeTasksInput(
v *version.Version, tableName string,
) *dms.DescribeReplicationTasksInput {
return &dms.DescribeReplicationTasksInput{
Filters: []dmstypes.Filter{
{
Name: proto.String("replication-task-id"),
Values: []string{awsdmsRoachtestDMSTaskName(v)},
Values: []string{awsdmsRoachtestDMSTaskName(v, tableName)},
},
},
}
Expand Down Expand Up @@ -175,6 +209,14 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) {
}
targetPGConn := c.Conn(ctx, t.L(), 1)

checkDMSReplicated(ctx, t, sourcePGConn, targetPGConn)
checkDMSNoPKTableError(ctx, t, dmsCli)
t.L().Printf("testing complete")
}

func checkDMSReplicated(
ctx context.Context, t test.Test, sourcePGConn *pgx.Conn, targetPGConn *gosql.DB,
) {
waitForReplicationRetryOpts := retry.Options{
MaxBackoff: time.Second,
MaxRetries: 90,
Expand Down Expand Up @@ -267,8 +309,40 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) {
}(); err != nil {
t.Fatal(err)
}
}

t.L().Printf("testing complete")
func checkDMSNoPKTableError(ctx context.Context, t test.Test, dmsCli *dms.Client) {
waitForTableError := retry.Options{
MaxBackoff: time.Second,
MaxRetries: 90,
}
t.L().Printf("testing no pk table has a table error")
if err := func() error {
for r := retry.StartWithCtx(ctx, waitForTableError); r.Next(); {
err := func() error {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), "test_table_no_pk"))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
}
for _, task := range dmsTasks.ReplicationTasks {
if task.ReplicationTaskStats.TablesErrored == 1 {
t.L().Printf("table error was found")
return nil
}
}
return errors.New("no table error found yet")
}()
if err == nil {
return nil
}
t.L().Printf("table error not found, retrying: %+v", err)
}
return errors.Newf("failed to find table error")
}(); err != nil {
t.Fatal(err)
}
}

// setupAWSDMS sets up an RDS instance and a DMS instance which sets up a
Expand Down Expand Up @@ -452,6 +526,11 @@ func setupRDSCluster(
`INSERT INTO test_table(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`,
awsdmsNumInitialRows,
),
`CREATE TABLE test_table_no_pk(id integer, t TEXT)`,
fmt.Sprintf(
`INSERT INTO test_table_no_pk(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`,
awsdmsNumInitialRows,
),
} {
if _, err := pgConn.Exec(
ctx,
Expand Down Expand Up @@ -579,56 +658,46 @@ func setupDMSEndpointsAndTask(
}
}

t.L().Printf("creating replication task")
replTaskOut, err := dmsCli.CreateReplicationTask(
ctx,
&dms.CreateReplicationTaskInput{
MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc,
ReplicationInstanceArn: proto.String(replicationARN),
ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion())),
SourceEndpointArn: proto.String(sourceARN),
TargetEndpointArn: proto.String(targetARN),
// TODO(#migrations): when AWS API supports EnableValidation, add it here.
TableMappings: proto.String(`{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "%",
"table-name": "%"
},
"rule-action": "include"
}
]
}`),
},
)
if err != nil {
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
}
t.L().Printf("starting replication task")
if _, err := dmsCli.StartReplicationTask(
ctx,
&dms.StartReplicationTaskInput{
ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn,
StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget,
},
); err != nil {
return err
}
t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion()),
awsdmsWaitTimeLimit,
); err != nil {
return err
for _, task := range dmsTasks {
t.L().Printf(fmt.Sprintf("creating replication task for %s", task.tableName))
replTaskOut, err := dmsCli.CreateReplicationTask(
ctx,
&dms.CreateReplicationTaskInput{
MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc,
ReplicationInstanceArn: proto.String(replicationARN),
ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion(), task.tableName)),
SourceEndpointArn: proto.String(sourceARN),
TargetEndpointArn: proto.String(targetARN),
// TODO(#migrations): when AWS API supports EnableValidation, add it here.
TableMappings: proto.String(fmt.Sprintf(task.tableMappings, task.tableName)),
ReplicationTaskSettings: task.replicationSettings,
},
)
if err != nil {
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
t.L().Printf("starting replication task")
if _, err := dmsCli.StartReplicationTask(
ctx,
&dms.StartReplicationTaskInput{
ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn,
StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget,
},
); err != nil {
return err
}
t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
awsdmsWaitTimeLimit,
); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -662,38 +731,40 @@ func tearDownAWSDMS(
// tearDownDMSTasks tears down the DMS task, endpoints and replication instance
// that may have been created.
func tearDownDMSTasks(ctx context.Context, t test.Test, dmsCli *dms.Client) error {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion()))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
} else {
wasRunning := false
for _, task := range dmsTasks.ReplicationTasks {
if *task.Status == "running" {
t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
for _, task := range dmsTasks {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
} else {
wasRunning := false
for _, task := range dmsTasks.ReplicationTasks {
if *task.Status == "running" {
t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
return err
}
wasRunning = true
}
}
if wasRunning {
t.L().Printf("waiting for task to be stopped")
if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
wasRunning = true
}
}
if wasRunning {
t.L().Printf("waiting for task to be stopped")
if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
for _, task := range dmsTasks.ReplicationTasks {
t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
return err
}
}
}
for _, task := range dmsTasks.ReplicationTasks {
t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
t.L().Printf("waiting for task to be deleted")
if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
}
t.L().Printf("waiting for task to be deleted")
if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
}
}
return nil
}
Expand Down

0 comments on commit 1d59169

Please sign in to comment.