diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index 5ced37c5a04a..11989e14f731 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -10,6 +10,9 @@ package backupccl import ( "fmt" + "os" + "path/filepath" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -430,52 +433,99 @@ func TestClusterRestoreFailCleanup(t *testing.T) { const numAccounts = 1000 _, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone) - _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( - t, singleNode, tempDir, InitNone, - ) defer cleanupFn() - defer cleanupEmptyCluster() // Setup the system systemTablesToVerify to ensure that they are copied to the new cluster. // Populate system.users. for i := 0; i < 1000; i++ { sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i)) } + + sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`) + + // Bugger the backup by removing the SST files. (Note this messes up all of + // the backups, but there is only one at this point.) + if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Fatal(err) + } + if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") { + return nil + } + return os.Remove(path) + }); err != nil { + t.Fatal(err) + } + + // Create a non-corrupted backup. sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo) - // Bugger the backup by injecting a failure while restoring the system data. - for _, server := range tcRestore.Servers { - registry := server.JobRegistry().(*jobs.Registry) - registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ - jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { - r := raw.(*restoreResumer) - r.testingKnobs.duringSystemTableRestoration = func() error { - return errors.New("injected error") - } - return r + t.Run("during restoration of data", func(t *testing.T) { + _, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. + sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, }, + ) + }) + + t.Run("during system table restoration", func(t *testing.T) { + _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( + t, singleNode, tempDir, InitNone, + ) + defer cleanupEmptyCluster() + + // Bugger the backup by injecting a failure while restoring the system data. + for _, server := range tcRestore.Servers { + registry := server.JobRegistry().(*jobs.Registry) + registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{ + jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer { + r := raw.(*restoreResumer) + r.testingKnobs.duringSystemTableRestoration = func() error { + return errors.New("injected error") + } + return r + }, + } } - } - sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) - // Verify the failed RESTORE added some DROP tables. - // Note that the system tables here correspond to the temporary tables - // imported, not the system tables themselves. - sqlDBRestore.CheckQueryResults(t, - `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, - [][]string{ - {"bank"}, - {"comments"}, - {"jobs"}, - {"locations"}, - {"role_members"}, - {"scheduled_jobs"}, - {"settings"}, - {"ui"}, - {"users"}, - {"zones"}, - }, - ) + sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. + sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, + }, + ) + }) t.Run("after offline tables", func(t *testing.T) { _, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty( @@ -498,6 +548,24 @@ func TestClusterRestoreFailCleanup(t *testing.T) { } sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo) + // Verify the failed RESTORE added some DROP tables. + // Note that the system tables here correspond to the temporary tables + // imported, not the system tables themselves. + sqlDBRestore.CheckQueryResults(t, + `SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`, + [][]string{ + {"bank"}, + {"comments"}, + {"jobs"}, + {"locations"}, + {"role_members"}, + {"scheduled_jobs"}, + {"settings"}, + {"ui"}, + {"users"}, + {"zones"}, + }, + ) }) } diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6d190ad6cb94..aa332fd1bffa 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -13,7 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/kv" - roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" gogotypes "github.com/gogo/protobuf/types" ) @@ -31,23 +30,27 @@ import ( var restoreDataOutputTypes = []*types.T{} type restoreDataProcessor struct { + execinfra.ProcessorBase + flowCtx *execinfra.FlowCtx spec execinfrapb.RestoreDataSpec input execinfra.RowSource output execinfra.RowReceiver + + alloc rowenc.DatumAlloc + kr *storageccl.KeyRewriter } var _ execinfra.Processor = &restoreDataProcessor{} +var _ execinfra.RowSource = &restoreDataProcessor{} -// OutputTypes implements the execinfra.Processor interface. -func (rd *restoreDataProcessor) OutputTypes() []*types.T { - return restoreDataOutputTypes -} +const restoreDataProcName = "restoreDataProcessor" func newRestoreDataProcessor( flowCtx *execinfra.FlowCtx, - _ int32, + processorID int32, spec execinfrapb.RestoreDataSpec, + post *execinfrapb.PostProcessSpec, input execinfra.RowSource, output execinfra.RowReceiver, ) (execinfra.Processor, error) { @@ -57,119 +60,119 @@ func newRestoreDataProcessor( spec: spec, output: output, } - return rd, nil -} -// Run implements the execinfra.Processor interface. -func (rd *restoreDataProcessor) Run(ctx context.Context) { - ctx, span := tracing.ChildSpan(ctx, "restoreDataProcessor") - defer tracing.FinishSpan(span) - defer rd.output.ProducerDone() - - progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) - - // We don't have to worry about this go routine leaking because next we loop over progCh - // which is closed only after the goroutine returns. var err error - go func() { - defer close(progCh) - err = runRestoreData(ctx, rd.flowCtx, &rd.spec, rd.input, progCh) - }() - - for prog := range progCh { - // Take a copy so that we can send the progress address to the output processor. - p := prog - rd.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}) + rd.kr, err = storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys) + if err != nil { + return nil, err } - if err != nil { - rd.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err}) - return + if err := rd.Init(rd, post, restoreDataOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */ + execinfra.ProcStateOpts{ + InputsToDrain: []execinfra.RowSource{input}, + TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata { + rd.close() + return nil + }, + }); err != nil { + return nil, err } + return rd, nil } -func runRestoreData( - ctx context.Context, - flowCtx *execinfra.FlowCtx, - spec *execinfrapb.RestoreDataSpec, - input execinfra.RowSource, - progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, -) error { - input.Start(ctx) - kr, err := storageccl.MakeKeyRewriterFromRekeys(spec.Rekeys) - if err != nil { - return err - } +// Start is part of the RowSource interface. +func (rd *restoreDataProcessor) Start(ctx context.Context) context.Context { + rd.input.Start(ctx) + return rd.StartInternal(ctx, restoreDataProcName) +} - alloc := &rowenc.DatumAlloc{} - for { - // We read rows from the SplitAndScatter processor. We expect each row to - // contain 2 columns. The first is used to route the row to this processor, - // and the second contains the RestoreSpanEntry that we're interested in. - row, meta := input.Next() - if meta != nil { - return errors.Newf("unexpected metadata %+v", meta) - } - if row == nil { - // Done. - break +// Next is part of the RowSource interface. +func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + if rd.State != execinfra.StateRunning { + return nil, rd.DrainHelper() + } + // We read rows from the SplitAndScatter processor. We expect each row to + // contain 2 columns. The first is used to route the row to this processor, + // and the second contains the RestoreSpanEntry that we're interested in. + row, meta := rd.input.Next() + if meta != nil { + if meta.Err != nil { + rd.MoveToDraining(nil /* err */) } + return nil, meta + } + if row == nil { + rd.MoveToDraining(nil /* err */) + return nil, rd.DrainHelper() + } - if len(row) != 2 { - return errors.New("expected input rows to have exactly 2 columns") - } - if err := row[1].EnsureDecoded(types.Bytes, alloc); err != nil { - return err - } - datum := row[1].Datum - entryDatumBytes, ok := datum.(*tree.DBytes) - if !ok { - return errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row) - } + if len(row) != 2 { + rd.MoveToDraining(errors.New("expected input rows to have exactly 2 columns")) + return nil, rd.DrainHelper() + } + if err := row[1].EnsureDecoded(types.Bytes, &rd.alloc); err != nil { + rd.MoveToDraining(err) + return nil, rd.DrainHelper() + } + datum := row[1].Datum + entryDatumBytes, ok := datum.(*tree.DBytes) + if !ok { + rd.MoveToDraining(errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row)) + return nil, rd.DrainHelper() + } - var entry execinfrapb.RestoreSpanEntry - if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil { - return errors.Wrap(err, "un-marshaling restore span entry") - } + var entry execinfrapb.RestoreSpanEntry + if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil { + rd.MoveToDraining(errors.Wrap(err, "un-marshaling restore span entry")) + return nil, rd.DrainHelper() + } - newSpanKey, err := rewriteBackupSpanKey(kr, entry.Span.Key) - if err != nil { - return errors.Wrap(err, "re-writing span key to import") - } + newSpanKey, err := rewriteBackupSpanKey(rd.kr, entry.Span.Key) + if err != nil { + rd.MoveToDraining(errors.Wrap(err, "re-writing span key to import")) + return nil, rd.DrainHelper() + } - log.VEventf(ctx, 1 /* level */, "importing span %v", entry.Span) - importRequest := &roachpb.ImportRequest{ - // Import is a point request because we don't want DistSender to split - // it. Assume (but don't require) the entire post-rewrite span is on the - // same range. - RequestHeader: roachpb.RequestHeader{Key: newSpanKey}, - DataSpan: entry.Span, - Files: entry.Files, - EndTime: spec.RestoreTime, - Rekeys: spec.Rekeys, - Encryption: spec.Encryption, - } + log.VEventf(rd.Ctx, 1 /* level */, "importing span %v", entry.Span) + importRequest := &roachpb.ImportRequest{ + // Import is a point request because we don't want DistSender to split + // it. Assume (but don't require) the entire post-rewrite span is on the + // same range. + RequestHeader: roachpb.RequestHeader{Key: newSpanKey}, + DataSpan: entry.Span, + Files: entry.Files, + EndTime: rd.spec.RestoreTime, + Rekeys: rd.spec.Rekeys, + Encryption: rd.spec.Encryption, + } - importRes, pErr := kv.SendWrapped(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), importRequest) - if pErr != nil { - return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan) - } + importRes, pErr := kv.SendWrapped(rd.Ctx, rd.flowCtx.Cfg.DB.NonTransactionalSender(), importRequest) + if pErr != nil { + rd.MoveToDraining(errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan)) + return nil, rd.DrainHelper() + } - var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - progDetails := RestoreProgress{} - progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, spec.PKIDs) - progDetails.ProgressIdx = entry.ProgressIdx - progDetails.DataSpan = entry.Span - details, err := gogotypes.MarshalAny(&progDetails) - if err != nil { - return err - } - prog.ProgressDetails = *details - progCh <- prog - log.VEventf(ctx, 1 /* level */, "imported span %v", entry.Span) + var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + progDetails := RestoreProgress{} + progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, rd.spec.PKIDs) + progDetails.ProgressIdx = entry.ProgressIdx + progDetails.DataSpan = entry.Span + details, err := gogotypes.MarshalAny(&progDetails) + if err != nil { + rd.MoveToDraining(err) + return nil, rd.DrainHelper() } + prog.ProgressDetails = *details + return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog} +} + +// ConsumerClosed is part of the RowSource interface. +func (rd *restoreDataProcessor) ConsumerClosed() { + rd.close() +} - return nil +func (rd *restoreDataProcessor) close() { + rd.InternalClose() } func init() { diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 9df85b22d579..b7a4d1156ca4 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -268,7 +268,7 @@ func NewProcessor( if NewRestoreDataProcessor == nil { return nil, errors.New("RestoreData processor unimplemented") } - return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, inputs[0], outputs[0]) + return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, post, inputs[0], outputs[0]) } if core.CSVWriter != nil { if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { @@ -369,7 +369,7 @@ var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDat var NewSplitAndScatterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.SplitAndScatterSpec, execinfra.RowReceiver) (execinfra.Processor, error) // NewRestoreDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. -var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error) +var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error) // NewCSVWriterProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization. var NewCSVWriterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.CSVWriterSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error)