Skip to content

Commit

Permalink
Merge pull request cockroachdb#55719 from pbardea/backport20.2-55685
Browse files Browse the repository at this point in the history
release-20.2: backupccl: only restore system tables included in backup
  • Loading branch information
pbardea authored Oct 20, 2020
2 parents 018118f + 4c7d200 commit 314007f
Show file tree
Hide file tree
Showing 16 changed files with 130 additions and 24 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ CREATE TABLE data2.foo (a int);
// createdAt and statisticsID are re-generated on RESTORE.
query := fmt.Sprintf("SELECT \"tableID\", name, \"columnIDs\", \"rowCount\" FROM system.table_statistics")
verificationQueries[i] = query
case systemschema.SettingsTable.Name:
// We don't include the cluster version.
query := fmt.Sprintf("SELECT * FROM system.%s WHERE name <> 'version'", table)
verificationQueries[i] = query
default:
query := fmt.Sprintf("SELECT * FROM system.%s", table)
verificationQueries[i] = query
Expand Down
73 changes: 56 additions & 17 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,17 @@ func isDatabaseEmpty(
return true, nil
}

func getTempSystemDBID(details jobspb.RestoreDetails) descpb.ID {
tempSystemDBID := keys.MinNonPredefinedUserDescID
for id := range details.DescriptorRewrites {
if int(id) > tempSystemDBID {
tempSystemDBID = int(id)
}
}

return descpb.ID(tempSystemDBID)
}

// createImportingDescriptors create the tables that we will restore into. It also
// fetches the information from the old tables that we need for the restore.
func createImportingDescriptors(
Expand Down Expand Up @@ -837,15 +848,11 @@ func createImportingDescriptors(
types = append(types, mut)
}
}
tempSystemDBID := keys.MinNonPredefinedUserDescID
for id := range details.DescriptorRewrites {
if int(id) > tempSystemDBID {
tempSystemDBID = int(id)
}
}

if details.DescriptorCoverage == tree.AllDescriptors {
databases = append(databases, dbdesc.NewInitial(
descpb.ID(tempSystemDBID), restoreTempSystemDB, security.AdminRole))
tempSystemDBID := getTempSystemDBID(details)
databases = append(databases, dbdesc.NewInitial(tempSystemDBID, restoreTempSystemDB,
security.AdminRole))
}

// We get the spans of the restoring tables _as they appear in the backup_,
Expand Down Expand Up @@ -1199,7 +1206,7 @@ func (r *restoreResumer) Resume(
// restores were a special case, but really we should be making only the
// temporary system tables public before we restore all the system table data.
if details.DescriptorCoverage == tree.AllDescriptors {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB); err != nil {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB, details, tables); err != nil {
return err
}
}
Expand Down Expand Up @@ -1768,25 +1775,57 @@ func getRestoringPrivileges(

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(ctx context.Context, db *kv.DB) error {
func (r *restoreResumer) restoreSystemTables(
ctx context.Context,
db *kv.DB,
restoreDetails jobspb.RestoreDetails,
tables []catalog.TableDescriptor,
) error {
tempSystemDBID := getTempSystemDBID(restoreDetails)

executor := r.execCfg.InternalExecutor
var err error
for _, systemTable := range fullClusterSystemTables {

// Iterate through all the tables that we're restoring, and if it was restored
// to the temporary system DB then copy it's data over to the real system
// table.
for _, table := range tables {
if table.GetParentID() != tempSystemDBID {
continue
}
systemTableName := table.GetName()

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetDebugName("system-restore-txn")
stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTable)
stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTableName)
// Don't clear the jobs table as to not delete the jobs that are performing
// the restore.
if systemTable != systemschema.JobsTable.Name {
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true;", systemTable)
if systemTableName == systemschema.SettingsTable.Name {
// Don't delete the cluster version.
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE name <> 'version';", systemTableName)
_, err = executor.Exec(ctx, stmtDebugName+"-data-deletion", txn, deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTable)
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}
} else if systemTableName != systemschema.JobsTable.Name {
// Don't clear the jobs table as to not delete the jobs that are
// performing the restore.
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true;", systemTableName)
_, err = executor.Exec(ctx, stmtDebugName+"-data-deletion", txn, deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}
}

restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s.%s);", systemTableName,
restoreTempSystemDB, systemTableName)
if systemTableName == systemschema.SettingsTable.Name {
// Don't overwrite the cluster version.
restoreQuery = fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s.%s WHERE name <> 'version');",
systemTableName, restoreTempSystemDB, systemTableName)
}
restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s.%s);", systemTable, restoreTempSystemDB, systemTable)
if _, err := executor.Exec(ctx, stmtDebugName+"-data-insert", txn, restoreQuery); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTable)
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
return nil
}); err != nil {
Expand Down
66 changes: 59 additions & 7 deletions pkg/ccl/backupccl/restore_old_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
package backupccl

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,15 +50,30 @@ func TestRestoreOldVersions(t *testing.T) {
const (
testdataBase = "testdata/restore_old_versions"
exportDirs = testdataBase + "/exports"
clusterDirs = testdataBase + "/cluster"
)
dirs, err := ioutil.ReadDir(exportDirs)
require.NoError(t, err)
for _, dir := range dirs {
require.True(t, dir.IsDir())
exportDir, err := filepath.Abs(filepath.Join(exportDirs, dir.Name()))

t.Run("table-restore", func(t *testing.T) {
dirs, err := ioutil.ReadDir(exportDirs)
require.NoError(t, err)
t.Run(dir.Name(), restoreOldVersionTest(exportDir))
}
for _, dir := range dirs {
require.True(t, dir.IsDir())
exportDir, err := filepath.Abs(filepath.Join(exportDirs, dir.Name()))
require.NoError(t, err)
t.Run(dir.Name(), restoreOldVersionTest(exportDir))
}
})

t.Run("cluster-restore", func(t *testing.T) {
dirs, err := ioutil.ReadDir(clusterDirs)
require.NoError(t, err)
for _, dir := range dirs {
require.True(t, dir.IsDir())
exportDir, err := filepath.Abs(filepath.Join(clusterDirs, dir.Name()))
require.NoError(t, err)
t.Run(dir.Name(), restoreOldVersionClusterTest(exportDir))
}
})
}

func restoreOldVersionTest(exportDir string) func(t *testing.T) {
Expand Down Expand Up @@ -86,3 +105,36 @@ func restoreOldVersionTest(exportDir string) func(t *testing.T) {
sqlDB.CheckQueryResults(t, `SELECT * FROM test.t4 ORDER BY k`, results)
}
}

func restoreOldVersionClusterTest(exportDir string) func(t *testing.T) {
return func(t *testing.T) {
externalDir, dirCleanup := testutils.TempDir(t)
ctx := context.Background()
tc := testcluster.StartTestCluster(t, singleNode, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
ExternalIODir: externalDir,
},
})
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
defer func() {
tc.Stopper().Stop(ctx)
dirCleanup()
}()
err := os.Symlink(exportDir, filepath.Join(externalDir, "foo"))
require.NoError(t, err)

// Ensure that the restore succeeds.
sqlDB.Exec(t, `RESTORE FROM $1`, LocalFoo)

sqlDB.CheckQueryResults(t, "SHOW USERS", [][]string{
{"admin", "", "{}"},
{"craig", "", "{}"},
{"root", "", "{admin}"},
})
sqlDB.CheckQueryResults(t, "SELECT * FROM system.comments", [][]string{
{"0", "52", "0", "database comment string"},
{"1", "53", "0", "table comment string"},
})
sqlDB.CheckQueryResults(t, "SELECT * FROM data.bank", [][]string{{"1"}})
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
11 changes: 11 additions & 0 deletions pkg/ccl/backupccl/testdata/restore_old_versions/create_cluster.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE USER craig;
CREATE DATABASE data;
CREATE TABLE data.bank (a INT);
INSERT INTO TABLE data.bank VALUES (1);

COMMENT ON TABLE data.bank IS 'table comment string';
COMMENT ON DATABASE data IS 'database comment string';

INSERT INTO system.locations ("localityKey", "localityValue", latitude, longitude) VALUES ('city', 'nyc', 10, 10);

ALTER TABLE data.bank CONFIGURE ZONE USING gc.ttlseconds = 3600;

0 comments on commit 314007f

Please sign in to comment.