Skip to content

Commit

Permalink
awsdms: use custom status checker
Browse files Browse the repository at this point in the history
This commit switches the DMS test to use a custom
status checker instead of the waiter from the SDK. The
way the SDK checks for status changes is prone to a
race condition, which causes it to return an error
when there is really no error.

We now use a custom method for the status checks. It
explicitly checks the task status each time the describe
tasks API is called, so we always check the latest
result, unlike the way the SDK was doing it.

Fixes: #108270
Release note: None
  • Loading branch information
Jeremyyang920 committed Aug 10, 2023
1 parent 2c650af commit af2dc5a
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask(
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName)
if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil {
return err
}

Expand Down Expand Up @@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask(
}

t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
awsdmsWaitTimeLimit,
); err != nil {
if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil {
return err
}
}
return nil
}

// dmsTaskStatusChecker polls DescribeReplicationTasks with the given input
// until one of the returned replication tasks reports the wanted status.
//
// It returns nil as soon as a task with a matching status is seen. It returns
// an error if the describe call fails, if awsdmsWaitTimeLimit elapses before a
// match is seen, or if the retry loop stops (e.g. because ctx is done) before
// a match is seen.
func dmsTaskStatusChecker(
	ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string,
) error {
	// taskName is only used in error messages. Callers are expected to filter
	// on a single task name, but the indexing is guarded so that an input
	// without filters cannot turn an error path into a panic.
	taskName := "replication task"
	if input != nil && len(input.Filters) > 0 && len(input.Filters[0].Values) > 0 {
		taskName = input.Filters[0].Values[0]
	}

	r := retry.StartWithCtx(ctx, retry.Options{
		InitialBackoff: 10 * time.Second,
		MaxBackoff:     30 * time.Second,
	})
	timeout := time.NewTimer(awsdmsWaitTimeLimit)
	defer timeout.Stop()

	for r.Next() {
		// Non-blocking check of the overall time limit before each attempt.
		select {
		case <-timeout.C:
			return errors.Newf("exceeded time limit waiting for %s to transition to %s", taskName, status)
		default:
		}

		dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input)
		if err != nil {
			return errors.Wrapf(err, "describing %s", taskName)
		}
		for _, task := range dmsTasks.ReplicationTasks {
			// Return on the first match. Returning directly (rather than
			// signalling the retry loop through a closer channel) avoids a
			// double close if more than one task matches.
			if task.Status != nil && *task.Status == status {
				return nil
			}
		}
	}

	// The retry loop ended without ever observing the wanted status; this
	// must not be reported as success.
	if err := ctx.Err(); err != nil {
		return errors.Wrapf(err, "waiting for %s to transition to %s", taskName, status)
	}
	return errors.Newf("stopped waiting for %s before it transitioned to %s", taskName, status)
}

func isDMSResourceNotFound(err error) bool {
return errors.HasType(err, &dmstypes.ResourceNotFoundFault{})
}
Expand Down

0 comments on commit af2dc5a

Please sign in to comment.