From 2dcd7fd08ee16fe3390d76aed9732e4f0f1e38a3 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Mon, 22 Feb 2021 15:50:40 -0500 Subject: [PATCH 1/6] changefeed: unexport some helper method Release note: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 6 +++--- pkg/ccl/changefeedccl/changefeed_stmt.go | 6 +++--- pkg/ccl/changefeedccl/changefeed_test.go | 4 ++-- pkg/ccl/changefeedccl/encoder.go | 2 +- pkg/ccl/changefeedccl/errors.go | 14 +++++++------- pkg/ccl/changefeedccl/rowfetcher_cache.go | 4 ++-- pkg/ccl/changefeedccl/sink.go | 8 ++++---- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 8fe293c2df94..327d07dcc56a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -208,7 +208,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User(), ) if err != nil { - err = MarkRetryableError(err) + err = markRetryableError(err) // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() @@ -924,7 +924,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.flowCtx.Cfg.Settings, nilOracle, cf.flowCtx.Cfg.ExternalStorageFromURI, cf.spec.User(), ) if err != nil { - err = MarkRetryableError(err) + err = markRetryableError(err) cf.MoveToDraining(err) return } @@ -1050,7 +1050,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad if cf.EvalCtx.Settings.Version.IsActive( cf.Ctx, clusterversion.ChangefeedsSupportPrimaryIndexChanges, ) { - err = MarkRetryableError(err) + err = markRetryableError(err) } else { err = errors.Wrap(err, "primary key change occurred") } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index db6e91ee4e74..6e9d88d39a2e 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -288,7 +288,7 @@ func changefeedPlanHook( if err != nil { telemetry.Count(`changefeed.core.error`) } - return MaybeStripRetryableErrorMarker(err) + return maybeStripRetryableErrorMarker(err) } settings := p.ExecCfg().Settings @@ -318,7 +318,7 @@ func changefeedPlanHook( settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, p.User(), ) if err != nil { - return MaybeStripRetryableErrorMarker(err) + return maybeStripRetryableErrorMarker(err) } if err := canarySink.Close(); err != nil { return err @@ -574,7 +574,7 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { return nil } - if !IsRetryableError(err) { + if !isRetryableError(err) { if ctx.Err() != nil { return ctx.Err() } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 59cb002c3096..b0fe1de8431d 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1722,7 +1722,7 @@ func TestChangefeedRetryableError(t *testing.T) { failSinkHook := func() error { switch atomic.LoadInt64(&failSink) { case 1: - return MarkRetryableError(fmt.Errorf("synthetic retryable error")) + return markRetryableError(fmt.Errorf("synthetic retryable error")) case 2: return fmt.Errorf("synthetic terminal error") } @@ -3000,7 +3000,7 @@ func TestChangefeedRestartDuringBackfill(t *testing.T) { 
t.Fatal(err) } // Make extra sure that the zombie changefeed can't write any more data. - beforeEmitRowCh <- MarkRetryableError(errors.New(`nope don't write it`)) + beforeEmitRowCh <- markRetryableError(errors.New(`nope don't write it`)) // Insert some data that we should only see out of the changefeed after it // re-runs the backfill. diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 2e94b6f81142..2445d97b1790 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -547,7 +547,7 @@ func (e *confluentAvroEncoder) register( return nil }); err != nil { log.Warningf(ctx, "%+v", err) - return 0, MarkRetryableError(err) + return 0, markRetryableError(err) } return id, nil diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go index bacde502f257..5094d24f8971 100644 --- a/pkg/ccl/changefeedccl/errors.go +++ b/pkg/ccl/changefeedccl/errors.go @@ -22,9 +22,9 @@ type retryableError struct { wrapped error } -// MarkRetryableError wraps the given error, marking it as retryable to +// markRetryableError wraps the given error, marking it as retryable to // changefeeds. -func MarkRetryableError(e error) error { +func markRetryableError(e error) error { return &retryableError{wrapped: e} } @@ -40,9 +40,9 @@ func (e *retryableError) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. func (e *retryableError) Unwrap() error { return e.wrapped } -// IsRetryableError returns true if the supplied error, or any of its parent -// causes, is a IsRetryableError. -func IsRetryableError(err error) bool { +// isRetryableError returns true if the supplied error, or any of its parent +// causes, is a isRetryableError. +func isRetryableError(err error) bool { if err == nil { return false } @@ -69,10 +69,10 @@ func IsRetryableError(err error) bool { return false } -// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the +// maybeStripRetryableErrorMarker performs some minimal attempt to clean the // RetryableError marker out. This won't do anything if the RetryableError // itself has been wrapped, but that's okay, we'll just have an uglier string. -func MaybeStripRetryableErrorMarker(err error) error { +func maybeStripRetryableErrorMarker(err error) error { // The following is a hack to work around the error cast linter. // What we're doing here is really not kosher; this function // has no business in assuming that the retryableError{} wrapper diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index bf074613aae6..d155a2d49875 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -86,7 +86,7 @@ func (c *rowFetcherCache) TableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, MarkRetryableError(err) + return nil, markRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -112,7 +112,7 @@ func (c *rowFetcherCache) TableDescForKey( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, MarkRetryableError(err) + return nil, markRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. 
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 74a9e2641f68..ef01b1bd951d 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -270,7 +270,7 @@ func (s errorWrapperSink) EmitRow(
 	ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp,
 ) error {
 	if err := s.wrapped.EmitRow(ctx, topicDescr, key, value, updated); err != nil {
-		return MarkRetryableError(err)
+		return markRetryableError(err)
 	}
 	return nil
 }
@@ -279,21 +279,21 @@ func (s errorWrapperSink) EmitResolvedTimestamp(
 	ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
 ) error {
 	if err := s.wrapped.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil {
-		return MarkRetryableError(err)
+		return markRetryableError(err)
 	}
 	return nil
 }
 
 func (s errorWrapperSink) Flush(ctx context.Context) error {
 	if err := s.wrapped.Flush(ctx); err != nil {
-		return MarkRetryableError(err)
+		return markRetryableError(err)
 	}
 	return nil
 }
 
 func (s errorWrapperSink) Close() error {
 	if err := s.wrapped.Close(); err != nil {
-		return MarkRetryableError(err)
+		return markRetryableError(err)
 	}
 	return nil
 }

From 9e1bfa53d941fb182e1629d6c052ea7b265bdb52 Mon Sep 17 00:00:00 2001
From: Paul Bardea
Date: Mon, 22 Feb 2021 16:14:46 -0500
Subject: [PATCH 2/6] changefeedccl: extract job retrying code into common package

The retrying logic that changefeed performs should be shared across all
long-running jobs that set up a DistSQL flow. This commit extracts the
logic that changefeed uses to mark errors as retryable, and to retry them
inside the job, into a common helper in utilccl that can be shared by all
jobs.

Unfortunately, the changefeed-specific retry errors cannot be removed
outright, since we need to maintain backwards compatibility with 20.2
nodes. When marking an error as retryable during the changefeed job, we
now also mark it with the generic retryable-jobs-flow error so that the
changefeed-specific checks can be removed in 21.2.
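To illustrate the intended usage, a resumer is expected to drive its flow
through the new helper roughly as follows (a minimal sketch, not taken from
this diff; runWithRetry, runFlow, and jobSpecificRetryable are hypothetical
stand-ins for the job-specific pieces):

package somejobccl // hypothetical caller package

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// runWithRetry sketches how a resumer drives its DistSQL flow through the
// new helper. DistSQL "rpc error"s are always retried; jobSpecificRetryable
// widens that set (changefeeds pass their legacy marker check here).
func runWithRetry(
	ctx context.Context,
	jobSpecificRetryable func(error) bool,
	runFlow func(context.Context) error,
) error {
	logRetry := func(err error) {
		// Hook for job-specific logging/metrics on each retryable failure.
		log.Warningf(ctx, "flow hit retryable error: %v", err)
	}
	return utilccl.RetryDistSQLFlowCustomRetryable(ctx, jobSpecificRetryable, runFlow, logRetry)
}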
Release note: None --- pkg/ccl/changefeedccl/changefeed_stmt.go | 67 ++++--------- pkg/ccl/changefeedccl/errors.go | 23 +++-- pkg/ccl/utilccl/errors.go | 119 +++++++++++++++++++++++ 3 files changed, 151 insertions(+), 58 deletions(-) create mode 100644 pkg/ccl/utilccl/errors.go diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 6e9d88d39a2e..25a160fff7ad 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -11,7 +11,6 @@ package changefeedccl import ( "context" "encoding/hex" - "fmt" "math/rand" "net/url" "sort" @@ -39,7 +38,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" @@ -49,7 +47,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/cloudimpl" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -553,49 +550,7 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err details := b.job.Details().(jobspb.ChangefeedDetails) progress := b.job.Progress() - // We'd like to avoid failing a changefeed unnecessarily, so when an error - // bubbles up to this level, we'd like to "retry" the flow if possible. This - // could be because the sink is down or because a cockroach node has crashed - // or for many other reasons. - opts := retry.Options{ - InitialBackoff: 5 * time.Millisecond, - Multiplier: 2, - MaxBackoff: 10 * time.Second, - } - var err error - - for r := retry.StartWithCtx(ctx, opts); r.Next(); { - // startedCh is normally used to signal back to the creator of the job that - // the job has started; however, in this case nothing will ever receive - // on the channel, causing the changefeed flow to block. Replace it with - // a dummy channel. - startedCh := make(chan tree.Datums, 1) - - if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { - return nil - } - if !isRetryableError(err) { - if ctx.Err() != nil { - return ctx.Err() - } - - if flowinfra.IsFlowRetryableError(err) { - // We don't want to retry flowinfra retryable error in the retry loop above. - // This error currently indicates that this node is being drained. As such, - // retries will not help. - // Instead, we want to make sure that the changefeed job is not marked failed - // due to a transient, retryable error. - err = jobs.NewRetryJobError(fmt.Sprintf("retryable flow error: %+v", err)) - } - - log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err) - return err - } - - log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %v`, jobID, err) - if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { - metrics.ErrorRetries.Inc(1) - } + runChangefeed := func(ctx context.Context) error { // Re-load the job in order to update our progress object, which may have // been updated by the changeFrontier processor since the flow started. 
reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID) @@ -609,10 +564,24 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err } else { progress = reloadedJob.Progress() } + + // startedCh is normally used to signal back to the creator of the job that + // the job has started; however, in this case nothing will ever receive + // on the channel, causing the changefeed flow to block. Replace it with + // a dummy channel. + startedCh := make(chan tree.Datums, 1) + + return distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh) + } + + logOnRetryableError := func(err error) { + log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %v`, jobID, err) + if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { + metrics.ErrorRetries.Inc(1) + } } - // We only hit this if `r.Next()` returns false, which right now only happens - // on context cancellation. - return errors.Wrap(err, `ran out of retries`) + + return utilccl.RetryDistSQLFlowCustomRetryable(ctx, isChangefeedRetryableError, runChangefeed, logOnRetryableError) } // OnFailOrCancel is part of the jobs.Resumer interface. diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go index 5094d24f8971..6e7f0f1b59b7 100644 --- a/pkg/ccl/changefeedccl/errors.go +++ b/pkg/ccl/changefeedccl/errors.go @@ -13,18 +13,26 @@ import ( "reflect" "strings" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/errors" ) const retryableErrorString = "retryable changefeed error" +// retryableError is deprecated, but used to maintain backwards +// compatibility with 20.2 nodes. +// TODO(pbardea): Remove in 21.2, and use the helpers in utilccl/errors.go +// instead. type retryableError struct { wrapped error } // markRetryableError wraps the given error, marking it as retryable to // changefeeds. +// TODO(pbardea): Remove in 21.2, and use utilccl.MarkRetryableError instead. func markRetryableError(e error) error { + // Wrap all these errors with a more generic job retry error. + e = utilccl.MarkRetryableError(e) return &retryableError{wrapped: e} } @@ -40,9 +48,11 @@ func (e *retryableError) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. func (e *retryableError) Unwrap() error { return e.wrapped } -// isRetryableError returns true if the supplied error, or any of its parent -// causes, is a isRetryableError. -func isRetryableError(err error) bool { +// isChangefeedRetryableError returns true if the supplied error, or any of its +// parent causes, is a isChangefeedRetryableError. This should be used in +// utilccl.RetryDistSQLFlowCustomRetryable, and should eventually be removed in +// 21.2, when no more errors will be marked with this changefeed specific error. +func isChangefeedRetryableError(err error) bool { if err == nil { return false } @@ -60,12 +70,7 @@ func isRetryableError(err error) bool { // unfortunate string comparison. return true } - if strings.Contains(errStr, `rpc error`) { - // When a crdb node dies, any DistSQL flows with processors scheduled on - // it get an error with "rpc error" in the message from the call to - // `(*DistSQLPlanner).Run`. 
- return true - } + return false } diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go new file mode 100644 index 000000000000..cbcb90d8d607 --- /dev/null +++ b/pkg/ccl/utilccl/errors.go @@ -0,0 +1,119 @@ +package utilccl + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" +) + +const retryableJobsFlowError = "retryable jobs error" + +type retryableError struct { + wrapped error +} + +// MarkRetryableError wraps the given error, marking it as retryable to +// jobs. +func MarkRetryableError(e error) error { + return &retryableError{wrapped: e} +} + +// Error implements the error interface. +func (e *retryableError) Error() string { + return fmt.Sprintf("%s: %s", retryableJobsFlowError, e.wrapped.Error()) +} + +// Cause implements the github.com/pkg/errors.causer interface. +func (e *retryableError) Cause() error { return e.wrapped } + +// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is +// planned to be moved to the stdlib in go 1.13. +func (e *retryableError) Unwrap() error { return e.wrapped } + +// IsDistSQLRetryableError returns true if the supplied error, or any of its parent +// causes, is a IsDistSQLRetryableError. +func isDistSQLRetryableError(err error) bool { + if err == nil { + return false + } + + // TODO(knz): this is a bad implementation. Make it go away + // by avoiding string comparisons. + + errStr := err.Error() + if strings.Contains(errStr, `rpc error`) { + // When a crdb node dies, any DistSQL flows with processors scheduled on + // it get an error with "rpc error" in the message from the call to + // `(*DistSQLPlanner).Run`. + return true + } + return false +} + +// RetryDistSQLFlowCustomRetryable retries the given func in the context of a +// long running DistSQL flow which is used by all jobs. If a node were to fail, +// either the work func should be retried, or the error returned will be a job +// retry error that will retry the entire job in the case of the coordinator +// node being drained. +// +// This is maintained to support old-version nodes running CDC that may return +// with CDC specific retryable errors. +// TODO(pbardea): In 20.2, remove the isRetryable argument. +func RetryDistSQLFlowCustomRetryable( + ctx context.Context, + isRetryable func(error) bool, + retryable func(ctx context.Context) error, + logRetryableError func(error), +) error { + opts := retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 10 * time.Second, + } + + var err error + + for r := retry.StartWithCtx(ctx, opts); r.Next(); { + err = retryable(ctx) + if err == nil { + return nil + } + + isCustomRetryable := false + if isRetryable != nil && isRetryable(err) { + isCustomRetryable = true + } + if retryable := isDistSQLRetryableError(err) || isCustomRetryable; !retryable { + if ctx.Err() != nil { + return ctx.Err() + } + + if flowinfra.IsFlowRetryableError(err) { + // We don't want to retry flowinfra retryable error in the retry loop + // above. This error currently indicates that this node is being + // drained. As such, retries will not help. + // Instead, we want to make sure that the job is not marked failed due + // to a transient, retryable error. 
+ err = jobs.NewRetryJobError(fmt.Sprintf("retryable flow error: %+v", err)) + } + + log.Warningf(ctx, `returning with error: %+v`, err) + return err + } + + if logRetryableError != nil { + logRetryableError(err) + } + } + + // We only hit this if `r.Next()` returns false, which right now only happens + // on context cancellation. + return errors.Wrap(err, `ran out of retries`) +} From d704baaaea9f27c2b77056297a567d6801eda009 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Mon, 22 Feb 2021 16:33:16 -0500 Subject: [PATCH 3/6] backupccl,importccl: retry DistSQL flow on retryable failure This commit retries backup/restore and import jobs on temporary failures that might be caused by nodes crashing. It follows the approach set out by changefeeds. Release justification: bug fix Release note (bug fix): Backup, restore and import are now more resilient to node failures and will retry automatically. --- pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_job.go | 64 ++++++--- pkg/ccl/backupccl/restore_job.go | 57 +++++++- .../changefeedccl/changefeed_processors.go | 6 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 71 +++++++--- pkg/ccl/changefeedccl/changefeed_test.go | 4 +- pkg/ccl/changefeedccl/encoder.go | 2 +- pkg/ccl/changefeedccl/errors.go | 25 ++-- pkg/ccl/changefeedccl/rowfetcher_cache.go | 4 +- pkg/ccl/changefeedccl/sink.go | 8 +- pkg/ccl/importccl/import_stmt.go | 46 ++++++- pkg/ccl/utilccl/BUILD.bazel | 1 + pkg/ccl/utilccl/errors.go | 123 +++--------------- 13 files changed, 238 insertions(+), 174 deletions(-) diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index e2d3aec78876..1e5dd0c294ea 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -92,6 +92,7 @@ go_library( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", + "//pkg/util/retry", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 0514914288a3..79e998bbbfeb 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/build" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/cloudimpl" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -439,23 +441,53 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error { } statsCache := p.ExecCfg().TableStatsCache - res, err := backup( - ctx, - p, - details.URI, - details.URIsByLocalityKV, - p.ExecCfg().DB, - p.ExecCfg().Settings, - defaultStore, - storageByLocalityKV, - b.job, - backupManifest, - p.ExecCfg().DistSQLSrv.ExternalStorage, - details.EncryptionOptions, - statsCache, - ) + // We retry on pretty generic failures -- any rpc error. If a worker node were + // to restart, it would produce this kind of error, but there may be other + // errors that are also rpc errors. Don't retry to aggressively. 
+ retryOpts := retry.Options{ + MaxBackoff: 1 * time.Second, + MaxRetries: 5, + } + + // We want to retry a backup if there are transient failures (i.e. worker nodes + // dying), so if we receive a retryable error, re-plan and retry the backup. + var res RowCount + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + res, err = backup( + ctx, + p, + details.URI, + details.URIsByLocalityKV, + p.ExecCfg().DB, + p.ExecCfg().Settings, + defaultStore, + storageByLocalityKV, + b.job, + backupManifest, + p.ExecCfg().DistSQLSrv.ExternalStorage, + details.EncryptionOptions, + statsCache, + ) + if err == nil { + break + } + + if !utilccl.IsDistSQLRetryableError(err) { + return errors.Wrap(err, "failed to run backup") + } + + log.Warningf(ctx, `BACKUP job encountered retryable error: %+v`, err) + + // Reload the backup manifest to pick up any spans we may have completed on + // previous attempts. + var reloadBackupErr error + backupManifest, reloadBackupErr = b.readManifestOnResume(ctx, p.ExecCfg(), defaultStore, details) + if reloadBackupErr != nil { + log.Warning(ctx, "could not reload backup manifest when retrying, continuing with old progress") + } + } if err != nil { - return errors.Wrap(err, "failed to run backup") + return errors.Wrap(err, "exhausted retries") } b.deleteCheckpoint(ctx, p.ExecCfg(), p.User()) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index d6479bdd2d26..2fc813551324 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -13,9 +13,11 @@ import ( "context" "fmt" "math" + "time" "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -48,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/interval" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -500,6 +503,56 @@ func rewriteBackupSpanKey( return newKey, nil } +func restoreWithRetry( + restoreCtx context.Context, + execCtx sql.JobExecContext, + backupManifests []BackupManifest, + backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, + endTime hlc.Timestamp, + dataToRestore restorationData, + job *jobs.Job, + encryption *jobspb.BackupEncryptionOptions, +) (RowCount, error) { + // We retry on pretty generic failures -- any rpc error. If a worker node were + // to restart, it would produce this kind of error, but there may be other + // errors that are also rpc errors. Don't retry to aggressively. + retryOpts := retry.Options{ + MaxBackoff: 1 * time.Second, + MaxRetries: 5, + } + + // We want to retry a restore if there are transient failures (i.e. worker nodes + // dying), so if we receive a retryable error, re-plan and retry the backup. 
+ var res RowCount + var err error + for r := retry.StartWithCtx(restoreCtx, retryOpts); r.Next(); { + res, err = restore( + restoreCtx, + execCtx, + backupManifests, + backupLocalityInfo, + endTime, + dataToRestore, + job, + encryption, + ) + if err == nil { + break + } + + if !utilccl.IsDistSQLRetryableError(err) { + return RowCount{}, err + } + + log.Warningf(restoreCtx, `encountered retryable error: %+v`, err) + } + + if err != nil { + return RowCount{}, errors.Wrap(err, "exhausted retries") + } + return res, nil +} + // restore imports a SQL table (or tables) from sets of non-overlapping sstable // files. func restore( @@ -1463,7 +1516,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error var resTotal RowCount if !preData.isEmpty() { - res, err := restore( + res, err := restoreWithRetry( ctx, p, backupManifests, @@ -1497,7 +1550,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error { // Restore the main data bundle. We notably only restore the system tables // later. - res, err := restore( + res, err := restoreWithRetry( ctx, p, backupManifests, diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 327d07dcc56a..8fe293c2df94 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -208,7 +208,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User(), ) if err != nil { - err = markRetryableError(err) + err = MarkRetryableError(err) // Early abort in the case that there is an error creating the sink. ca.MoveToDraining(err) ca.cancel() @@ -924,7 +924,7 @@ func (cf *changeFrontier) Start(ctx context.Context) { cf.flowCtx.Cfg.Settings, nilOracle, cf.flowCtx.Cfg.ExternalStorageFromURI, cf.spec.User(), ) if err != nil { - err = markRetryableError(err) + err = MarkRetryableError(err) cf.MoveToDraining(err) return } @@ -1050,7 +1050,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad if cf.EvalCtx.Settings.Version.IsActive( cf.Ctx, clusterversion.ChangefeedsSupportPrimaryIndexChanges, ) { - err = markRetryableError(err) + err = MarkRetryableError(err) } else { err = errors.Wrap(err, "primary key change occurred") } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 25a160fff7ad..db6e91ee4e74 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -11,6 +11,7 @@ package changefeedccl import ( "context" "encoding/hex" + "fmt" "math/rand" "net/url" "sort" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" @@ -47,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/cloudimpl" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -285,7 +288,7 @@ func changefeedPlanHook( if err != 
nil { telemetry.Count(`changefeed.core.error`) } - return maybeStripRetryableErrorMarker(err) + return MaybeStripRetryableErrorMarker(err) } settings := p.ExecCfg().Settings @@ -315,7 +318,7 @@ func changefeedPlanHook( settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, p.User(), ) if err != nil { - return maybeStripRetryableErrorMarker(err) + return MaybeStripRetryableErrorMarker(err) } if err := canarySink.Close(); err != nil { return err @@ -550,7 +553,49 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err details := b.job.Details().(jobspb.ChangefeedDetails) progress := b.job.Progress() - runChangefeed := func(ctx context.Context) error { + // We'd like to avoid failing a changefeed unnecessarily, so when an error + // bubbles up to this level, we'd like to "retry" the flow if possible. This + // could be because the sink is down or because a cockroach node has crashed + // or for many other reasons. + opts := retry.Options{ + InitialBackoff: 5 * time.Millisecond, + Multiplier: 2, + MaxBackoff: 10 * time.Second, + } + var err error + + for r := retry.StartWithCtx(ctx, opts); r.Next(); { + // startedCh is normally used to signal back to the creator of the job that + // the job has started; however, in this case nothing will ever receive + // on the channel, causing the changefeed flow to block. Replace it with + // a dummy channel. + startedCh := make(chan tree.Datums, 1) + + if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { + return nil + } + if !IsRetryableError(err) { + if ctx.Err() != nil { + return ctx.Err() + } + + if flowinfra.IsFlowRetryableError(err) { + // We don't want to retry flowinfra retryable error in the retry loop above. + // This error currently indicates that this node is being drained. As such, + // retries will not help. + // Instead, we want to make sure that the changefeed job is not marked failed + // due to a transient, retryable error. + err = jobs.NewRetryJobError(fmt.Sprintf("retryable flow error: %+v", err)) + } + + log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err) + return err + } + + log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %v`, jobID, err) + if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { + metrics.ErrorRetries.Inc(1) + } // Re-load the job in order to update our progress object, which may have // been updated by the changeFrontier processor since the flow started. reloadedJob, reloadErr := execCfg.JobRegistry.LoadJob(ctx, jobID) @@ -564,24 +609,10 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err } else { progress = reloadedJob.Progress() } - - // startedCh is normally used to signal back to the creator of the job that - // the job has started; however, in this case nothing will ever receive - // on the channel, causing the changefeed flow to block. Replace it with - // a dummy channel. 
- startedCh := make(chan tree.Datums, 1) - - return distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh) - } - - logOnRetryableError := func(err error) { - log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %v`, jobID, err) - if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok { - metrics.ErrorRetries.Inc(1) - } } - - return utilccl.RetryDistSQLFlowCustomRetryable(ctx, isChangefeedRetryableError, runChangefeed, logOnRetryableError) + // We only hit this if `r.Next()` returns false, which right now only happens + // on context cancellation. + return errors.Wrap(err, `ran out of retries`) } // OnFailOrCancel is part of the jobs.Resumer interface. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index b0fe1de8431d..59cb002c3096 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1722,7 +1722,7 @@ func TestChangefeedRetryableError(t *testing.T) { failSinkHook := func() error { switch atomic.LoadInt64(&failSink) { case 1: - return markRetryableError(fmt.Errorf("synthetic retryable error")) + return MarkRetryableError(fmt.Errorf("synthetic retryable error")) case 2: return fmt.Errorf("synthetic terminal error") } @@ -3000,7 +3000,7 @@ func TestChangefeedRestartDuringBackfill(t *testing.T) { t.Fatal(err) } // Make extra sure that the zombie changefeed can't write any more data. - beforeEmitRowCh <- markRetryableError(errors.New(`nope don't write it`)) + beforeEmitRowCh <- MarkRetryableError(errors.New(`nope don't write it`)) // Insert some data that we should only see out of the changefeed after it // re-runs the backfill. diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 2445d97b1790..2e94b6f81142 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -547,7 +547,7 @@ func (e *confluentAvroEncoder) register( return nil }); err != nil { log.Warningf(ctx, "%+v", err) - return 0, markRetryableError(err) + return 0, MarkRetryableError(err) } return id, nil diff --git a/pkg/ccl/changefeedccl/errors.go b/pkg/ccl/changefeedccl/errors.go index 6e7f0f1b59b7..1bf7eac1b987 100644 --- a/pkg/ccl/changefeedccl/errors.go +++ b/pkg/ccl/changefeedccl/errors.go @@ -19,20 +19,13 @@ import ( const retryableErrorString = "retryable changefeed error" -// retryableError is deprecated, but used to maintain backwards -// compatibility with 20.2 nodes. -// TODO(pbardea): Remove in 21.2, and use the helpers in utilccl/errors.go -// instead. type retryableError struct { wrapped error } -// markRetryableError wraps the given error, marking it as retryable to +// MarkRetryableError wraps the given error, marking it as retryable to // changefeeds. -// TODO(pbardea): Remove in 21.2, and use utilccl.MarkRetryableError instead. -func markRetryableError(e error) error { - // Wrap all these errors with a more generic job retry error. - e = utilccl.MarkRetryableError(e) +func MarkRetryableError(e error) error { return &retryableError{wrapped: e} } @@ -48,11 +41,9 @@ func (e *retryableError) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. func (e *retryableError) Unwrap() error { return e.wrapped } -// isChangefeedRetryableError returns true if the supplied error, or any of its -// parent causes, is a isChangefeedRetryableError. 
This should be used in -// utilccl.RetryDistSQLFlowCustomRetryable, and should eventually be removed in -// 21.2, when no more errors will be marked with this changefeed specific error. -func isChangefeedRetryableError(err error) bool { +// IsRetryableError returns true if the supplied error, or any of its parent +// causes, is a IsRetryableError. +func IsRetryableError(err error) bool { if err == nil { return false } @@ -71,13 +62,13 @@ func isChangefeedRetryableError(err error) bool { return true } - return false + return utilccl.IsDistSQLRetryableError(err) } -// maybeStripRetryableErrorMarker performs some minimal attempt to clean the +// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the // RetryableError marker out. This won't do anything if the RetryableError // itself has been wrapped, but that's okay, we'll just have an uglier string. -func maybeStripRetryableErrorMarker(err error) error { +func MaybeStripRetryableErrorMarker(err error) error { // The following is a hack to work around the error cast linter. // What we're doing here is really not kosher; this function // has no business in assuming that the retryableError{} wrapper diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index d155a2d49875..bf074613aae6 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -86,7 +86,7 @@ func (c *rowFetcherCache) TableDescForKey( if err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, markRetryableError(err) + return nil, MarkRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. @@ -112,7 +112,7 @@ func (c *rowFetcherCache) TableDescForKey( }); err != nil { // Manager can return all kinds of errors during chaos, but based on // its usage, none of them should ever be terminal. - return nil, markRetryableError(err) + return nil, MarkRetryableError(err) } // Immediately release the lease, since we only need it for the exact // timestamp requested. 
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index ef01b1bd951d..74a9e2641f68 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -270,7 +270,7 @@ func (s errorWrapperSink) EmitRow( ctx context.Context, topicDescr TopicDescriptor, key, value []byte, updated hlc.Timestamp, ) error { if err := s.wrapped.EmitRow(ctx, topicDescr, key, value, updated); err != nil { - return markRetryableError(err) + return MarkRetryableError(err) } return nil } @@ -279,21 +279,21 @@ func (s errorWrapperSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { if err := s.wrapped.EmitResolvedTimestamp(ctx, encoder, resolved); err != nil { - return markRetryableError(err) + return MarkRetryableError(err) } return nil } func (s errorWrapperSink) Flush(ctx context.Context) error { if err := s.wrapped.Flush(ctx); err != nil { - return markRetryableError(err) + return MarkRetryableError(err) } return nil } func (s errorWrapperSink) Close() error { if err := s.wrapped.Close(); err != nil { - return markRetryableError(err) + return MarkRetryableError(err) } return nil } diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 4a1fa8c77363..4168788aa3d0 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -19,6 +19,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" @@ -1985,11 +1986,12 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { } } - res, err := sql.DistIngest(ctx, p, r.job, tables, files, format, details.Walltime, + res, err := ingestWithRetry(ctx, p, r.job, tables, files, format, details.Walltime, r.testingKnobs.alwaysFlushJobProgress) if err != nil { return err } + pkIDs := make(map[uint64]struct{}, len(details.Tables)) for _, t := range details.Tables { pkIDs[roachpb.BulkOpSummaryID(uint64(t.Desc.ID), uint64(t.Desc.PrimaryIndex.ID))] = struct{}{} @@ -2047,6 +2049,48 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { return nil } +func ingestWithRetry( + ctx context.Context, + execCtx sql.JobExecContext, + job *jobs.Job, + tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable, + from []string, + format roachpb.IOFileFormat, + walltime int64, + alwaysFlushProgress bool, +) (roachpb.BulkOpSummary, error) { + + // We retry on pretty generic failures -- any rpc error. If a worker node were + // to restart, it would produce this kind of error, but there may be other + // errors that are also rpc errors. Don't retry to aggressively. + retryOpts := retry.Options{ + MaxBackoff: 1 * time.Second, + MaxRetries: 5, + } + + // We want to retry a restore if there are transient failures (i.e. worker nodes + // dying), so if we receive a retryable error, re-plan and retry the backup. 
+ var res roachpb.BulkOpSummary + var err error + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + res, err = sql.DistIngest(ctx, execCtx, job, tables, from, format, walltime, alwaysFlushProgress) + if err == nil { + break + } + + if !utilccl.IsDistSQLRetryableError(err) { + return roachpb.BulkOpSummary{}, err + } + + log.Warningf(ctx, `encountered retryable error: %+v`, err) + } + + if err != nil { + return roachpb.BulkOpSummary{}, errors.Wrap(err, "exhausted retries") + } + return res, nil +} + func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.ExecutorConfig) error { details := r.job.Details().(jobspb.ImportDetails) // Schemas should only be published once. diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel index 5947e49a4f4c..f977d1ef5d3f 100644 --- a/pkg/ccl/utilccl/BUILD.bazel +++ b/pkg/ccl/utilccl/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "utilccl", srcs = [ + "errors.go", "jobutils.go", "license_check.go", ], diff --git a/pkg/ccl/utilccl/errors.go b/pkg/ccl/utilccl/errors.go index cbcb90d8d607..46ad5d387991 100644 --- a/pkg/ccl/utilccl/errors.go +++ b/pkg/ccl/utilccl/errors.go @@ -1,45 +1,20 @@ -package utilccl - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/errors" -) - -const retryableJobsFlowError = "retryable jobs error" - -type retryableError struct { - wrapped error -} - -// MarkRetryableError wraps the given error, marking it as retryable to -// jobs. -func MarkRetryableError(e error) error { - return &retryableError{wrapped: e} -} - -// Error implements the error interface. -func (e *retryableError) Error() string { - return fmt.Sprintf("%s: %s", retryableJobsFlowError, e.wrapped.Error()) -} +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -// Cause implements the github.com/pkg/errors.causer interface. -func (e *retryableError) Cause() error { return e.wrapped } +package utilccl -// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is -// planned to be moved to the stdlib in go 1.13. -func (e *retryableError) Unwrap() error { return e.wrapped } +import "strings" // IsDistSQLRetryableError returns true if the supplied error, or any of its parent -// causes, is a IsDistSQLRetryableError. -func isDistSQLRetryableError(err error) bool { +// causes is an rpc error. +// This is an unfortunate implementation that should be looking for a more +// specific error. +func IsDistSQLRetryableError(err error) bool { if err == nil { return false } @@ -48,72 +23,8 @@ func isDistSQLRetryableError(err error) bool { // by avoiding string comparisons. errStr := err.Error() - if strings.Contains(errStr, `rpc error`) { - // When a crdb node dies, any DistSQL flows with processors scheduled on - // it get an error with "rpc error" in the message from the call to - // `(*DistSQLPlanner).Run`. 
-		return true
-	}
-	return false
-}
-
-// RetryDistSQLFlowCustomRetryable retries the given func in the context of a
-// long running DistSQL flow which is used by all jobs. If a node were to fail,
-// either the work func should be retried, or the error returned will be a job
-// retry error that will retry the entire job in the case of the coordinator
-// node being drained.
-//
-// This is maintained to support old-version nodes running CDC that may return
-// with CDC specific retryable errors.
-// TODO(pbardea): In 20.2, remove the isRetryable argument.
-func RetryDistSQLFlowCustomRetryable(
-	ctx context.Context,
-	isRetryable func(error) bool,
-	retryable func(ctx context.Context) error,
-	logRetryableError func(error),
-) error {
-	opts := retry.Options{
-		InitialBackoff: 5 * time.Millisecond,
-		Multiplier:     2,
-		MaxBackoff:     10 * time.Second,
-	}
-
-	var err error
-
-	for r := retry.StartWithCtx(ctx, opts); r.Next(); {
-		err = retryable(ctx)
-		if err == nil {
-			return nil
-		}
-
-		isCustomRetryable := false
-		if isRetryable != nil && isRetryable(err) {
-			isCustomRetryable = true
-		}
-		if retryable := isDistSQLRetryableError(err) || isCustomRetryable; !retryable {
-			if ctx.Err() != nil {
-				return ctx.Err()
-			}
-
-			if flowinfra.IsFlowRetryableError(err) {
-				// We don't want to retry flowinfra retryable error in the retry loop
-				// above. This error currently indicates that this node is being
-				// drained. As such, retries will not help.
-				// Instead, we want to make sure that the job is not marked failed due
-				// to a transient, retryable error.
-				err = jobs.NewRetryJobError(fmt.Sprintf("retryable flow error: %+v", err))
-			}
-
-			log.Warningf(ctx, `returning with error: %+v`, err)
-			return err
-		}
-
-		if logRetryableError != nil {
-			logRetryableError(err)
-		}
-	}
-
-	// We only hit this if `r.Next()` returns false, which right now only happens
-	// on context cancellation.
-	return errors.Wrap(err, `ran out of retries`)
+	// When a crdb node dies, any DistSQL flows with processors scheduled on
+	// it get an error with "rpc error" in the message from the call to
+	// `(*DistSQLPlanner).Run`.
+	return strings.Contains(errStr, `rpc error`)
 }

From 7fd01c5012a211e5e82f7ef29a789c72976e33eb Mon Sep 17 00:00:00 2001
From: Paul Bardea
Date: Fri, 5 Mar 2021 12:23:22 -0500
Subject: [PATCH 4/6] roachtest: add nodeShutdown roachtests

This commit adds roachtests that randomly shut down a node during a
backup, restore, or import and ensure that the jobs still complete.
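All of the new tests follow the same shape; roughly (an illustrative sketch
built from the helpers added in this patch, within pkg/cmd/roachtest — the
statement, cluster spec, and node numbers are placeholders):

// registerExampleNodeShutdown sketches the pattern: start a detached job
// through a fixed gateway node, then let jobSurvivesNodeShutdown stop a
// different node and watch the job until it succeeds.
func registerExampleNodeShutdown(r *testRegistry) {
	r.Add(testSpec{
		Name:       "example/nodeShutdown/worker",
		Owner:      OwnerBulkIO,
		Cluster:    makeClusterSpec(4),
		MinVersion: "v21.1.0",
		Run: func(ctx context.Context, t *test, c *cluster) {
			c.Put(ctx, cockroach, "./cockroach")
			c.Start(ctx, t)
			gatewayNode, nodeToShutdown := 2, 3
			startJob := func(c *cluster) (jobID string, err error) {
				// DETACHED makes the statement return a job ID immediately
				// instead of blocking until the job completes.
				gatewayDB := c.Conn(ctx, gatewayNode)
				defer gatewayDB.Close()
				err = gatewayDB.QueryRowContext(ctx,
					`BACKUP bank.bank TO 'nodelocal://1/example' WITH DETACHED`).Scan(&jobID)
				return jobID, err
			}
			jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startJob)
		},
	})
}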
Release justification: test only change Release note: None --- pkg/cmd/roachtest/BUILD.bazel | 1 + pkg/cmd/roachtest/backup.go | 127 ++++++++++++++++++++++++------ pkg/cmd/roachtest/import.go | 59 ++++++++++++++ pkg/cmd/roachtest/jobs.go | 142 ++++++++++++++++++++++++++++++++++ pkg/cmd/roachtest/registry.go | 3 + pkg/cmd/roachtest/restore.go | 96 +++++++++++++++++++++++ 6 files changed, 404 insertions(+), 24 deletions(-) create mode 100644 pkg/cmd/roachtest/jobs.go diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 62a25309cf4d..ca0facdc3dc0 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "inverted_index.go", "java_helpers.go", "jepsen.go", + "jobs.go", "kv.go", "kvbench.go", "ledger.go", diff --git a/pkg/cmd/roachtest/backup.go b/pkg/cmd/roachtest/backup.go index e3a073df538a..b7765563ec55 100644 --- a/pkg/cmd/roachtest/backup.go +++ b/pkg/cmd/roachtest/backup.go @@ -32,37 +32,110 @@ const ( KMSRegionBEnvVar = "AWS_KMS_REGION_B" KMSKeyARNAEnvVar = "AWS_KMS_KEY_ARN_A" KMSKeyARNBEnvVar = "AWS_KMS_KEY_ARN_B" + + // rows2TiB is the number of rows to import to load 2TB of data (when + // replicated). + rows2TiB = 65_104_166 + rows100GiB = rows2TiB / 20 + rows30GiB = rows2TiB / 66 + rows15GiB = rows30GiB / 2 + rows5GiB = rows100GiB / 20 + rows3GiB = rows30GiB / 10 ) -func registerBackup(r *testRegistry) { - importBankData := func(ctx context.Context, rows int, t *test, c *cluster) string { - dest := c.name - // Randomize starting with encryption-at-rest enabled. - c.encryptAtRandom = true +func importBankDataSplit(ctx context.Context, rows, ranges int, t *test, c *cluster) string { + dest := c.name + // Randomize starting with encryption-at-rest enabled. + c.encryptAtRandom = true + + if local { + dest += fmt.Sprintf("%d", timeutil.Now().UnixNano()) + } + + c.Put(ctx, workload, "./workload") + c.Put(ctx, cockroach, "./cockroach") + + // NB: starting the cluster creates the logs dir as a side effect, + // needed below. + c.Start(ctx, t) + c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`) + time.Sleep(time.Second) // wait for csv server to open listener + + importArgs := []string{ + "./workload", "fixtures", "import", "bank", + "--db=bank", "--payload-bytes=10240", fmt.Sprintf("--ranges=%d", ranges), "--csv-server", "http://localhost:8081", + fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}", + } + c.Run(ctx, c.Node(1), importArgs...) + + return dest +} + +func importBankData(ctx context.Context, rows int, t *test, c *cluster) string { + return importBankDataSplit(ctx, rows, 0 /* ranges */, t, c) +} +func registerBackupNodeShutdown(r *testRegistry) { + // backupNodeRestartSpec runs a backup and randomly shuts down a node during + // the backup. + backupNodeRestartSpec := makeClusterSpec(4) + loadBackupData := func(ctx context.Context, t *test, c *cluster) string { + // This aught to be enough since this isn't a performance test. + rows := rows15GiB if local { - rows = 100 - dest += fmt.Sprintf("%d", timeutil.Now().UnixNano()) + // Needs to be sufficiently large to give each processor a good chunk of + // works so the job doesn't complete immediately. 
+ rows = rows5GiB } + return importBankData(ctx, rows, t, c) + } + + r.Add(testSpec{ + Name: fmt.Sprintf("backup/nodeShutdown/worker/%s", backupNodeRestartSpec), + Owner: OwnerBulkIO, + Cluster: backupNodeRestartSpec, + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 3 + dest := loadBackupData(ctx, t, c) + backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED` + startBackup := func(c *cluster) (jobID string, err error) { + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID) + return + } - c.Put(ctx, workload, "./workload") - c.Put(ctx, cockroach, "./cockroach") + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup) + }, + }) + r.Add(testSpec{ + Name: fmt.Sprintf("backup/nodeShutdown/coordinator/%s", backupNodeRestartSpec), + Owner: OwnerBulkIO, + Cluster: backupNodeRestartSpec, + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 2 + dest := loadBackupData(ctx, t, c) + backupQuery := `BACKUP bank.bank TO 'nodelocal://1/` + dest + `' WITH DETACHED` + startBackup := func(c *cluster) (jobID string, err error) { + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, backupQuery).Scan(&jobID) + return + } - // NB: starting the cluster creates the logs dir as a side effect, - // needed below. - c.Start(ctx, t) - c.Run(ctx, c.All(), `./workload csv-server --port=8081 &> logs/workload-csv-server.log < /dev/null &`) - time.Sleep(time.Second) // wait for csv server to open listener + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startBackup) + }, + }) - importArgs := []string{ - "./workload", "fixtures", "import", "bank", - "--db=bank", "--payload-bytes=10240", "--ranges=0", "--csv-server", "http://localhost:8081", - fmt.Sprintf("--rows=%d", rows), "--seed=1", "{pgurl:1}", - } - c.Run(ctx, c.Node(1), importArgs...) +} - return dest - } +func registerBackup(r *testRegistry) { backup2TBSpec := makeClusterSpec(10) r.Add(testSpec{ @@ -71,7 +144,10 @@ func registerBackup(r *testRegistry) { Cluster: backup2TBSpec, MinVersion: "v2.1.0", Run: func(ctx context.Context, t *test, c *cluster) { - rows := 65104166 + rows := rows2TiB + if local { + rows = 100 + } dest := importBankData(ctx, rows, t, c) m := newMonitor(ctx, c) m.Go(func(ctx context.Context) error { @@ -96,7 +172,10 @@ func registerBackup(r *testRegistry) { } // ~10GiB - which is 30Gib replicated. 
- rows := 976562 + rows := rows30GiB + if local { + rows = 100 + } dest := importBankData(ctx, rows, t, c) conn := c.Conn(ctx, 1) diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go index a06ab34b4d68..9c673109051d 100644 --- a/pkg/cmd/roachtest/import.go +++ b/pkg/cmd/roachtest/import.go @@ -20,6 +20,65 @@ import ( "github.com/cockroachdb/errors" ) +func registerImportNodeShutdown(r *testRegistry) { + getImportRunner := func(ctx context.Context, gatewayNode int) jobStarter { + startImport := func(c *cluster) (jobID string, err error) { + importStmt := ` + IMPORT TABLE partsupp + CREATE USING 'gs://cockroach-fixtures/tpch-csv/schema/partcupp.sql' + CSV DATA ( + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.1', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.2', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.3', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.4', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.5', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.6', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.7', + 'gs://cockroach-fixtures/tpch-csv/sf-100/partsupp.tbl.8' + ) WITH delimiter='|', detached + ` + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + err = gatewayDB.QueryRowContext(ctx, importStmt).Scan(&jobID) + return + } + + return startImport + } + + r.Add(testSpec{ + Name: "import/nodeShutdown/worker", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + gatewayNode := 2 + nodeToShutdown := 3 + startImport := getImportRunner(ctx, gatewayNode) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport) + }, + }) + r.Add(testSpec{ + Name: "import/nodeShutdown/coordinator", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + gatewayNode := 2 + nodeToShutdown := 2 + startImport := getImportRunner(ctx, gatewayNode) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, startImport) + }, + }) +} + func registerImportTPCC(r *testRegistry) { runImportTPCC := func(ctx context.Context, t *test, c *cluster, warehouses int) { // Randomize starting with encryption-at-rest enabled. diff --git a/pkg/cmd/roachtest/jobs.go b/pkg/cmd/roachtest/jobs.go new file mode 100644 index 000000000000..623a09b6f1b6 --- /dev/null +++ b/pkg/cmd/roachtest/jobs.go @@ -0,0 +1,142 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package main + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +type jobStarter func(c *cluster) (string, error) + +// jobSurvivesNodeShutdown is a helper that tests that a given job, +// running on the specified gatewayNode will still complete successfully +// if nodeToShutdown is shutdown partway through execution. +// This helper assumes: +// - That the job is long running and will take a least a minute to complete. 
+// - That the necessary setup is done (e.g. any data that the job relies on is +// already loaded) so that `query` can be run on its own to kick off the job. +// - That the statement running the job is a detached statement, and does not +// block until the job completes. +func jobSurvivesNodeShutdown( + ctx context.Context, t *test, c *cluster, nodeToShutdown int, startJob jobStarter, +) { + watcherNode := 1 + (nodeToShutdown)%c.spec.NodeCount + target := c.Node(nodeToShutdown) + t.l.Printf("test has chosen shutdown target node %d, and watcher node %d", + nodeToShutdown, watcherNode) + + jobIDCh := make(chan string, 1) + + m := newMonitor(ctx, c) + m.Go(func(ctx context.Context) error { + defer close(jobIDCh) + t.Status(`running job`) + var jobID string + jobID, err := startJob(c) + if err != nil { + return errors.Wrap(err, "starting the job") + } + t.l.Printf("started running job with ID %s", jobID) + jobIDCh <- jobID + + pollInterval := 5 * time.Second + ticker := time.NewTicker(pollInterval) + + watcherDB := c.Conn(ctx, watcherNode) + defer watcherDB.Close() + + var status string + for { + select { + case <-ticker.C: + err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status) + if err != nil { + return errors.Wrap(err, "getting the job status") + } + jobStatus := jobs.Status(status) + switch jobStatus { + case jobs.StatusSucceeded: + t.Status("job completed") + return nil + case jobs.StatusRunning: + t.l.Printf("job %s still running, waiting to succeed", jobID) + default: + // Waiting for job to complete. + return errors.Newf("unexpectedly found job %s in state %s", jobID, status) + } + case <-ctx.Done(): + return errors.Wrap(ctx.Err(), "context canceled while waiting for job to finish") + } + } + }) + + m.Go(func(ctx context.Context) error { + jobID, ok := <-jobIDCh + if !ok { + return errors.New("job never created") + } + + // Shutdown a node after a bit, and keep it shutdown for the remainder + // of the job. + timeToWait := 10 * time.Second + timer := timeutil.Timer{} + timer.Reset(timeToWait) + select { + case <-ctx.Done(): + return errors.Wrapf(ctx.Err(), "stopping test, did not shutdown node") + case <-timer.C: + timer.Read = true + } + + // Sanity check that the job is still running. + watcherDB := c.Conn(ctx, watcherNode) + defer watcherDB.Close() + + var status string + err := watcherDB.QueryRowContext(ctx, `SELECT status FROM [SHOW JOBS] WHERE job_id=$1`, jobID).Scan(&status) + if err != nil { + return errors.Wrap(err, "getting the job status") + } + jobStatus := jobs.Status(status) + if jobStatus != jobs.StatusRunning { + return errors.Newf("job too fast! job got to state %s before the target node could be shutdown", + status) + } + + t.l.Printf(`stopping node %s`, target) + if err := c.StopE(ctx, target); err != nil { + return errors.Wrapf(err, "could not stop node %s", target) + } + t.l.Printf("stopped node %s", target) + + return nil + }) + + // Before calling `m.Wait()`, do some cleanup. + if err := m.g.Wait(); err != nil { + t.Fatal(err) + } + + // NB: the roachtest harness checks that at the end of the test, all nodes + // that have data also have a running process. 
+ t.Status(fmt.Sprintf("restarting %s (node restart test is done)\n", target)) + if err := c.StartE(ctx, target); err != nil { + t.Fatal(errors.Wrapf(err, "could not restart node %s", target)) + } + + m.Wait() +} diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go index 8207da716f3f..ba1968a5eccc 100644 --- a/pkg/cmd/roachtest/registry.go +++ b/pkg/cmd/roachtest/registry.go @@ -21,6 +21,7 @@ func registerTests(r *testRegistry) { registerAlterPK(r) registerAutoUpgrade(r) registerBackup(r) + registerBackupNodeShutdown(r) registerCancel(r) registerCDC(r) registerClearRange(r) @@ -45,6 +46,7 @@ func registerTests(r *testRegistry) { registerImportMixedVersion(r) registerImportTPCC(r) registerImportTPCH(r) + registerImportNodeShutdown(r) registerInconsistency(r) registerIndexes(r) registerInterleaved(r) @@ -72,6 +74,7 @@ func registerTests(r *testRegistry) { registerRebalanceLoad(r) registerReplicaGC(r) registerRestart(r) + registerRestoreNodeShutdown(r) registerRestore(r) registerRoachmart(r) registerScaleData(r) diff --git a/pkg/cmd/roachtest/restore.go b/pkg/cmd/roachtest/restore.go index d3957509e7ff..32dbb4786c07 100644 --- a/pkg/cmd/roachtest/restore.go +++ b/pkg/cmd/roachtest/restore.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ts/tspb" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -208,6 +209,101 @@ func (dul *DiskUsageLogger) Runner(ctx context.Context) error { logger.Printf("%s\n", strings.Join(s, ", ")) } } +func registerRestoreNodeShutdown(r *testRegistry) { + makeRestoreStarter := func(ctx context.Context, t *test, c *cluster, gatewayNode int) jobStarter { + return func(c *cluster) (string, error) { + t.l.Printf("connecting to gateway") + gatewayDB := c.Conn(ctx, gatewayNode) + defer gatewayDB.Close() + + t.l.Printf("creating bank database") + if _, err := gatewayDB.Exec("CREATE DATABASE bank"); err != nil { + return "", err + } + + errCh := make(chan error, 1) + go func() { + defer close(errCh) + + // 10 GiB restore. + restoreQuery := `RESTORE bank.bank FROM + 'gs://cockroach-fixtures/workload/bank/version=1.0.0,payload-bytes=100,ranges=10,rows=10000000,seed=1/bank'` + + t.l.Printf("starting to run the restore job") + if _, err := gatewayDB.Exec(restoreQuery); err != nil { + errCh <- err + } + t.l.Printf("done running restore job") + }() + + // Wait for the job. + retryOpts := retry.Options{ + MaxRetries: 50, + InitialBackoff: 1 * time.Second, + MaxBackoff: 5 * time.Second, + } + for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { + var jobCount int + if err := gatewayDB.QueryRowContext(ctx, "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobCount); err != nil { + return "", err + } + + select { + case err := <-errCh: + // We got an error when starting the job. 
+ return "", err + default: + } + + if jobCount == 0 { + t.l.Printf("waiting for restore job") + } else if jobCount == 1 { + t.l.Printf("found restore job") + break + } else { + t.l.Printf("found multiple restore jobs -- erroring") + return "", errors.New("unexpectedly found multiple restore jobs") + } + } + + var jobID string + if err := gatewayDB.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobID); err != nil { + return "", errors.Wrap(err, "querying the job ID") + } + return jobID, nil + } + } + + r.Add(testSpec{ + Name: "restore/nodeShutdown/worker", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 3 + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode)) + }, + }) + + r.Add(testSpec{ + Name: "restore/nodeShutdown/coordinator", + Owner: OwnerBulkIO, + Cluster: makeClusterSpec(4), + MinVersion: "v21.1.0", + Run: func(ctx context.Context, t *test, c *cluster) { + gatewayNode := 2 + nodeToShutdown := 2 + c.Put(ctx, cockroach, "./cockroach") + c.Start(ctx, t) + + jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode)) + }, + }) +} func registerRestore(r *testRegistry) { for _, item := range []struct { From fdc86d93bb5d37f6c2961c69e351f0645269e262 Mon Sep 17 00:00:00 2001 From: Nathan Stilwell Date: Tue, 23 Mar 2021 10:34:29 -0400 Subject: [PATCH 5/6] ui: Replace "Admin UI" with "DB Console" on login Changed a few remaining instances of "Admin UI" to "DB Console" that were spotted on the login page. Release note (ui change): Change copy that referred to the app as "Admin UI" to "DB Console" instead --- pkg/ui/src/views/login/loginPage.tsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ui/src/views/login/loginPage.tsx b/pkg/ui/src/views/login/loginPage.tsx index a4b14f380049..38003154e0fa 100644 --- a/pkg/ui/src/views/login/loginPage.tsx +++ b/pkg/ui/src/views/login/loginPage.tsx @@ -162,7 +162,7 @@ export class LoginPage extends React.Component {
- Log in to the Admin UI + Log in to the DB Console {this.renderError()} @@ -171,8 +171,8 @@ export class LoginPage extends React.Component {
- A user with a password is required to log in to the Admin UI on - secure clusters. + A user with a password is required to log in to the DB Console + on secure clusters. Create a user with this SQL command: From 3e1c9f18c02847f855e27618444113ddf2a1a41a Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Wed, 31 Mar 2021 09:50:47 -0400 Subject: [PATCH 6/6] Revert "sql: lease acquisition of OFFLINE descs may starve bulk operations" Fixes: #62864, #62853, #62849, #62844 Reverts offline descriptor leasing change to fix intermittent failures introduced inside the importccl tests. Release note: None --- pkg/sql/catalog/descs/collection.go | 2 +- pkg/sql/catalog/lease/BUILD.bazel | 4 - pkg/sql/catalog/lease/lease.go | 52 +++------- pkg/sql/catalog/lease/lease_test.go | 148 +--------------------------- 4 files changed, 17 insertions(+), 189 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 670c3f78f66d..69f9a225bcb1 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -282,7 +282,7 @@ func (tc *Collection) getLeasedDescriptorByName( // Read the descriptor from the store in the face of some specific errors // because of a known limitation of AcquireByName. See the known // limitations of AcquireByName for details. - if (catalog.HasInactiveDescriptorError(err) && errors.Is(err, catalog.ErrDescriptorDropped)) || + if catalog.HasInactiveDescriptorError(err) || errors.Is(err, catalog.ErrDescriptorNotFound) { return nil, true, nil } diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index b76963ccff67..15fe9bc50ae1 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -54,7 +54,6 @@ go_test( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -68,9 +67,7 @@ go_test( "//pkg/sql/catalog/tabledesc", "//pkg/sql/pgwire/pgcode", "//pkg/sql/sem/tree", - "//pkg/sql/sessiondata", "//pkg/sql/sqltestutils", - "//pkg/sql/sqlutil", "//pkg/sql/tests", "//pkg/testutils", "//pkg/testutils/serverutils", @@ -87,7 +84,6 @@ go_test( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", - "//pkg/util/uuid", "@com_github_cockroachdb_cockroach_go//crdb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 572af1db8e38..b121babb6112 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -216,7 +216,7 @@ func (s storage) acquire( return err } if err := catalog.FilterDescriptorState( - desc, tree.CommonLookupFlags{IncludeOffline: true}, // filter dropped only + desc, tree.CommonLookupFlags{}, // filter all non-public state ); err != nil { return err } @@ -981,7 +981,7 @@ func purgeOldVersions( ctx context.Context, db *kv.DB, id descpb.ID, - dropped bool, + takenOffline bool, minVersion descpb.DescriptorVersion, m *Manager, ) error { @@ -995,15 +995,15 @@ func purgeOldVersions( } empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0 t.mu.Unlock() - if empty && !dropped { + if empty && !takenOffline { // We don't currently have a version on this descriptor, so no need to refresh // anything. 
return nil } - removeInactives := func(dropped bool) { + removeInactives := func(takenOffline bool) { t.mu.Lock() - t.mu.takenOffline = dropped + t.mu.takenOffline = takenOffline leases := t.removeInactiveVersions() t.mu.Unlock() for _, l := range leases { @@ -1011,8 +1011,8 @@ func purgeOldVersions( } } - if dropped { - removeInactives(true /* dropped */) + if takenOffline { + removeInactives(true /* takenOffline */) return nil } @@ -1028,7 +1028,7 @@ func purgeOldVersions( return errRenewLease } newest.incRefcount() - removeInactives(false /* dropped */) + removeInactives(false /* takenOffline */) s, err := t.release(newest.Descriptor, m.removeOnceDereferenced()) if err != nil { return err @@ -1398,28 +1398,6 @@ func (m *Manager) AcquireByName( parentSchemaID descpb.ID, name string, ) (catalog.Descriptor, hlc.Timestamp, error) { - // When offline descriptor leases were not allowed to be cached, - // attempt to acquire a lease on them would generate a descriptor - // offline error. Recent changes allow offline descriptor leases - // to be cached, but callers still need the offline error generated. - // This logic will release the lease (the lease manager will still - // cache it), and generate the offline descriptor error. - validateDescriptorForReturn := func(desc catalog.Descriptor, - expiration hlc.Timestamp) (catalog.Descriptor, hlc.Timestamp, error) { - if desc.Offline() { - if err := catalog.FilterDescriptorState( - desc, tree.CommonLookupFlags{}, - ); err != nil { - err2 := m.Release(desc) - if err2 != nil { - log.Warningf(ctx, "error releasing lease: %s", err2) - } - return nil, hlc.Timestamp{}, err - } - } - return desc, expiration, nil - } - // Check if we have cached an ID for this name. descVersion := m.names.get(parentID, parentSchemaID, name, timestamp) if descVersion != nil { @@ -1434,7 +1412,7 @@ func (m *Manager) AcquireByName( } } } - return validateDescriptorForReturn(descVersion.Descriptor, descVersion.expiration) + return descVersion.Descriptor, descVersion.expiration, nil } if err := m.Release(descVersion); err != nil { return nil, hlc.Timestamp{}, err @@ -1444,7 +1422,7 @@ func (m *Manager) AcquireByName( if err != nil { return nil, hlc.Timestamp{}, err } - return validateDescriptorForReturn(desc, expiration) + return desc, expiration, nil } // We failed to find something in the cache, or what we found is not @@ -1513,7 +1491,7 @@ func (m *Manager) AcquireByName( return nil, hlc.Timestamp{}, catalog.ErrDescriptorNotFound } } - return validateDescriptorForReturn(desc, expiration) + return desc, expiration, nil } // resolveName resolves a descriptor name to a descriptor ID at a particular @@ -1716,11 +1694,11 @@ func (m *Manager) RefreshLeases(ctx context.Context, s *stop.Stopper, db *kv.DB) } id, version, name, state := descpb.GetDescriptorMetadata(desc) - dropped := state == descpb.DescriptorState_DROP + goingOffline := state == descpb.DescriptorState_DROP || state == descpb.DescriptorState_OFFLINE // Try to refresh the lease to one >= this version. 
- log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (dropped %v)", - id, version, dropped) - if err := purgeOldVersions(ctx, db, id, dropped, version, m); err != nil { + log.VEventf(ctx, 2, "purging old version of descriptor %d@%d (offline %v)", + id, version, goingOffline) + if err := purgeOldVersions(ctx, db, id, goingOffline, version, m); err != nil { log.Warningf(ctx, "error purging leases for descriptor %d(%s): %s", id, name, err) } diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index aa5a5c14fbeb..742e60503cfb 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -27,9 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -41,9 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -54,12 +50,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/lib/pq" @@ -2361,7 +2355,6 @@ func TestLeaseWithOfflineTables(t *testing.T) { func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc) flags.CommonLookupFlags.IncludeOffline = true - flags.CommonLookupFlags.IncludeDropped = true desc, err := descsCol.GetMutableTableByID(ctx, txn, testTableID(), flags) require.NoError(t, err) require.Equal(t, desc.State, expected) @@ -2405,16 +2398,9 @@ func TestLeaseWithOfflineTables(t *testing.T) { checkLeaseState(true /* shouldBePresent */) // Take the table offline and back online again. - // This should not relinquish the lease anymore - // and offline ones will now be held. + // This should relinquish the lease. setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_OFFLINE) setTableState(descpb.DescriptorState_OFFLINE, descpb.DescriptorState_PUBLIC) - checkLeaseState(true /* shouldBePresent */) - - // Take the table dropped and back online again. - // This should relinquish the lease. - setTableState(descpb.DescriptorState_PUBLIC, descpb.DescriptorState_DROP) - setTableState(descpb.DescriptorState_DROP, descpb.DescriptorState_PUBLIC) checkLeaseState(false /* shouldBePresent */) // Query the table, thereby acquiring a lease once again. 
@@ -2717,135 +2703,3 @@ func TestDropDescriptorRacesWithAcquisition(t *testing.T) { return true }) } - -// TestOfflineLeaseRefresh validates that no live lock can occur, -// after a table is brought offline. Specifically a table a will be -// brought offline, and then one transaction will attempt to bring it -// online while another transaction will attempt to do a read. The read -// transaction could previously push back the lease of transaction -// trying to online the table perpetually (as seen in issue #61798). -func TestOfflineLeaseRefresh(t *testing.T) { - defer leaktest.AfterTest(t)() - ctx := context.Background() - waitForTxn := make(chan chan struct{}) - waitForRqstFilter := make(chan chan struct{}) - errorChan := make(chan error) - var txnID uuid.UUID - var mu syncutil.RWMutex - - knobs := &kvserver.StoreTestingKnobs{ - TestingRequestFilter: func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { - mu.RLock() - checkRequest := req.Txn != nil && req.Txn.ID.Equal(txnID) - mu.RUnlock() - if _, ok := req.GetArg(roachpb.EndTxn); checkRequest && ok { - notify := make(chan struct{}) - waitForRqstFilter <- notify - <-notify - } - return nil - }, - } - params := base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}} - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) - s := tc.Server(0) - defer tc.Stopper().Stop(ctx) - conn := tc.ServerConn(0) - - // Create t1 that will be offline, and t2, - // that will serve inserts. - _, err := conn.Exec(` -CREATE DATABASE d1; -CREATE TABLE d1.t1 (name int); -INSERT INTO d1.t1 values(5); -INSERT INTO d1.t1 values(5); -INSERT INTO d1.t1 values(5); -CREATE TABLE d1.t2 (name int); -`) - require.NoError(t, err) - - tableID := descpb.InvalidID - - // Force the table descriptor into a offline state - err = descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), - func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - _, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, tree.NewTableNameWithSchema("d1", "public", "t1"), tree.ObjectLookupFlagsWithRequired()) - if err != nil { - return err - } - tableDesc.SetOffline("For unit test") - err = descriptors.WriteDesc(ctx, false, tableDesc, txn) - if err != nil { - return err - } - tableID = tableDesc.ID - return nil - }) - require.NoError(t, err) - - _, err = s.LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID, retry.Options{}) - require.NoError(t, err) - - go func() { - err := descs.Txn(ctx, s.ClusterSettings(), s.LeaseManager().(*lease.Manager), - s.InternalExecutor().(sqlutil.InternalExecutor), s.DB(), - func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - close(waitForRqstFilter) - mu.Lock() - waitForRqstFilter = make(chan chan struct{}) - txnID = txn.ID() - mu.Unlock() - - // Online the descriptor by making it public - _, tableDesc, err := descriptors.GetMutableTableByName(ctx, txn, - tree.NewTableNameWithSchema("d1", "public", "t1"), - tree.ObjectLookupFlags{CommonLookupFlags: tree.CommonLookupFlags{ - Required: true, - RequireMutable: true, - IncludeOffline: true, - AvoidCached: true, - }}) - if err != nil { - return err - } - tableDesc.SetPublic() - err = descriptors.WriteDesc(ctx, false, tableDesc, txn) - if err != nil { - return err - } - // Allow the select on the table to proceed, - // so that it waits on the channel at the appropriate - // moment. 
- notify := make(chan struct{}) - waitForTxn <- notify - <-notify - - // Select from an unrelated table - _, err = s.InternalExecutor().(sqlutil.InternalExecutor).ExecEx(ctx, "inline-exec", txn, - sessiondata.InternalExecutorOverride{User: security.RootUserName()}, - "insert into d1.t2 values (10);") - return err - - }) - close(waitForTxn) - close(waitForRqstFilter) - errorChan <- err - }() - - for notify := range waitForTxn { - close(notify) - mu.RLock() - rqstFilterChannel := waitForRqstFilter - mu.RUnlock() - for notify2 := range rqstFilterChannel { - // Push the query trying to online the table out by - // leasing out the table again - _, err = conn.Query("select * from d1.t1") - require.EqualError(t, err, "pq: relation \"t1\" is offline: For unit test", - "Table offline error was not generated as expected") - close(notify2) - } - } - require.NoError(t, <-errorChan) - close(errorChan) -}
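
Editor's sketch (not part of the patches above): to illustrate the contract that the new jobSurvivesNodeShutdown helper expects from its callers, here is a minimal example of wiring another detached bulk statement into it. The test name, table, and gs:// URLs below are hypothetical placeholders; only jobStarter, testSpec, and the helper itself come from the code introduced in this series.

// Hypothetical registration, mirroring the import/restore tests above.
func registerExampleNodeShutdown(r *testRegistry) {
	r.Add(testSpec{
		Name:       "example/nodeShutdown/worker",
		Owner:      OwnerBulkIO,
		Cluster:    makeClusterSpec(4),
		MinVersion: "v21.1.0",
		Run: func(ctx context.Context, t *test, c *cluster) {
			c.Put(ctx, cockroach, "./cockroach")
			c.Start(ctx, t)

			gatewayNode := 2
			startJob := func(c *cluster) (string, error) {
				gatewayDB := c.Conn(ctx, gatewayNode)
				defer gatewayDB.Close()

				// The detached option makes the statement return a job ID
				// immediately instead of blocking until the job completes,
				// which is what jobSurvivesNodeShutdown relies on.
				var jobID string
				err := gatewayDB.QueryRowContext(ctx,
					`IMPORT TABLE example
					 CREATE USING 'gs://example-bucket/example.sql'
					 CSV DATA ('gs://example-bucket/example.csv')
					 WITH detached`).Scan(&jobID)
				return jobID, err
			}

			// Shut down a node other than the gateway so that only a worker,
			// not the job coordinator, is lost mid-job.
			jobSurvivesNodeShutdown(ctx, t, c, 3 /* nodeToShutdown */, startJob)
		},
	})
}

The node choices follow the pattern of the import and restore tests in this series: with node 2 as the gateway, shutting down node 3 exercises the loss of a worker, while shutting down node 2 itself exercises the loss of the job coordinator.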