Skip to content

Commit

Permalink
backupccl: update RestoreDataProcessor to use ProcessorBase
Browse files Browse the repository at this point in the history
Previously, RestoreDataProcessor would not properly signal to consumers
that it had encountered an error and was closing. This meant that it
would not drain its inputs. This could result in the restore DistSQL
flow becoming stuck, since the SplitAndScatter processor would be
blocked on sending a row to the RestoreDataProcessor which would already
be closed.

Release justification: bug fix
Release note (bug fix): A failure while restoring data may have
sometimes resulted in the restore job becoming stuck. This bug was only
present on 20.2 alphas and betas.
  • Loading branch information
pbardea committed Sep 16, 2020
1 parent 009e4b9 commit 0ecdb40
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 139 deletions.
136 changes: 102 additions & 34 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ package backupccl

import (
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -430,52 +433,99 @@ func TestClusterRestoreFailCleanup(t *testing.T) {

const numAccounts = 1000
_, _, sqlDB, tempDir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitNone)
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupFn()
defer cleanupEmptyCluster()

// Setup the system systemTablesToVerify to ensure that they are copied to the new cluster.
// Populate system.users.
for i := 0; i < 1000; i++ {
sqlDB.Exec(t, fmt.Sprintf("CREATE USER maxroach%d", i))
}

sqlDB.Exec(t, `BACKUP TO 'nodelocal://1/missing-ssts'`)

// Bugger the backup by removing the SST files. (Note this messes up all of
// the backups, but there is only one at this point.)
if err := filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") {
return nil
}
return os.Remove(path)
}); err != nil {
t.Fatal(err)
}

// Create a non-corrupted backup.
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
t.Run("during restoration of data", func(t *testing.T) {
_, _, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()
sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM 'nodelocal://1/missing-ssts'`)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})

t.Run("during system table restoration", func(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
t, singleNode, tempDir, InitNone,
)
defer cleanupEmptyCluster()

// Bugger the backup by injecting a failure while restoring the system data.
for _, server := range tcRestore.Servers {
registry := server.JobRegistry().(*jobs.Registry)
registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeRestore: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
r.testingKnobs.duringSystemTableRestoration = func() error {
return errors.New("injected error")
}
return r
},
}
}
}

sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})

t.Run("after offline tables", func(t *testing.T) {
_, tcRestore, sqlDBRestore, cleanupEmptyCluster := backupRestoreTestSetupEmpty(
Expand All @@ -498,6 +548,24 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
}

sqlDBRestore.ExpectErr(t, "injected error", `RESTORE FROM $1`, LocalFoo)
// Verify the failed RESTORE added some DROP tables.
// Note that the system tables here correspond to the temporary tables
// imported, not the system tables themselves.
sqlDBRestore.CheckQueryResults(t,
`SELECT name FROM crdb_internal.tables WHERE state = 'DROP' ORDER BY name`,
[][]string{
{"bank"},
{"comments"},
{"jobs"},
{"locations"},
{"role_members"},
{"scheduled_jobs"},
{"settings"},
{"ui"},
{"users"},
{"zones"},
},
)
})
}

Expand Down
Loading

0 comments on commit 0ecdb40

Please sign in to comment.