Skip to content

Commit

Permalink
backupccl: update RestoreDataProcessor to use ProcessorBase
Browse files Browse the repository at this point in the history
Previously, RestoreDataProcessor would not properly signal to consumers
that it had encountered an error and was closing. This meant that it
would not drain its inputs. This could result in the restore DistSQL
flow becoming stuck, since the SplitAndScatter processor would be
blocked on sending a row to the RestoreDataProcessor which would already
be closed.

Release justification: bug fix
Release note (bug fix): A failure while restoring data, may have
sometimes resulted in the restore job becoming stuck. This bug was only
present on 20.2 alphas and betas.
  • Loading branch information
pbardea committed Sep 8, 2020
1 parent fcb30c7 commit 97f8b95
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 142 deletions.
121 changes: 84 additions & 37 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ package backupccl

import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -430,55 +433,99 @@ func TestClusterRestoreFailCleanup(t *testing.T) {

const numAccounts = 1000
_, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone)
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupFn()
defer cleanupEmptyCluster()

// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
// Populate system.users.
for i := 0; i < 1000; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
}

sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)

// Bugger the backup by removing the SST files. (Note this messes up all of
// the backups, but there is only one at this point.)
if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") {
return nil
}
return os.Remove(path)
}); err != nil {
t.Fatal(err)
}

// Create a non-corrupted backup.
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
t.Run("during restoration of data", func(t *testing.T) {
_, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()
sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})

t.Run("during system table restoration", func(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
},
}
}
}

sqlDBRestore.ExpectErr(
t, "injected error",
`RESTORE FROM $1`, LocalFoo,
)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})
}

// TestClusterRevisionHistory tests that cluster backups can be taken with
Expand Down
Loading

0 comments on commit 97f8b95

Please sign in to comment.