Skip to content

Commit

Permalink
Merge pull request #108577 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-108514

release-23.1: awsdms: use custom status checker
  • Loading branch information
Jeremyyang920 authored Aug 11, 2023
2 parents 83e78cb + 89e1160 commit 04b0b7a
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask(
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName)
if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil {
return err
}

Expand Down Expand Up @@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask(
}

t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
awsdmsWaitTimeLimit,
); err != nil {
if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil {
return err
}
}
return nil
}

func dmsTaskStatusChecker(
ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string,
) error {
closer := make(chan struct{})
r := retry.StartWithCtx(ctx, retry.Options{
InitialBackoff: 10 * time.Second,
MaxBackoff: 30 * time.Second,
Closer: closer,
})
timeout := time.After(awsdmsWaitTimeLimit)
for r.Next() {
select {
case <-timeout:
close(closer)
// Since we only ever have a unique task returned per filter,
// it should be safe to direct index for the task name.
return errors.Newf("exceeded time limit waiting for %s to transition to %s", input.Filters[0].Values[0], status)
default:
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input)
if err != nil {
return err
}
for _, task := range dmsTasks.ReplicationTasks {
// If we match the status we want, close the retry and exit.
if *task.Status == status {
close(closer)
}
}
}
}
return nil
}

func isDMSResourceNotFound(err error) bool {
return errors.HasType(err, &dmstypes.ResourceNotFoundFault{})
}
Expand Down

0 comments on commit 04b0b7a

Please sign in to comment.