Merge #119708
119708: importer: harden internal retry loop r=yuzefovich a=yuzefovich

**importer: harden internal retry loop**

This commit adjusts the internal retry loop in `importer.ingestWithRetry` to be time-based. Previously, we performed at most 5 retries and then paused the job. Now the IMPORT has 2 minutes (controlled by the new `bulkio.import.retry_duration` cluster setting) to make at least 1% progress before we give up, and the max backoff has been raised to 15s. This should make IMPORTs more reliable.
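To illustrate the intended behavior, here is a minimal, self-contained sketch of a time-based retry loop whose budget refreshes on progress, using only the Go standard library. `attemptImport`, `errTransient`, and the probabilities are hypothetical stand-ins for `distImport` and real import progress, not the actual implementation (see the diff below for that):

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// errTransient stands in for a retryable (non-permanent) import error.
var errTransient = errors.New("transient import error")

// attemptImport is a hypothetical stand-in for distImport: it sometimes makes
// progress, and it fails with a transient error most of the time.
func attemptImport(progress *float64) error {
	if rand.Float64() < 0.5 {
		*progress += 0.02
	}
	if rand.Float64() < 0.8 {
		return errTransient
	}
	return nil
}

func main() {
	const (
		maxRetryDuration = 2 * time.Minute // mirrors bulkio.import.retry_duration
		minProgressDelta = 0.01            // 1% of progress refreshes the retry budget
		maxBackoff       = 15 * time.Second
	)

	var progress float64
	lastProgress := progress
	lastProgressChange := time.Now()
	backoff := time.Second

	for {
		err := attemptImport(&progress)
		if err == nil {
			fmt.Println("import succeeded")
			return
		}
		if !errors.Is(err, errTransient) {
			fmt.Println("permanent error, giving up:", err)
			return
		}
		// Any attempt that moves the import forward by at least 1% refreshes
		// the retry budget (the real loop also resets its retry object so the
		// backoff shrinks again).
		if progress-lastProgress >= minProgressDelta {
			lastProgress = progress
			lastProgressChange = time.Now()
			backoff = time.Second
		}
		// Give up only after maxRetryDuration has passed without meaningful
		// progress, rather than after a fixed number of attempts.
		if time.Since(lastProgressChange) > maxRetryDuration {
			fmt.Println("exceeded retry duration without progress:", err)
			return
		}
		time.Sleep(backoff)
		if backoff *= 2; backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
```

The practical difference from a count-based loop: a slow but steadily progressing IMPORT is never paused, while one that stalls for the full budget is.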

Release note: None

**importer: adjust retry duration for application VCs around draining**

This commit makes it so that we retry "could not register flow because the registry is draining" errors during IMPORT in virtual clusters for longer, by increasing the retry duration by a factor of 30. This is a temporary workaround until #100578 is resolved. In particular, for system tenants we use gossip-backed DistSQL physical planning, which is aware of the draining state of nodes and avoids them in `distImport`; in virtual clusters we instead use the information from the `sql_instances` table, which is currently draining-unaware. As a result, the IMPORT is more likely to exhaust its "retry budget" before the drain completes. This change effectively gives the drain 60 minutes by default to complete.
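A rough sketch of the budget extension described above; the `isSystemTenant` and `isFlowDrainingError` booleans are hypothetical stand-ins for the `Codec.ForSystemTenant` and `flowinfra.IsFlowRetryableError` checks shown in the diff below:

```go
package main

import (
	"fmt"
	"time"
)

// effectiveRetryDuration returns the retry budget for one attempt. base
// mirrors the bulkio.import.retry_duration setting; the 30x extension only
// applies in application virtual clusters that hit the draining-registry
// error, since their DistSQL planning is not yet draining-aware (#100578).
func effectiveRetryDuration(base time.Duration, isSystemTenant, isFlowDrainingError bool) time.Duration {
	if !isSystemTenant && isFlowDrainingError {
		return base * 30
	}
	return base
}

func main() {
	base := 2 * time.Minute
	fmt.Println(effectiveRetryDuration(base, true, true))   // system tenant: 2m0s
	fmt.Println(effectiveRetryDuration(base, false, true))  // virtual cluster, draining: 1h0m0s
	fmt.Println(effectiveRetryDuration(base, false, false)) // virtual cluster, other error: 2m0s
}
```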

Fixes: #119592.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Feb 28, 2024
2 parents 60e4868 + 0ac6058 commit 225cec3
Showing 1 changed file with 50 additions and 9 deletions.
59 changes: 50 additions & 9 deletions pkg/sql/importer/import_job.go
@@ -1225,6 +1225,19 @@ func (r *importResumer) checkForUDTModification(
return sql.DescsTxn(ctx, execCfg, checkTypesAreEquivalent)
}

var retryDuration = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"bulkio.import.retry_duration",
"duration during which the IMPORT can be retried in face of non-permanent errors",
time.Minute*2,
settings.PositiveDuration,
)

func getFractionCompleted(job *jobs.Job) float64 {
p := job.Progress()
return float64(p.GetFractionCompleted())
}

func ingestWithRetry(
ctx context.Context,
execCtx sql.JobExecContext,
@@ -1240,24 +1253,27 @@ func ingestWithRetry(
ctx, sp := tracing.ChildSpan(ctx, "importer.ingestWithRetry")
defer sp.Finish()

// We retry on pretty generic failures -- any rpc error. If a worker node were
// to restart, it would produce this kind of error, but there may be other
// errors that are also rpc errors. Don't retry to aggressively.
// Note that we don't specify MaxRetries since we use time-based limit
// in the loop below.
retryOpts := retry.Options{
MaxBackoff: 1 * time.Second,
MaxRetries: 5,
// Don't retry too aggressively.
MaxBackoff: 15 * time.Second,
}

// We want to retry an import if there are transient failures (i.e. worker
// nodes dying), so if we receive a retryable error, re-plan and retry the
// import.
var res kvpb.BulkOpSummary
var err error
// State to decide when to exit the retry loop.
lastProgressChange, lastProgress := timeutil.Now(), getFractionCompleted(job)
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
for {
res, err = distImport(ctx, execCtx, job, tables, typeDescs, from, format, walltime,
testingKnobs, procsPerNode)
// Replanning errors should not count towards retry limits.
res, err = distImport(
ctx, execCtx, job, tables, typeDescs, from, format, walltime, testingKnobs, procsPerNode,
)
// If we got a re-planning error, then do at least one more attempt
// regardless of the retry duration.
if err == nil || !errors.Is(err, sql.ErrPlanChanged) {
break
}
@@ -1292,8 +1308,33 @@ func ingestWithRetry(
job.ID(), reloadErr)
} else {
job = reloadedJob
// If we made decent progress with the IMPORT, reset the last
// progress state.
curProgress := getFractionCompleted(job)
if madeProgress := curProgress - lastProgress; madeProgress >= 0.01 {
log.Infof(ctx, "import made %d%% progress, resetting retry duration", int(math.Round(100*madeProgress)))
lastProgress = curProgress
lastProgressChange = timeutil.Now()
r.Reset()
}
}

maxRetryDuration := retryDuration.Get(&execCtx.ExecCfg().Settings.SV)
if !execCtx.ExecCfg().Codec.ForSystemTenant() && flowinfra.IsFlowRetryableError(err) {
// If we encountered "could not register flow because the registry
// is draining" error in the application virtual cluster, we
// calibrate the retry duration. This is the case since DistSQL
// physical planning in virtual clusters uses 'sql_instances' table
// which currently doesn't have draining information
// TODO(#100578): remove this when this problem is addressed.
maxRetryDuration *= 30
}
if timeutil.Since(lastProgressChange) > maxRetryDuration {
log.Warningf(ctx, "encountered retryable error but exceeded retry duration, stopping: %+v", err)
break
} else {
log.Warningf(ctx, "encountered retryable error: %+v", err)
}
log.Warningf(ctx, "encountered retryable error: %+v", err)
}

// We have exhausted retries, but we have not seen a "PermanentBulkJobError" so
