From 89e116039607fa0c045a3a97a6eb5ff656f3ea14 Mon Sep 17 00:00:00 2001 From: Jeremy Yang Date: Thu, 10 Aug 2023 14:13:05 +0000 Subject: [PATCH] awsdms: use custom status checker This commit switches the dms test to use a custom status checker instead of the one from the SDK. The way the SDK checks for status changes has a likely chance of a race condition which causes it to return an error when there is really no error. We now use a custom method to do the status checks which does an explicit status check each time the describe task api is called so we are always checking the latest result compared to the way the sdk was doing it. Fixes: #108270 Release note: None --- pkg/cmd/roachtest/tests/awsdms.go | 42 ++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/pkg/cmd/roachtest/tests/awsdms.go b/pkg/cmd/roachtest/tests/awsdms.go index 98551b1d0c9e..c334e54b5878 100644 --- a/pkg/cmd/roachtest/tests/awsdms.go +++ b/pkg/cmd/roachtest/tests/awsdms.go @@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask( return err } t.L().Printf("waiting for replication task to be ready") - if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil { + input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName) + if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil { return err } @@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask( } t.L().Printf("waiting for replication task to be running") - if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait( - ctx, - dmsDescribeTasksInput(t.BuildVersion(), task.tableName), - awsdmsWaitTimeLimit, - ); err != nil { + if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil { return err } } return nil } +func dmsTaskStatusChecker( + ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string, +) error { + closer := make(chan struct{}) + r := retry.StartWithCtx(ctx, retry.Options{ + InitialBackoff: 10 * time.Second, + MaxBackoff: 30 * time.Second, + Closer: closer, + }) + timeout := time.After(awsdmsWaitTimeLimit) + for r.Next() { + select { + case <-timeout: + close(closer) + // Since we only ever have a unique task returned per filter, + // it should be safe to direct index for the task name. + return errors.Newf("exceeded time limit waiting for %s to transition to %s", input.Filters[0].Values[0], status) + default: + dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input) + if err != nil { + return err + } + for _, task := range dmsTasks.ReplicationTasks { + // If we match the status we want, close the retry and exit. + if *task.Status == status { + close(closer) + } + } + } + } + return nil +} + func isDMSResourceNotFound(err error) bool { return errors.HasType(err, &dmstypes.ResourceNotFoundFault{}) }