From 0ecdb40fe4d8222b0ef8682566fd58bec6b22773 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Thu, 3 Sep 2020 11:48:07 -0400 Subject: [PATCH] backupccl: update RestoreDataProcessor to use ProcessorBase Previously, RestoreDataProcessor would not properly signal to consumers that it had encountered an error and was closing. This meant that it would not drain its inputs. This could result in the restore DistSQL flow becoming stuck, since the SplitAndScatter processor would be blocked on sending a row to the RestoreDataProcessor which would already be closed. Release justification: bug fix Release note (bug fix): A failure while restoring data, may have sometimes resulted in the restore job becoming stuck. This bug was only present on 20.2 alphas and betas. --- .../full_cluster_backup_restore_test.go | 136 +++++++++--- pkg/ccl/backupccl/restore_data_processor.go | 209 +++++++++--------- pkg/sql/rowexec/processors.go | 4 +- 3 files changed, 210 insertions(+), 139 deletions(-) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index 5ced37c5a04a..11989e14f731 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -10,6 +10,9 @@ package backupccl import ( "fmt" + "os" + "path/filepath" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -430,52 +433,99 @@ func TestClusterRestoreFailCleanup(t *testing.T) { const numAccounts = 1000 _, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone) - _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( - t, singleNode, tempDir, InitNone, - ) defer cleanupFn() - defer cleanupEmptyCluster() // Setup the system systemTablesToVerify to ensure that they are copied to the new cluster. // Populate system.users. for i := 0; i < 1000; i++ { sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i)) } + + sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`) + + // Bugger the backup by removing the SST files. (Note this messes up all of + // the backups, but there is only one at this point.) + if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Fatal(err) + } + if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { + return nil + } + return os.Remove(path) + }); err != nil { + t.Fatal(err) + } + + // Create a non-corrupted backup. sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo) - // Bugger the backup by injecting a failure while restoring the system data. - for _, server := range tcRestore.Servers { - registry := server.JobRegistry().(*jobs.Registry) - registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ - jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { - r := raw.(*restoreResumer) - r.testingKnobs.duringSystemTableRestoration = func() error { - return errors.New("injected error") - } - return r + t.Run("during restoration of data", func(t *testing.T) { + _, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. + sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, }, + ) + }) + + t.Run("during system table restoration", func(t *testing.T) { + _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + + // Bugger the backup by injecting a failure while restoring the system data. + for _, server := range tcRestore.Servers { + registry := server.JobRegistry().(*jobs.Registry) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*restoreResumer) + r.testingKnobs.duringSystemTableRestoration = func() error { + return errors.New("injected error") + } + return r + }, + } } - } - sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) - // Verify the failed RESTORE added some DROP tables. - // Note that the system tables here correspond to the temporary tables - // imported, not the system tables themselves. - sqlDBRestore.CheckQueryResults(t, - `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, - [][]string{ - {"bank"}, - {"comments"}, - {"jobs"}, - {"locations"}, - {"role_members"}, - {"scheduled_jobs"}, - {"settings"}, - {"ui"}, - {"users"}, - {"zones"}, - }, - ) + sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. + sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, + }, + ) + }) t.Run("after offline tables", func(t *testing.T) { _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( @@ -498,6 +548,24 @@ func TestClusterRestoreFailCleanup(t *testing.T) { } sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. + sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, + }, + ) }) } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6d190ad6cb94..aa332fd1bffa 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -13,7 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/kv" - roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" gogotypes "github.com/gogo/protobuf/types" ) @@ -31,23 +30,27 @@ import ( var restoreDataOutputTypes = []*types.T{} type restoreDataProcessor struct { + execinfra.ProcessorBase + flowCtx *execinfra.FlowCtx spec execinfrapb.RestoreDataSpec input execinfra.RowSource output execinfra.RowReceiver + + alloc rowenc.DatumAlloc + kr *storageccl.KeyRewriter } var _ execinfra.Processor = &restoreDataProcessor{} +var _ execinfra.RowSource = &restoreDataProcessor{} -// OutputTypes implements the execinfra.Processor interface. -func (rd *restoreDataProcessor) OutputTypes() []*types.T { - return restoreDataOutputTypes -} +const restoreDataProcName = "restoreDataProcessor" func newRestoreDataProcessor( flowCtx *execinfra.FlowCtx, - _ int32, + processorID int32, spec execinfrapb.RestoreDataSpec, + post *execinfrapb.PostProcessSpec, input execinfra.RowSource, output execinfra.RowReceiver, ) (execinfra.Processor, error) { @@ -57,119 +60,119 @@ func newRestoreDataProcessor( spec: spec, output: output, } - return rd, nil -} -// Run implements the execinfra.Processor interface. -func (rd *restoreDataProcessor) Run(ctx context.Context) { - ctx, span := tracing.ChildSpan(ctx, "restoreDataProcessor") - defer tracing.FinishSpan(span) - defer rd.output.ProducerDone() - - progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - - // We don't have to worry about this go routine leaking because next we loop over progCh - // which is closed only after the goroutine returns. var err error - go func() { - defer close(progCh) - err = runRestoreData(ctx, rd.flowCtx, &rd.spec, rd.input, progCh) - }() - - for prog := range progCh { - // Take a copy so that we can send the progress address to the output processor. - p := prog - rd.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}) + rd.kr, err = storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys) + if err != nil { + return nil, err } - if err != nil { - rd.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err}) - return + if err := rd.Init(rd, post, restoreDataOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ + execinfra.ProcStateOpts{ + InputsToDrain: []execinfra.RowSource{input}, + TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata { + rd.close() + return nil + }, + }); err != nil { + return nil, err } + return rd, nil } -func runRestoreData( - ctx context.Context, - flowCtx *execinfra.FlowCtx, - spec *execinfrapb.RestoreDataSpec, - input execinfra.RowSource, - progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, -) error { - input.Start(ctx) - kr, err := storageccl.MakeKeyRewriterFromRekeys(spec.Rekeys) - if err != nil { - return err - } +// Start is part of the RowSource interface. +func (rd *restoreDataProcessor) Start(ctx context.Context) context.Context { + rd.input.Start(ctx) + return rd.StartInternal(ctx, restoreDataProcName) +} - alloc := &rowenc.DatumAlloc{} - for { - // We read rows from the SplitAndScatter processor. We expect each row to - // contain 2 columns. The first is used to route the row to this processor, - // and the second contains the RestoreSpanEntry that we're interested in. - row, meta := input.Next() - if meta != nil { - return errors.Newf("unexpected metadata %+v", meta) - } - if row == nil { - // Done. - break +// Next is part of the RowSource interface. +func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + if rd.State != execinfra.StateRunning { + return nil, rd.DrainHelper() + } + // We read rows from the SplitAndScatter processor. We expect each row to + // contain 2 columns. The first is used to route the row to this processor, + // and the second contains the RestoreSpanEntry that we're interested in. + row, meta := rd.input.Next() + if meta != nil { + if meta.Err != nil { + rd.MoveToDraining(nil /* err */) } + return nil, meta + } + if row == nil { + rd.MoveToDraining(nil /* err */) + return nil, rd.DrainHelper() + } - if len(row) != 2 { - return errors.New("expected input rows to have exactly 2 columns") - } - if err := row[1].EnsureDecoded(types.Bytes, alloc); err != nil { - return err - } - datum := row[1].Datum - entryDatumBytes, ok := datum.(*tree.DBytes) - if !ok { - return errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row) - } + if len(row) != 2 { + rd.MoveToDraining(errors.New("expected input rows to have exactly 2 columns")) + return nil, rd.DrainHelper() + } + if err := row[1].EnsureDecoded(types.Bytes, &rd.alloc); err != nil { + rd.MoveToDraining(err) + return nil, rd.DrainHelper() + } + datum := row[1].Datum + entryDatumBytes, ok := datum.(*tree.DBytes) + if !ok { + rd.MoveToDraining(errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row)) + return nil, rd.DrainHelper() + } - var entry execinfrapb.RestoreSpanEntry - if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil { - return errors.Wrap(err, "un-marshaling restore span entry") - } + var entry execinfrapb.RestoreSpanEntry + if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil { + rd.MoveToDraining(errors.Wrap(err, "un-marshaling restore span entry")) + return nil, rd.DrainHelper() + } - newSpanKey, err := rewriteBackupSpanKey(kr, entry.Span.Key) - if err != nil { - return errors.Wrap(err, "re-writing span key to import") - } + newSpanKey, err := rewriteBackupSpanKey(rd.kr, entry.Span.Key) + if err != nil { + rd.MoveToDraining(errors.Wrap(err, "re-writing span key to import")) + return nil, rd.DrainHelper() + } - log.VEventf(ctx, 1 /* level */, "importing span %v", entry.Span) - importRequest := &roachpb.ImportRequest{ - // Import is a point request because we don't want DistSender to split - // it. Assume (but don't require) the entire post-rewrite span is on the - // same range. - RequestHeader: roachpb.RequestHeader{Key: newSpanKey}, - DataSpan: entry.Span, - Files: entry.Files, - EndTime: spec.RestoreTime, - Rekeys: spec.Rekeys, - Encryption: spec.Encryption, - } + log.VEventf(rd.Ctx, 1 /* level */, "importing span %v", entry.Span) + importRequest := &roachpb.ImportRequest{ + // Import is a point request because we don't want DistSender to split + // it. Assume (but don't require) the entire post-rewrite span is on the + // same range. + RequestHeader: roachpb.RequestHeader{Key: newSpanKey}, + DataSpan: entry.Span, + Files: entry.Files, + EndTime: rd.spec.RestoreTime, + Rekeys: rd.spec.Rekeys, + Encryption: rd.spec.Encryption, + } - importRes, pErr := kv.SendWrapped(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), importRequest) - if pErr != nil { - return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan) - } + importRes, pErr := kv.SendWrapped(rd.Ctx, rd.flowCtx.Cfg.DB.NonTransactionalSender(), importRequest) + if pErr != nil { + rd.MoveToDraining(errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan)) + return nil, rd.DrainHelper() + } - var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - progDetails := RestoreProgress{} - progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, spec.PKIDs) - progDetails.ProgressIdx = entry.ProgressIdx - progDetails.DataSpan = entry.Span - details, err := gogotypes.MarshalAny(&progDetails) - if err != nil { - return err - } - prog.ProgressDetails = *details - progCh <- prog - log.VEventf(ctx, 1 /* level */, "imported span %v", entry.Span) + var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + progDetails := RestoreProgress{} + progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, rd.spec.PKIDs) + progDetails.ProgressIdx = entry.ProgressIdx + progDetails.DataSpan = entry.Span + details, err := gogotypes.MarshalAny(&progDetails) + if err != nil { + rd.MoveToDraining(err) + return nil, rd.DrainHelper() } + prog.ProgressDetails = *details + return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog} +} + +// ConsumerClosed is part of the RowSource interface. +func (rd *restoreDataProcessor) ConsumerClosed() { + rd.close() +} - return nil +func (rd *restoreDataProcessor) close() { + rd.InternalClose() } func init() { diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 9df85b22d579..b7a4d1156ca4 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -268,7 +268,7 @@ func NewProcessor( if NewRestoreDataProcessor == nil { return nil, errors.New("RestoreData processor unimplemented") } - return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, inputs[0], outputs[0]) + return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, post, inputs[0], outputs[0]) } if core.CSVWriter != nil { if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { @@ -369,7 +369,7 @@ var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDat var NewSplitAndScatterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.SplitAndScatterSpec, execinfra.RowReceiver) (execinfra.Processor, error) // NewRestoreDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. -var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error) +var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error) // NewCSVWriterProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. var NewCSVWriterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.CSVWriterSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error)