diff --git a/pkg/cmd/roachtest/tests/awsdms.go b/pkg/cmd/roachtest/tests/awsdms.go index 98551b1d0c9e..c334e54b5878 100644 --- a/pkg/cmd/roachtest/tests/awsdms.go +++ b/pkg/cmd/roachtest/tests/awsdms.go @@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask( return err } t.L().Printf("waiting for replication task to be ready") - if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil { + input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName) + if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil { return err } @@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask( } t.L().Printf("waiting for replication task to be running") - if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait( - ctx, - dmsDescribeTasksInput(t.BuildVersion(), task.tableName), - awsdmsWaitTimeLimit, - ); err != nil { + if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil { return err } } return nil } +func dmsTaskStatusChecker( + ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string, +) error { + closer := make(chan struct{}) + r := retry.StartWithCtx(ctx, retry.Options{ + InitialBackoff: 10 * time.Second, + MaxBackoff: 30 * time.Second, + Closer: closer, + }) + timeout := time.After(awsdmsWaitTimeLimit) + for r.Next() { + select { + case <-timeout: + close(closer) + // Since we only ever have a unique task returned per filter, + // it should be safe to direct index for the task name. + return errors.Newf("exceeded time limit waiting for %s to transition to %s", input.Filters[0].Values[0], status) + default: + dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input) + if err != nil { + return err + } + for _, task := range dmsTasks.ReplicationTasks { + // If we match the status we want, close the retry and exit. + if *task.Status == status { + close(closer) + } + } + } + } + return nil +} + func isDMSResourceNotFound(err error) bool { return errors.HasType(err, &dmstypes.ResourceNotFoundFault{}) }