From 5e6ce265363538a21d49afb91393f52dc76a75e3 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 27 Feb 2024 14:12:07 -0800 Subject: [PATCH 1/2] importer: harden internal retry loop This commit adjusts the internal retry loop in `importer.ingestWithRetry` to be time based. Previously, we only performed up to 5 retries and then we paused the job. This commit makes it so that we have 2 minutes (controlled by new `bulkio.import.retry_duration` cluster setting) to make at least 1% progress (max backoff has also been raised to 15s). This should make IMPORTs more reliable. Release note: None --- pkg/sql/importer/import_job.go | 50 ++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index 7c602eea2ecf..f29d59d3ba9c 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -1225,6 +1225,19 @@ func (r *importResumer) checkForUDTModification( return sql.DescsTxn(ctx, execCfg, checkTypesAreEquivalent) } +var retryDuration = settings.RegisterDurationSetting( + settings.ApplicationLevel, + "bulkio.import.retry_duration", + "duration during which the IMPORT can be retried in face of non-permanent errors", + time.Minute*2, + settings.PositiveDuration, +) + +func getFractionCompleted(job *jobs.Job) float64 { + p := job.Progress() + return float64(p.GetFractionCompleted()) +} + func ingestWithRetry( ctx context.Context, execCtx sql.JobExecContext, @@ -1240,12 +1253,11 @@ func ingestWithRetry( ctx, sp := tracing.ChildSpan(ctx, "importer.ingestWithRetry") defer sp.Finish() - // We retry on pretty generic failures -- any rpc error. If a worker node were - // to restart, it would produce this kind of error, but there may be other - // errors that are also rpc errors. Don't retry to aggressively. + // Note that we don't specify MaxRetries since we use time-based limit + // in the loop below. retryOpts := retry.Options{ - MaxBackoff: 1 * time.Second, - MaxRetries: 5, + // Don't retry too aggressively. + MaxBackoff: 15 * time.Second, } // We want to retry an import if there are transient failures (i.e. worker @@ -1253,11 +1265,15 @@ func ingestWithRetry( // import. var res kvpb.BulkOpSummary var err error + // State to decide when to exit the retry loop. + lastProgressChange, lastProgress := timeutil.Now(), getFractionCompleted(job) for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { for { - res, err = distImport(ctx, execCtx, job, tables, typeDescs, from, format, walltime, - testingKnobs, procsPerNode) - // Replanning errors should not count towards retry limits. + res, err = distImport( + ctx, execCtx, job, tables, typeDescs, from, format, walltime, testingKnobs, procsPerNode, + ) + // If we got a re-planning error, then do at least one more attempt + // regardless of the retry duration. if err == nil || !errors.Is(err, sql.ErrPlanChanged) { break } @@ -1292,8 +1308,24 @@ func ingestWithRetry( job.ID(), reloadErr) } else { job = reloadedJob + // If we made decent progress with the IMPORT, reset the last + // progress state. + curProgress := getFractionCompleted(job) + if madeProgress := curProgress - lastProgress; madeProgress >= 0.01 { + log.Infof(ctx, "import made %d%% progress, resetting retry duration", int(math.Round(100*madeProgress))) + lastProgress = curProgress + lastProgressChange = timeutil.Now() + r.Reset() + } + } + + maxRetryDuration := retryDuration.Get(&execCtx.ExecCfg().Settings.SV) + if timeutil.Since(lastProgressChange) > maxRetryDuration { + log.Warningf(ctx, "encountered retryable error but exceeded retry duration, stopping: %+v", err) + break + } else { + log.Warningf(ctx, "encountered retryable error: %+v", err) } - log.Warningf(ctx, "encountered retryable error: %+v", err) } // We have exhausted retries, but we have not seen a "PermanentBulkJobError" so From 0ac60589fefd29322cc5fe4e4eb3e7c8135222fc Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 27 Feb 2024 14:22:17 -0800 Subject: [PATCH 2/2] importer: adjust retry duration for application VCs around draining This commit makes it so that we retry "could not register flow because the registry is draining" errors during IMPORT in virtual clusters for longer (by increasing the retry duration by a factor of 30). This is temporary workaround until 100578 is resolved. In particular, for system tenants we use gossip-backed DistSQL physical planning which is aware of draining state of nodes and will avoid such in `distImport`, but in virtual clusters we use the information from `sql_instances` table that currently is draining-unaware - as a result, the IMPORT is more likely to run out of the "retry budget" before the drain completes. This change effectively gives 60 minutes by default for the drain to complete. Release note: None --- pkg/sql/importer/import_job.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/sql/importer/import_job.go b/pkg/sql/importer/import_job.go index f29d59d3ba9c..64669509c224 100644 --- a/pkg/sql/importer/import_job.go +++ b/pkg/sql/importer/import_job.go @@ -1320,6 +1320,15 @@ func ingestWithRetry( } maxRetryDuration := retryDuration.Get(&execCtx.ExecCfg().Settings.SV) + if !execCtx.ExecCfg().Codec.ForSystemTenant() && flowinfra.IsFlowRetryableError(err) { + // If we encountered "could not register flow because the registry + // is draining" error in the application virtual cluster, we + // calibrate the retry duration. This is the case since DistSQL + // physical planning in virtual clusters uses 'sql_instances' table + // which currently doesn't have draining information + // TODO(#100578): remove this when this problem is addressed. + maxRetryDuration *= 30 + } if timeutil.Since(lastProgressChange) > maxRetryDuration { log.Warningf(ctx, "encountered retryable error but exceeded retry duration, stopping: %+v", err) break