Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.2: backupccl: only restore system tables included in backup #55719

Merged
merged 2 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ CREATE TABLE data2.foo (a int);
// createdAt and statisticsID are re-generated on RESTORE.
query := fmt.Sprintf("SELECT \"tableID\", name, \"columnIDs\", \"rowCount\" FROM system.table_statistics")
verificationQueries[i] = query
case systemschema.SettingsTable.Name:
// We don't include the cluster version.
query := fmt.Sprintf("SELECT * FROM system.%s WHERE name <> 'version'", table)
verificationQueries[i] = query
default:
query := fmt.Sprintf("SELECT * FROM system.%s", table)
verificationQueries[i] = query
Expand Down
73 changes: 56 additions & 17 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,17 @@ func isDatabaseEmpty(
return true, nil
}

func getTempSystemDBID(details jobspb.RestoreDetails) descpb.ID {
tempSystemDBID := keys.MinNonPredefinedUserDescID
for id := range details.DescriptorRewrites {
if int(id) > tempSystemDBID {
tempSystemDBID = int(id)
}
}

return descpb.ID(tempSystemDBID)
}

// createImportingDescriptors create the tables that we will restore into. It also
// fetches the information from the old tables that we need for the restore.
func createImportingDescriptors(
Expand Down Expand Up @@ -837,15 +848,11 @@ func createImportingDescriptors(
types = append(types, mut)
}
}
tempSystemDBID := keys.MinNonPredefinedUserDescID
for id := range details.DescriptorRewrites {
if int(id) > tempSystemDBID {
tempSystemDBID = int(id)
}
}

if details.DescriptorCoverage == tree.AllDescriptors {
databases = append(databases, dbdesc.NewInitial(
descpb.ID(tempSystemDBID), restoreTempSystemDB, security.AdminRole))
tempSystemDBID := getTempSystemDBID(details)
databases = append(databases, dbdesc.NewInitial(tempSystemDBID, restoreTempSystemDB,
security.AdminRole))
}

// We get the spans of the restoring tables _as they appear in the backup_,
Expand Down Expand Up @@ -1199,7 +1206,7 @@ func (r *restoreResumer) Resume(
// restores were a special case, but really we should be making only the
// temporary system tables public before we restore all the system table data.
if details.DescriptorCoverage == tree.AllDescriptors {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB); err != nil {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB, details, tables); err != nil {
return err
}
}
Expand Down Expand Up @@ -1768,25 +1775,57 @@ func getRestoringPrivileges(

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(ctx context.Context, db *kv.DB) error {
func (r *restoreResumer) restoreSystemTables(
ctx context.Context,
db *kv.DB,
restoreDetails jobspb.RestoreDetails,
tables []catalog.TableDescriptor,
) error {
tempSystemDBID := getTempSystemDBID(restoreDetails)

executor := r.execCfg.InternalExecutor
var err error
for _, systemTable := range fullClusterSystemTables {

// Iterate through all the tables that we're restoring, and if it was restored
// to the temporary system DB then copy it's data over to the real system
// table.
for _, table := range tables {
if table.GetParentID() != tempSystemDBID {
continue
}
systemTableName := table.GetName()

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetDebugName("system-restore-txn")
stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTable)
stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTableName)
// Don't clear the jobs table as to not delete the jobs that are performing
// the restore.
if systemTable != systemschema.JobsTable.Name {
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true;", systemTable)
if systemTableName == systemschema.SettingsTable.Name {
// Don't delete the cluster version.
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE name <> 'version';", systemTableName)
_, err = executor.Exec(ctx, stmtDebugName+"-data-deletion", txn, deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTable)
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}
} else if systemTableName != systemschema.JobsTable.Name {
// Don't clear the jobs table as to not delete the jobs that are
// performing the restore.
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true;", systemTableName)
_, err = executor.Exec(ctx, stmtDebugName+"-data-deletion", txn, deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}
}

restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s.%s);", systemTableName,
restoreTempSystemDB, systemTableName)
if systemTableName == systemschema.SettingsTable.Name {
// Don't overwrite the cluster version.
restoreQuery = fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s.%s WHERE name <> 'version');",
systemTableName, restoreTempSystemDB, systemTableName)
}
restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s.%s);", systemTable, restoreTempSystemDB, systemTable)
if _, err := executor.Exec(ctx, stmtDebugName+"-data-insert", txn, restoreQuery); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTable)
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
return nil
}); err != nil {
Expand Down
66 changes: 59 additions & 7 deletions pkg/ccl/backupccl/restore_old_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
package backupccl

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,15 +50,30 @@ func TestRestoreOldVersions(t *testing.T) {
const (
testdataBase = "testdata/restore_old_versions"
exportDirs = testdataBase + "/exports"
clusterDirs = testdataBase + "/cluster"
)
dirs, err := ioutil.ReadDir(exportDirs)
require.NoError(t, err)
for _, dir := range dirs {
require.True(t, dir.IsDir())
exportDir, err := filepath.Abs(filepath.Join(exportDirs, dir.Name()))

t.Run("table-restore", func(t *testing.T) {
dirs, err := ioutil.ReadDir(exportDirs)
require.NoError(t, err)
t.Run(dir.Name(), restoreOldVersionTest(exportDir))
}
for _, dir := range dirs {
require.True(t, dir.IsDir())
exportDir, err := filepath.Abs(filepath.Join(exportDirs, dir.Name()))
require.NoError(t, err)
t.Run(dir.Name(), restoreOldVersionTest(exportDir))
}
})

t.Run("cluster-restore", func(t *testing.T) {
dirs, err := ioutil.ReadDir(clusterDirs)
require.NoError(t, err)
for _, dir := range dirs {
require.True(t, dir.IsDir())
exportDir, err := filepath.Abs(filepath.Join(clusterDirs, dir.Name()))
require.NoError(t, err)
t.Run(dir.Name(), restoreOldVersionClusterTest(exportDir))
}
})
}

func restoreOldVersionTest(exportDir string) func(t *testing.T) {
Expand Down Expand Up @@ -86,3 +105,36 @@ func restoreOldVersionTest(exportDir string) func(t *testing.T) {
sqlDB.CheckQueryResults(t, `SELECT * FROM test.t4 ORDER BY k`, results)
}
}

func restoreOldVersionClusterTest(exportDir string) func(t *testing.T) {
return func(t *testing.T) {
externalDir, dirCleanup := testutils.TempDir(t)
ctx := context.Background()
tc := testcluster.StartTestCluster(t, singleNode, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
ExternalIODir: externalDir,
},
})
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
defer func() {
tc.Stopper().Stop(ctx)
dirCleanup()
}()
err := os.Symlink(exportDir, filepath.Join(externalDir, "foo"))
require.NoError(t, err)

// Ensure that the restore succeeds.
sqlDB.Exec(t, `RESTORE FROM $1`, LocalFoo)

sqlDB.CheckQueryResults(t, "SHOW USERS", [][]string{
{"admin", "", "{}"},
{"craig", "", "{}"},
{"root", "", "{admin}"},
})
sqlDB.CheckQueryResults(t, "SELECT * FROM system.comments", [][]string{
{"0", "52", "0", "database comment string"},
{"1", "53", "0", "table comment string"},
})
sqlDB.CheckQueryResults(t, "SELECT * FROM data.bank", [][]string{{"1"}})
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
11 changes: 11 additions & 0 deletions pkg/ccl/backupccl/testdata/restore_old_versions/create_cluster.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE USER craig;
CREATE DATABASE data;
CREATE TABLE data.bank (a INT);
INSERT INTO TABLE data.bank VALUES (1);

COMMENT ON TABLE data.bank IS 'table comment string';
COMMENT ON DATABASE data IS 'database comment string';

INSERT INTO system.locations ("localityKey", "localityValue", latitude, longitude) VALUES ('city', 'nyc', 10, 10);

ALTER TABLE data.bank CONFIGURE ZONE USING gc.ttlseconds = 3600;