Skip to content

Commit

Permalink
backupccl: update RestoreDataProcessor to use ProcessorBase
Browse files Browse the repository at this point in the history
Previously, RestoreDataProcessor would not properly signal to consumers
that it had encountered an error and was closing. This meant that it
would not drain its inputs. This could result in the restore DistSQL
flow becoming stuck, since the SplitAndScatter processor would be
blocked on sending a row to the RestoreDataProcessor which would already
be closed.

Release justification: bug fix
Release note (bug fix): A failure while restoring data, may have
sometimes resulted in the restore job becoming stuck.
  • Loading branch information
pbardea committed Sep 3, 2020
1 parent 9491bf1 commit 89dc041
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 101 deletions.
121 changes: 84 additions & 37 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ package backupccl

import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -429,53 +432,97 @@ func TestClusterRestoreFailCleanup(t *testing.T) {

const numAccounts = 1000
_, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone)
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupFn()
defer cleanupEmptyCluster()

// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
// Populate system.users.
for i := 0; i < 1000; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
}

sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)

// Bugger the backup by removing the SST files. (Note this messes up all of
// the backups, but there is only one at this point.)
if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") {
return nil
}
return os.Remove(path)
}); err != nil {
t.Fatal(err)
}

// Create a non-corrupted backup.
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
t.Run("during restoration of data", func(t *testing.T) {
_, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()
sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})

t.Run("after restoring data", func(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
},
}
}
}

sqlDBRestore.ExpectErr(
t, "injected error",
`RESTORE FROM $1`, LocalFoo,
)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})
}
138 changes: 76 additions & 62 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/kv"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand All @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
gogotypes "github.com/gogo/protobuf/types"
)
Expand All @@ -31,13 +30,22 @@ import (
var restoreDataOutputTypes = []*types.T{}

type restoreDataProcessor struct {
execinfra.ProcessorBase

flowCtx *execinfra.FlowCtx
spec execinfrapb.RestoreDataSpec
input execinfra.RowSource
output execinfra.RowReceiver

progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
alloc rowenc.DatumAlloc
kr *storageccl.KeyRewriter
}

var _ execinfra.Processor = &restoreDataProcessor{}
var _ execinfra.RowSource = &restoreDataProcessor{}

const restoreDataProcName = "restoreDataProcessor"

// OutputTypes implements the execinfra.Processor interface.
func (rd *restoreDataProcessor) OutputTypes() []*types.T {
Expand All @@ -46,8 +54,9 @@ func (rd *restoreDataProcessor) OutputTypes() []*types.T {

func newRestoreDataProcessor(
flowCtx *execinfra.FlowCtx,
_ int32,
processorID int32,
spec execinfrapb.RestoreDataSpec,
post *execinfrapb.PostProcessSpec,
input execinfra.RowSource,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
Expand All @@ -57,119 +66,124 @@ func newRestoreDataProcessor(
spec: spec,
output: output,
}
return rd, nil
}

// Run implements the execinfra.Processor interface.
func (rd *restoreDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "restoreDataProcessor")
defer tracing.FinishSpan(span)
defer rd.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)

// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the goroutine returns.
var err error
go func() {
defer close(progCh)
err = runRestoreData(ctx, rd.flowCtx, &rd.spec, rd.input, progCh)
}()

for prog := range progCh {
// Take a copy so that we can send the progress address to the output processor.
p := prog
rd.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
rd.kr, err = storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys)
if err != nil {
return nil, err
}

if err != nil {
rd.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
return
// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the goroutine returns.
rd.progCh = make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)

if err := rd.Init(rd, post, rd.OutputTypes(), flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
InputsToDrain: []execinfra.RowSource{input},
TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata {
rd.close()
return nil
},
}); err != nil {
return nil, err
}
return rd, nil
}

func runRestoreData(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.RestoreDataSpec,
input execinfra.RowSource,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
) error {
input.Start(ctx)
kr, err := storageccl.MakeKeyRewriterFromRekeys(spec.Rekeys)
if err != nil {
return err
}
// Start is part of the RowSource interface.
func (rd *restoreDataProcessor) Start(ctx context.Context) context.Context {
rd.input.Start(ctx)
return rd.StartInternal(ctx, restoreDataProcName)
}

alloc := &rowenc.DatumAlloc{}
for {
// Next is part of the RowSource interface.
func (rd *restoreDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
for rd.State == execinfra.StateRunning {
// We read rows from the SplitAndScatter processor. We expect each row to
// contain 2 columns. The first is used to route the row to this processor,
// and the second contains the RestoreSpanEntry that we're interested in.
row, meta := input.Next()
row, meta := rd.input.Next()
if meta != nil {
return errors.Newf("unexpected metadata %+v", meta)
if meta.Err != nil {
rd.MoveToDraining(nil /* err */)
}
return nil, meta
}
if row == nil {
// Done.
rd.MoveToDraining(nil /* err */)
break
}

if len(row) != 2 {
return errors.New("expected input rows to have exactly 2 columns")
rd.MoveToDraining(errors.New("expected input rows to have exactly 2 columns"))
break
}
if err := row[1].EnsureDecoded(types.Bytes, alloc); err != nil {
return err
if err := row[1].EnsureDecoded(types.Bytes, &rd.alloc); err != nil {
rd.MoveToDraining(err)
break
}
datum := row[1].Datum
entryDatumBytes, ok := datum.(*tree.DBytes)
if !ok {
return errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row)
rd.MoveToDraining(errors.AssertionFailedf(`unexpected datum type %T: %+v`, datum, row))
break
}

var entry execinfrapb.RestoreSpanEntry
if err := protoutil.Unmarshal([]byte(*entryDatumBytes), &entry); err != nil {
return errors.Wrap(err, "un-marshaling restore span entry")
rd.MoveToDraining(errors.Wrap(err, "un-marshaling restore span entry"))
break
}

newSpanKey, err := rewriteBackupSpanKey(kr, entry.Span.Key)
newSpanKey, err := rewriteBackupSpanKey(rd.kr, entry.Span.Key)
if err != nil {
return errors.Wrap(err, "re-writing span key to import")
rd.MoveToDraining(errors.Wrap(err, "re-writing span key to import"))
break
}

log.VEventf(ctx, 1 /* level */, "importing span %v", entry.Span)
log.VEventf(rd.Ctx, 1 /* level */, "importing span %v", entry.Span)
importRequest := &roachpb.ImportRequest{
// Import is a point request because we don't want DistSender to split
// it. Assume (but don't require) the entire post-rewrite span is on the
// same range.
RequestHeader: roachpb.RequestHeader{Key: newSpanKey},
DataSpan: entry.Span,
Files: entry.Files,
EndTime: spec.RestoreTime,
Rekeys: spec.Rekeys,
Encryption: spec.Encryption,
EndTime: rd.spec.RestoreTime,
Rekeys: rd.spec.Rekeys,
Encryption: rd.spec.Encryption,
}

importRes, pErr := kv.SendWrapped(ctx, flowCtx.Cfg.DB.NonTransactionalSender(), importRequest)
importRes, pErr := kv.SendWrapped(rd.Ctx, rd.flowCtx.Cfg.DB.NonTransactionalSender(), importRequest)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan)
rd.MoveToDraining(errors.Wrapf(pErr.GoError(), "importing span %v", importRequest.DataSpan))
break
}

var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
progDetails := RestoreProgress{}
progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, spec.PKIDs)
progDetails.Summary = countRows(importRes.(*roachpb.ImportResponse).Imported, rd.spec.PKIDs)
progDetails.ProgressIdx = entry.ProgressIdx
progDetails.DataSpan = entry.Span
details, err := gogotypes.MarshalAny(&progDetails)
if err != nil {
return err
rd.MoveToDraining(err)
break
}
prog.ProgressDetails = *details
progCh <- prog
log.VEventf(ctx, 1 /* level */, "imported span %v", entry.Span)
return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog}
}

return nil
return nil, rd.DrainHelper()
}

// ConsumerClosed is part of the RowSource interface.
func (rd *restoreDataProcessor) ConsumerClosed() {
rd.close()
}

func (rd *restoreDataProcessor) close() {
rd.InternalClose()
}

func init() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func NewProcessor(
if NewRestoreDataProcessor == nil {
return nil, errors.New("RestoreData processor unimplemented")
}
return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, inputs[0], outputs[0])
return NewRestoreDataProcessor(flowCtx, processorID, *core.RestoreData, post, inputs[0], outputs[0])
}
if core.CSVWriter != nil {
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
Expand Down Expand Up @@ -369,7 +369,7 @@ var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDat
var NewSplitAndScatterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.SplitAndScatterSpec, execinfra.RowReceiver) (execinfra.Processor, error)

// NewRestoreDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error)
var NewRestoreDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.RestoreDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error)

// NewCSVWriterProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
var NewCSVWriterProcessor func(*execinfra.FlowCtx, int32, execinfrapb.CSVWriterSpec, execinfra.RowSource, execinfra.RowReceiver) (execinfra.Processor, error)
Expand Down

0 comments on commit 89dc041

Please sign in to comment.