From 89dc041349abf92916ed63bd02b523b1d33f0f13 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Thu, 3 Sep 2020 11:48:07 -0400 Subject: [PATCH] backupccl: update RestoreDataProcessor to use ProcessorBase Previously, RestoreDataProcessor would not properly signal to consumers that it had encountered an error and was closing. This meant that it would not drain its inputs. This could result in the restore DistSQL flow becoming stuck, since the SplitAndScatter processor would be blocked on sending a row to the RestoreDataProcessor, which would already be closed. Release justification: bug fix Release note (bug fix): A failure while restoring data may sometimes have resulted in the restore job becoming stuck. --- .../full_cluster_backup_restore_test.go | 121 ++++++++++----- pkg/ccl/backupccl/restore_data_processor.go | 138 ++++++++++-------- pkg/sql/rowexec/processors.go | 4 +- 3 files changed, 162 insertions(+), 101 deletions(-) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index bb4e8ca2a5db..e3c5e1bc06b3 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -10,6 +10,9 @@ package backupccl import ( "fmt" + "os" + "path/filepath" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -429,53 +432,97 @@ func TestClusterRestoreFailCleanup(t *testing.T) { const numAccounts = 1000 _, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone) - _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( - t, singleNode, tempDir, InitNone, - ) defer cleanupFn() - defer cleanupEmptyCluster() // Setup the system systemTablesToVerify to ensure that they are copied to the new cluster. // Populate system.users. 
for i := 0; i < 1000; i++ { sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i)) } + + sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`) + + // Bugger the backup by removing the SST files. (Note this messes up all of + // the backups, but there is only one at this point.) + if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Fatal(err) + } + if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { + return nil + } + return os.Remove(path) + }); err != nil { + t.Fatal(err) + } + + // Create a non-corrupted backup. sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo) - // Bugger the backup by injecting a failure while restoring the system data. - for _, server := range tcRestore.Servers { - registry := server.JobRegistry().(*jobs.Registry) - registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ - jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { - r := raw.(*restoreResumer) - r.testingKnobs.duringSystemTableRestoration = func() error { - return errors.New("injected error") - } - return r + t.Run("during restoration of data", func(t *testing.T) { + _, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. 
+ sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, }, + ) + }) + + t.Run("after restoring data", func(t *testing.T) { + _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + + // Bugger the backup by injecting a failure while restoring the system data. + for _, server := range tcRestore.Servers { + registry := server.JobRegistry().(*jobs.Registry) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*restoreResumer) + r.testingKnobs.duringSystemTableRestoration = func() error { + return errors.New("injected error") + } + return r + }, + } } - } - sqlDBRestore.ExpectErr( - t, "injected error", - `RESTORE FROM $1`, LocalFoo, - ) - // Verify the failed RESTORE added some DROP tables. - // Note that the system tables here correspond to the temporary tables - // imported, not the system tables themselves. - sqlDBRestore.CheckQueryResults(t, - `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, - [][]string{ - {"bank"}, - {"comments"}, - {"jobs"}, - {"locations"}, - {"role_members"}, - {"scheduled_jobs"}, - {"settings"}, - {"ui"}, - {"users"}, - {"zones"}, - }, - ) + sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. 
+ sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, + }, + ) + }) } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6d190ad6cb94..62d6e3077ae0 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -13,7 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/kv" - roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" gogotypes "github.com/gogo/protobuf/types" ) @@ -31,13 +30,22 @@ import ( var restoreDataOutputTypes = []*types.T{} type restoreDataProcessor struct { + execinfra.ProcessorBase + flowCtx *execinfra.FlowCtx spec execinfrapb.RestoreDataSpec input execinfra.RowSource output execinfra.RowReceiver + + progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + alloc rowenc.DatumAlloc + kr *storageccl.KeyRewriter } var _ execinfra.Processor = &restoreDataProcessor{} +var _ execinfra.RowSource = &restoreDataProcessor{} + +const restoreDataProcName = "restoreDataProcessor" // OutputTypes implements the execinfra.Processor interface. 
func (rd *restoreDataProcessor) OutputTypes() []*types.T { @@ -46,8 +54,9 @@ func (rd *restoreDataProcessor) OutputTypes() []*types.T { func newRestoreDataProcessor( flowCtx *execinfra.FlowCtx, - _ int32, + processorID int32, spec execinfrapb.RestoreDataSpec, + post *execinfrapb.PostProcessSpec, input execinfra.RowSource, output execinfra.RowReceiver, ) (execinfra.Processor, error) { @@ -57,87 +66,82 @@ func newRestoreDataProcessor( spec: spec, output: output, } - return rd, nil -} - -// Run implements the execinfra.Processor interface. -func (rd *restoreDataProcessor) Run(ctx context.Context) { - ctx, span := tracing.ChildSpan(ctx, "restoreDataProcessor") - defer tracing.FinishSpan(span) - defer rd.output.ProducerDone() - progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - - // We don't have to worry about this go routine leaking because next we loop over progCh - // which is closed only after the goroutine returns. var err error - go func() { - defer close(progCh) - err = runRestoreData(ctx, rd.flowCtx, &rd.spec, rd.input, progCh) - }() - - for prog := range progCh { - // Take a copy so that we can send the progress address to the output processor. - p := prog - rd.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}) + rd.kr, err = storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys) + if err != nil { + return nil, err } - if err != nil { - rd.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err}) - return + // We don't have to worry about this go routine leaking because next we loop over progCh + // which is closed only after the goroutine returns. 
+ rd.progCh = make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) + + if err := rd.Init(rd, post, rd.OutputTypes(), flowCtx, processorID, output, nil, /* memMonitor */ + execinfra.ProcStateOpts{ + InputsToDrain: []execinfra.RowSource{input}, + TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata { + rd.close() + return nil + }, + }); err != nil { + return nil, err } + return rd, nil } -func runRestoreData( - ctx context.Context, - flowCtx *execinfra.FlowCtx, - spec *execinfrapb.RestoreDataSpec, - input execinfra.RowSource, - progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, -) error { - input.Start(ctx) - kr, err := storageccl.MakeKeyRewriterFromRekeys(spec.Rekeys) - if err != nil { - return err - } +// Start is part of the RowSource interface. +func (rd *restoreDataProcessor) Start(ctx context.Context) context.Context { + rd.input.Start(ctx) + return rd.StartInternal(ctx, restoreDataProcName) +} - alloc := &rowenc.DatumAlloc{} - for { +// Next is part of the RowSource interface. +func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + for rd.State == execinfra.StateRunning { // We read rows from the SplitAndScatter processor. We expect each row to // contain 2 columns. The first is used to route the row to this processor, // and the second contains the RestoreSpanEntry that we're interested in. - row, meta := input.Next() + row, meta := rd.input.Next() if meta != nil { - return errors.Newf("unexpected metadata %+v", meta) + if meta.Err != nil { + rd.MoveToDraining(nil /* err */) + } + return nil, meta } if row == nil { - // Done. 
+ rd.MoveToDraining(nil /* err */) break } if len(row) != 2 { - return errors.New("expected input rows to have exactly 2 columns") + rd.MoveToDraining(errors.New("expected input rows to have exactly 2 columns")) + break } - if err := row[1].EnsureDecoded(types.Bytes, alloc); err != nil { - return err + if err := row[1].EnsureDecoded(types.Bytes, &rd.alloc); err != nil { + rd.MoveToDraining(err) + break } datum := row[1].Datum entryDatumBytes, ok := datum.(*tree.DBytes) if !ok { - return errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row) + rd.MoveToDraining(errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row)) + break } var entry execinfrapb.RestoreSpanEntry if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil { - return errors.Wrap(err, "un-marshaling restore span entry") + rd.MoveToDraining(errors.Wrap(err, "un-marshaling restore span entry")) + break } - newSpanKey, err := rewriteBackupSpanKey(kr, entry.Span.Key) + newSpanKey, err := rewriteBackupSpanKey(rd.kr, entry.Span.Key) if err != nil { - return errors.Wrap(err, "re-writing span key to import") + rd.MoveToDraining(errors.Wrap(err, "re-writing span key to import")) + break } - log.VEventf(ctx, 1 /* level */, "importing span %v", entry.Span) + log.VEventf(rd.Ctx, 1 /* level */, "importing span %v", entry.Span) importRequest := &roachpb.ImportRequest{ // Import is a point request because we don't want DistSender to split // it. 
Assume (but don't require) the entire post-rewrite span is on the @@ -145,31 +149,41 @@ func runRestoreData( RequestHeader: roachpb.RequestHeader{Key: newSpanKey}, DataSpan: entry.Span, Files: entry.Files, - EndTime: spec.RestoreTime, - Rekeys: spec.Rekeys, - Encryption: spec.Encryption, + EndTime: rd.spec.RestoreTime, + Rekeys: rd.spec.Rekeys, + Encryption: rd.spec.Encryption, } - importRes, pErr := kv.SendWrapped(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), importRequest) + importRes, pErr := kv.SendWrapped(rd.Ctx, rd.flowCtx.Cfg.DB.NonTransactionalSender(), importRequest) if pErr != nil { - return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan) + rd.MoveToDraining(errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan)) + break } var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress progDetails := RestoreProgress{} - progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, spec.PKIDs) + progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, rd.spec.PKIDs) progDetails.ProgressIdx = entry.ProgressIdx progDetails.DataSpan = entry.Span details, err := gogotypes.MarshalAny(&progDetails) if err != nil { - return err + rd.MoveToDraining(err) + break } prog.ProgressDetails = *details - progCh <- prog - log.VEventf(ctx, 1 /* level */, "imported span %v", entry.Span) + return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog} } - return nil + return nil, rd.DrainHelper() +} + +// ConsumerClosed is part of the RowSource interface. 
+func (rd *restoreDataProcessor) ConsumerClosed() { + rd.close() +} + +func (rd *restoreDataProcessor) close() { + rd.InternalClose() } func init() { diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 9df85b22d579..b7a4d1156ca4 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -268,7 +268,7 @@ func NewProcessor( if NewRestoreDataProcessor == nil { return nil, errors.New("RestoreData processor unimplemented") } - return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, inputs[0], outputs[0]) + return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, post, inputs[0], outputs[0]) } if core.CSVWriter != nil { if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { @@ -369,7 +369,7 @@ var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDat var NewSplitAndScatterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.SplitAndScatterSpec, execinfra.RowReceiver) (execinfra.Processor, error) // NewRestoreDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. -var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error) +var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error) // NewCSVWriterProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. var NewCSVWriterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.CSVWriterSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error)