Skip to content

Commit

Permalink
Merge #53905
Browse files Browse the repository at this point in the history
53905: backupccl: update RestoreDataProcessor to use ProcessorBase r=yuzefovich,dt a=pbardea

Previously, RestoreDataProcessor would not properly signal to consumers
that it had encountered an error and was closing. This meant that it
would not drain its inputs. This could result in the restore DistSQL
flow becoming stuck, since the SplitAndScatter processor would be
blocked on sending a row to the RestoreDataProcessor which would already
be closed.

Fixes #53900.

Release justification: bug fix
Release note (bug fix): A failure while restoring data, may have
sometimes resulted in the restore job becoming stuck.

Co-authored-by: Paul Bardea <[email protected]>
  • Loading branch information
craig[bot] and pbardea committed Sep 9, 2020
2 parents 357d2c8 + 97f8b95 commit 0c89e7b
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 142 deletions.
121 changes: 84 additions & 37 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ package backupccl

import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -430,55 +433,99 @@ func TestClusterRestoreFailCleanup(t *testing.T) {

const numAccounts = 1000
_, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone)
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupFn()
defer cleanupEmptyCluster()

// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
// Populate system.users.
for i := 0; i < 1000; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
}

sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)

// Bugger the backup by removing the SST files. (Note this messes up all of
// the backups, but there is only one at this point.)
if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") {
return nil
}
return os.Remove(path)
}); err != nil {
t.Fatal(err)
}

// Create a non-corrupted backup.
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
t.Run("during restoration of data", func(t *testing.T) {
_, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()
sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})

t.Run("during system table restoration", func(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
},
}
}
}

sqlDBRestore.ExpectErr(
t, "injected error",
`RESTORE FROM $1`, LocalFoo,
)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})
}

// TestClusterRevisionHistory tests that cluster backups can be taken with
Expand Down
Loading

0 comments on commit 0c89e7b

Please sign in to comment.