Skip to content

Commit

Permalink
Merge #108514
Browse files Browse the repository at this point in the history
108514: awsdms: use custom status checker r=otan a=Jeremyyang920

This commit switches the dms test to use a custom
status checker instead of the one from the SDK. The way the SDK checks for status changes has a likely chance of a race condition which causes it to return an error when there is really no error.

We now use a custom method to do the status checks which does an explicit status check each time the describe task api is called so we are always checking the latest result compared to the way the sdk was doing it.

Fixes: #108270
Release note: None

Co-authored-by: Jeremy Yang <[email protected]>
  • Loading branch information
craig[bot] and Jeremyyang920 committed Aug 11, 2023
2 parents 4af8a54 + af2dc5a commit 6fa90e9
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,8 @@ func setupDMSEndpointsAndTask(
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
input := dmsDescribeTasksInput(t.BuildVersion(), task.tableName)
if err = dmsTaskStatusChecker(ctx, dmsCli, input, "ready"); err != nil {
return err
}

Expand Down Expand Up @@ -899,17 +900,46 @@ func setupDMSEndpointsAndTask(
}

t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
awsdmsWaitTimeLimit,
); err != nil {
if err = dmsTaskStatusChecker(ctx, dmsCli, input, "running"); err != nil {
return err
}
}
return nil
}

func dmsTaskStatusChecker(
ctx context.Context, dmsCli *dms.Client, input *dms.DescribeReplicationTasksInput, status string,
) error {
closer := make(chan struct{})
r := retry.StartWithCtx(ctx, retry.Options{
InitialBackoff: 10 * time.Second,
MaxBackoff: 30 * time.Second,
Closer: closer,
})
timeout := time.After(awsdmsWaitTimeLimit)
for r.Next() {
select {
case <-timeout:
close(closer)
// Since we only ever have a unique task returned per filter,
// it should be safe to direct index for the task name.
return errors.Newf("exceeded time limit waiting for %s to transition to %s", input.Filters[0].Values[0], status)
default:
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, input)
if err != nil {
return err
}
for _, task := range dmsTasks.ReplicationTasks {
// If we match the status we want, close the retry and exit.
if *task.Status == status {
close(closer)
}
}
}
}
return nil
}

func isDMSResourceNotFound(err error) bool {
return errors.HasType(err, &dmstypes.ResourceNotFoundFault{})
}
Expand Down

0 comments on commit 6fa90e9

Please sign in to comment.