From af2dc5ac8400f344169fff4906f1fec9a210094f Mon Sep 17 00:00:00 2001 From: Jeremy Yang Date: Thu, 10 Aug 2023 07:13:05 -0700 Subject: [PATCH] awsdms: use custom status checker This commit switches the dms test to use a custom status checker instead of the one from the SDK. The way the SDK checks for status changes has a likely chance of a race condition which causes it to return an error when there is really no error. We now use a custom method to do the status checks which does an explicit status check each time the describe task api is called so we are always checking the latest result compared to the way the sdk was doing it. Fixes: #108270 Release note: None --- pkg/cmd/roachtest/tests/awsdms.go | 42 ++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/pkg/cmd/roachtest/tests/awsdms.go b/pkg/cmd/roachtest/tests/awsdms.go index 046ac12cf1ef..0e7b085ef90a 100644 --- a/pkg/cmd/roachtest/tests/awsdms.go +++ b/pkg/cmd/roachtest/tests/awsdms.go @@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask( return err } t.L().Printf("waiting for replication task to be ready") - if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil { + input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName) + if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil { return err } @@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask( } t.L().Printf("waiting for replication task to be running") - if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait( - ctx, - dmsDescribeTasksInput(t.BuildVersion(), task.tableName), - awsdmsWaitTimeLimit, - ); err != nil { + if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil { return err } } return nil } +func dmsTaskStatusChecker( + ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string, +) error { + closer := make(chan struct{}) + r := retry.StartWithCtx(ctx, retry.Options{ + InitialBackoff: 10 * time.Second, + MaxBackoff: 30 * time.Second, + Closer: closer, + }) + timeout := time.After(awsdmsWaitTimeLimit) + for r.Next() { + select { + case <-timeout: + close(closer) + // Since we only ever have a unique task returned per filter, + // it should be safe to direct index for the task name. + return errors.Newf("exceeded time limit waiting for %s to transition to %s", input.Filters[0].Values[0], status) + default: + dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input) + if err != nil { + return err + } + for _, task := range dmsTasks.ReplicationTasks { + // If we match the status we want, close the retry and exit. + if *task.Status == status { + close(closer) + } + } + } + } + return nil +} + func isDMSResourceNotFound(err error) bool { return errors.HasType(err, &dmstypes.ResourceNotFoundFault{}) }