Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest/awsdms: add no pk full load test case #95518

Merged
merged 1 commit into from
Jan 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 151 additions & 80 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -46,6 +47,37 @@ const (
awsdmsNumInitialRows = 100000
)

type dmsTask struct {
tableName string
tableMappings string
replicationSettings *string
}

const tableRules = `{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "%%",
"table-name": "%s"
},
"rule-action": "include"
}
]
}`

var dmsTasks = []dmsTask{
{"test_table", tableRules, nil},
{"test_table_no_pk", tableRules, proto.String(`{
"FullLoadSettings":{
"TargetTablePrepMode":"TRUNCATE_BEFORE_LOAD"
}
}`),
},
}

func awsdmsVerString(v *version.Version) string {
if ciBranch := os.Getenv("TC_BUILD_BRANCH"); ciBranch != "" {
ciBranch = strings.ReplaceAll(ciBranch, ".", "-")
Expand All @@ -66,8 +98,8 @@ func awsdmsRoachtestDMSParameterGroup(v *version.Version) string {
return "roachtest-awsdms-param-group-" + awsdmsVerString(v)
}

func awsdmsRoachtestDMSTaskName(v *version.Version) string {
return "roachtest-awsdms-dms-task-" + awsdmsVerString(v)
func awsdmsRoachtestDMSTaskName(v *version.Version, tableName string) string {
return fmt.Sprintf("roachtest-awsdms-dms-task-%s-%s", strings.ReplaceAll(tableName, "_", "-"), awsdmsVerString(v))
}

func awsdmsRoachtestDMSReplicationInstanceName(v *version.Version) string {
Expand Down Expand Up @@ -107,12 +139,14 @@ func dmsDescribeInstancesInput(v *version.Version) *dms.DescribeReplicationInsta
}
}

func dmsDescribeTasksInput(v *version.Version) *dms.DescribeReplicationTasksInput {
func dmsDescribeTasksInput(
v *version.Version, tableName string,
) *dms.DescribeReplicationTasksInput {
return &dms.DescribeReplicationTasksInput{
Filters: []dmstypes.Filter{
{
Name: proto.String("replication-task-id"),
Values: []string{awsdmsRoachtestDMSTaskName(v)},
Values: []string{awsdmsRoachtestDMSTaskName(v, tableName)},
},
},
}
Expand Down Expand Up @@ -175,6 +209,14 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) {
}
targetPGConn := c.Conn(ctx, t.L(), 1)

checkDMSReplicated(ctx, t, sourcePGConn, targetPGConn)
checkDMSNoPKTableError(ctx, t, dmsCli)
t.L().Printf("testing complete")
}

func checkDMSReplicated(
ctx context.Context, t test.Test, sourcePGConn *pgx.Conn, targetPGConn *gosql.DB,
) {
waitForReplicationRetryOpts := retry.Options{
MaxBackoff: time.Second,
MaxRetries: 90,
Expand Down Expand Up @@ -267,8 +309,40 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) {
}(); err != nil {
t.Fatal(err)
}
}

t.L().Printf("testing complete")
func checkDMSNoPKTableError(ctx context.Context, t test.Test, dmsCli *dms.Client) {
waitForTableError := retry.Options{
MaxBackoff: time.Second,
MaxRetries: 90,
}
t.L().Printf("testing no pk table has a table error")
if err := func() error {
for r := retry.StartWithCtx(ctx, waitForTableError); r.Next(); {
err := func() error {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), "test_table_no_pk"))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
}
for _, task := range dmsTasks.ReplicationTasks {
if task.ReplicationTaskStats.TablesErrored == 1 {
t.L().Printf("table error was found")
return nil
}
}
return errors.New("no table error found yet")
}()
if err == nil {
return nil
}
t.L().Printf("table error not found, retrying: %+v", err)
}
return errors.Newf("failed to find table error")
}(); err != nil {
t.Fatal(err)
}
}

// setupAWSDMS sets up an RDS instance and a DMS instance which sets up a
Expand Down Expand Up @@ -452,6 +526,11 @@ func setupRDSCluster(
`INSERT INTO test_table(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`,
awsdmsNumInitialRows,
),
`CREATE TABLE test_table_no_pk(id integer, t TEXT)`,
fmt.Sprintf(
`INSERT INTO test_table_no_pk(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`,
awsdmsNumInitialRows,
),
} {
if _, err := pgConn.Exec(
ctx,
Expand Down Expand Up @@ -579,56 +658,46 @@ func setupDMSEndpointsAndTask(
}
}

t.L().Printf("creating replication task")
replTaskOut, err := dmsCli.CreateReplicationTask(
ctx,
&dms.CreateReplicationTaskInput{
MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc,
ReplicationInstanceArn: proto.String(replicationARN),
ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion())),
SourceEndpointArn: proto.String(sourceARN),
TargetEndpointArn: proto.String(targetARN),
// TODO(#migrations): when AWS API supports EnableValidation, add it here.
TableMappings: proto.String(`{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "%",
"table-name": "%"
},
"rule-action": "include"
}
]
}`),
},
)
if err != nil {
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
}
t.L().Printf("starting replication task")
if _, err := dmsCli.StartReplicationTask(
ctx,
&dms.StartReplicationTaskInput{
ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn,
StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget,
},
); err != nil {
return err
}
t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion()),
awsdmsWaitTimeLimit,
); err != nil {
return err
for _, task := range dmsTasks {
t.L().Printf(fmt.Sprintf("creating replication task for %s", task.tableName))
replTaskOut, err := dmsCli.CreateReplicationTask(
ctx,
&dms.CreateReplicationTaskInput{
MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc,
ReplicationInstanceArn: proto.String(replicationARN),
ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion(), task.tableName)),
SourceEndpointArn: proto.String(sourceARN),
TargetEndpointArn: proto.String(targetARN),
// TODO(#migrations): when AWS API supports EnableValidation, add it here.
TableMappings: proto.String(fmt.Sprintf(task.tableMappings, task.tableName)),
ReplicationTaskSettings: task.replicationSettings,
},
)
if err != nil {
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
t.L().Printf("starting replication task")
if _, err := dmsCli.StartReplicationTask(
ctx,
&dms.StartReplicationTaskInput{
ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn,
StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget,
},
); err != nil {
return err
}
t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
awsdmsWaitTimeLimit,
); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -662,38 +731,40 @@ func tearDownAWSDMS(
// tearDownDMSTasks tears down the DMS task, endpoints and replication instance
// that may have been created.
func tearDownDMSTasks(ctx context.Context, t test.Test, dmsCli *dms.Client) error {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion()))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
} else {
wasRunning := false
for _, task := range dmsTasks.ReplicationTasks {
if *task.Status == "running" {
t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
for _, task := range dmsTasks {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
} else {
wasRunning := false
for _, task := range dmsTasks.ReplicationTasks {
if *task.Status == "running" {
t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
return err
}
wasRunning = true
}
}
if wasRunning {
t.L().Printf("waiting for task to be stopped")
if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
wasRunning = true
}
}
if wasRunning {
t.L().Printf("waiting for task to be stopped")
if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
for _, task := range dmsTasks.ReplicationTasks {
t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
return err
}
}
}
for _, task := range dmsTasks.ReplicationTasks {
t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
t.L().Printf("waiting for task to be deleted")
if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
}
t.L().Printf("waiting for task to be deleted")
if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
}
}
return nil
}
Expand Down