Skip to content

Commit

Permalink
backupccl: backup/restore handle comments
Browse files Browse the repository at this point in the history
Fixes cockroachdb#44396

Release note: Enterprise `BACKUP` now captures database, table, column, and index comments
  • Loading branch information
hueypark committed May 11, 2020
1 parent 329b0f5 commit f47dd60
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 164 deletions.
232 changes: 143 additions & 89 deletions pkg/ccl/backupccl/backup.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ message BackupManifest {
repeated sql.stats.TableStatisticProto statistics = 21;
int32 descriptor_coverage = 22 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree.DescriptorCoverage"];
repeated string stmts = 23;
}

message BackupPartitionDescriptor{
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestCloudBackupRestoreS3(t *testing.T) {
values.Add(cloud.S3SecretParam, creds.SecretAccessKey)
uri.RawQuery = values.Encode()

backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts)
backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts, numAccounts)
}

// TestBackupRestoreGoogleCloudStorage hits the real GCS and so could
Expand All @@ -77,7 +77,7 @@ func TestCloudBackupRestoreGoogleCloudStorage(t *testing.T) {
defer cleanupFn()
prefix := fmt.Sprintf("TestBackupRestoreGoogleCloudStorage-%d", timeutil.Now().UnixNano())
uri := url.URL{Scheme: "gs", Host: bucket, Path: prefix}
backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts)
backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts, numAccounts)
}

// TestBackupRestoreAzure hits the real Azure Blob Storage and so could
Expand Down Expand Up @@ -108,5 +108,5 @@ func TestCloudBackupRestoreAzure(t *testing.T) {
values.Add(cloud.AzureAccountKeyParam, accountKey)
uri.RawQuery = values.Encode()

backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts)
backupAndRestore(ctx, t, tc, []string{uri.String()}, []string{uri.String()}, numAccounts, numAccounts)
}
71 changes: 70 additions & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package backupccl

import (
"context"
"fmt"
"net/url"
"sort"

Expand Down Expand Up @@ -63,7 +64,6 @@ var fullClusterSystemTables = []string{
sqlbase.LocationsTable.Name,
sqlbase.RoleMembersTable.Name,
sqlbase.UITable.Name,
sqlbase.CommentsTable.Name,
sqlbase.JobsTable.Name,
// Table statistics are backed up in the backup descriptor for now.
}
Expand Down Expand Up @@ -344,11 +344,14 @@ func backupPlanHook(
statsCache := p.ExecCfg().TableStatsCache
tableStatistics := make([]*stats.TableStatisticProto, 0)
var tables []*sqlbase.TableDescriptor
databases := make(map[sqlbase.ID]*sqlbase.DatabaseDescriptor)
for _, desc := range targetDescs {
if dbDesc := desc.GetDatabase(); dbDesc != nil {
if err := p.CheckPrivilege(ctx, dbDesc, privilege.SELECT); err != nil {
return err
}

databases[dbDesc.ID] = dbDesc
}
if tableDesc := desc.Table(hlc.Timestamp{}); tableDesc != nil {
if err := p.CheckPrivilege(ctx, tableDesc, privilege.SELECT); err != nil {
Expand Down Expand Up @@ -575,6 +578,11 @@ func backupPlanHook(
return err
}

stmts, err := generateCommentStmts(ctx, p, databases, tables)
if err != nil {
return err
}

// if CompleteDbs is lost by a 1.x node, FormatDescriptorTrackingVersion
// means that a 2.0 node will disallow `RESTORE DATABASE foo`, but `RESTORE
// foo.table1, foo.table2...` will still work. MVCCFilter would be
Expand All @@ -597,6 +605,7 @@ func backupPlanHook(
ClusterID: p.ExecCfg().ClusterID(),
Statistics: tableStatistics,
DescriptorCoverage: backupStmt.DescriptorCoverage,
Stmts: stmts,
}

// Sanity check: re-run the validation that RESTORE will do, but this time
Expand Down Expand Up @@ -783,6 +792,66 @@ func checkForNewTables(
return nil
}

// generateCommentStmts builds the COMMENT ON statements that recreate the
// database, table, column, and index comments attached to the backup targets.
// The statements are stored in the backup manifest and replayed verbatim
// during RESTORE.
//
// databases maps descriptor IDs to the database descriptors included in the
// backup; tables is the set of backed-up table descriptors.
func generateCommentStmts(
	ctx context.Context,
	p sql.PlanHookState,
	databases map[sqlbase.ID]*sqlbase.DatabaseDescriptor,
	tables []*sqlbase.TableDescriptor,
) (stmts []string, err error) {
	query := fmt.Sprintf(
		"SELECT object_id, comment FROM system.comments WHERE type = %d", keys.DatabaseCommentType)

	databaseComments, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query(
		ctx,
		"select-database-comment",
		nil, /* txn */
		query)
	if err != nil {
		return nil, err
	}

	for _, comment := range databaseComments {
		objID := sqlbase.ID(tree.MustBeDInt(comment[0]))
		if database, ok := databases[objID]; ok {
			// comment[1] is a tree.Datum; formatting it with %s renders a
			// correctly quoted and escaped SQL string literal. The database
			// name goes through NameString so reserved words / mixed-case
			// names round-trip correctly.
			stmts = append(stmts, fmt.Sprintf(
				"COMMENT ON DATABASE %s IS %s", tree.NameString(database.Name), comment[1]))
		}
	}

	for _, table := range tables {
		tc := sql.SelectComment(ctx, p, table.ID)
		if tc == nil {
			continue
		}

		databaseName := tree.NameString(databases[table.ParentID].Name)
		tableName := tree.NameString(table.Name)

		if tc.Comment != nil {
			// Quote the comment via NewDString so embedded single quotes and
			// other special characters cannot break (or inject into) the
			// generated statement when it is replayed at RESTORE time.
			stmts = append(stmts, fmt.Sprintf(
				"COMMENT ON TABLE %s.%s IS %s",
				databaseName, tableName, tree.NewDString(*tc.Comment)))
		}

		for _, columnComment := range tc.Columns {
			col, err := table.FindColumnByID(sqlbase.ColumnID(columnComment.SubID))
			if err != nil {
				return nil, err
			}

			stmts = append(stmts, fmt.Sprintf(
				"COMMENT ON COLUMN %s.%s.%s IS %s",
				databaseName, tableName, tree.NameString(col.Name),
				tree.NewDString(columnComment.Comment)))
		}

		for _, indexComment := range tc.Indexes {
			idx, err := table.FindIndexByID(sqlbase.IndexID(indexComment.SubID))
			if err != nil {
				return nil, err
			}

			stmts = append(stmts, fmt.Sprintf(
				"COMMENT ON INDEX %s.%s IS %s",
				databaseName, tree.NameString(idx.Name),
				tree.NewDString(indexComment.Comment)))
		}
	}

	return stmts, nil
}

// init registers the BACKUP plan hook with the SQL layer so that BACKUP
// statements are routed to backupPlanHook during planning.
func init() {
	sql.AddPlanHook(backupPlanHook)
}
62 changes: 51 additions & 11 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestBackupRestoreSingleNodeLocal(t *testing.T) {
ctx, tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, initNone)
defer cleanupFn()

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)
backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, numAccounts)
}

func TestBackupRestoreMultiNodeLocal(t *testing.T) {
Expand All @@ -310,7 +310,7 @@ func TestBackupRestoreMultiNodeLocal(t *testing.T) {
ctx, tc, _, _, cleanupFn := backupRestoreTestSetup(t, multiNode, numAccounts, initNone)
defer cleanupFn()

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)
backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, numAccounts)
}

func TestBackupRestoreMultiNodeRemote(t *testing.T) {
Expand All @@ -322,7 +322,7 @@ func TestBackupRestoreMultiNodeRemote(t *testing.T) {
// Backing up to node2's local file system
remoteFoo := "nodelocal://2/foo"

backupAndRestore(ctx, t, tc, []string{remoteFoo}, []string{localFoo}, numAccounts)
backupAndRestore(ctx, t, tc, []string{remoteFoo}, []string{localFoo}, numAccounts, numAccounts)
}

func TestBackupRestorePartitioned(t *testing.T) {
Expand Down Expand Up @@ -359,7 +359,7 @@ func TestBackupRestorePartitioned(t *testing.T) {
localFoo2,
localFoo3,
}
backupAndRestore(ctx, t, tc, backupURIs, restoreURIs, numAccounts)
backupAndRestore(ctx, t, tc, backupURIs, restoreURIs, numAccounts, numAccounts)

// Verify that at least one SST exists in each backup destination.
sstMatcher := regexp.MustCompile(`\d+\.sst`)
Expand Down Expand Up @@ -456,7 +456,7 @@ func TestBackupRestorePartitionedMergeDirectories(t *testing.T) {
restoreURIs := []string{
localFoo1,
}
backupAndRestore(ctx, t, tc, backupURIs, restoreURIs, numAccounts)
backupAndRestore(ctx, t, tc, backupURIs, restoreURIs, numAccounts, numAccounts)
}

func TestBackupRestoreEmpty(t *testing.T) {
Expand All @@ -466,7 +466,7 @@ func TestBackupRestoreEmpty(t *testing.T) {
ctx, tc, _, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, initNone)
defer cleanupFn()

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)
backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, numAccounts)
}

// Regression test for #16008. In short, the way RESTORE constructed split keys
Expand All @@ -488,7 +488,7 @@ func TestBackupRestoreNegativePrimaryKey(t *testing.T) {
-numAccounts/2, numAccounts/backupRestoreDefaultRanges/2,
)

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)
backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, numAccounts)

sqlDB.Exec(t, `CREATE UNIQUE INDEX id2 ON data.bank (id)`)
sqlDB.Exec(t, `ALTER TABLE data.bank ALTER PRIMARY KEY USING COLUMNS(id)`)
Expand All @@ -515,6 +515,7 @@ func backupAndRestore(
backupURIs []string,
restoreURIs []string,
numAccounts int,
numIdxs int64,
) {
// uriFmtStringAndArgs returns format strings like "$1" or "($1, $2, $3)" and
// an []interface{} of URIs for the BACKUP/RESTORE queries.
Expand All @@ -539,15 +540,19 @@ func backupAndRestore(

conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)
var expectedCreateTable string
var expectedDatabaseComment gosql.NullString
const selectDatabaseCommentQuery = "SELECT shobj_description(oid, 'pg_database') FROM pg_database WHERE datname = 'data'"
{
sqlDB.Exec(t, `CREATE INDEX balance_idx ON data.bank (balance)`)
testutils.SucceedsSoon(t, func() error {
var unused string
var createTable string
sqlDB.QueryRow(t, `SHOW CREATE TABLE data.bank`).Scan(&unused, &createTable)
if !strings.Contains(createTable, "balance_idx") {
sqlDB.QueryRow(t, `SHOW CREATE TABLE data.bank`).Scan(&unused, &expectedCreateTable)
if !strings.Contains(expectedCreateTable, "balance_idx") {
return errors.New("expected a balance_idx index")
}

sqlDB.QueryRow(t, selectDatabaseCommentQuery).Scan(&expectedDatabaseComment)
return nil
})

Expand Down Expand Up @@ -638,6 +643,8 @@ func backupAndRestore(
var unused string
var restored struct {
rows, idx, bytes int64
createTable string
databaseComment gosql.NullString
}

restoreURIFmtString, restoreURIArgs := uriFmtStringAndArgs(restoreURIs)
Expand All @@ -652,10 +659,20 @@ func backupAndRestore(
if expected := int64(numAccounts); restored.rows != expected {
t.Fatalf("expected %d rows for %d accounts, got %d", expected, numAccounts, restored.rows)
}
if expected := int64(numAccounts); restored.idx != expected {
if expected := numIdxs; restored.idx != expected {
t.Fatalf("expected %d idx rows for %d accounts, got %d", expected, numAccounts, restored.idx)
}

sqlDBRestore.QueryRow(t, `SHOW CREATE TABLE data.bank`).Scan(&unused, &restored.createTable)
if expectedCreateTable != restored.createTable {
t.Fatalf("expected %s for SHOW CREATE TABLE, got %s", expectedCreateTable, restored.createTable)
}

sqlDBRestore.QueryRow(t, selectDatabaseCommentQuery).Scan(&restored.databaseComment)
if expectedDatabaseComment != restored.databaseComment {
t.Fatalf("expected %v for database comment, got %v", expectedDatabaseComment, restored.databaseComment)
}

var rowCount int64
sqlDBRestore.QueryRow(t, `SELECT count(*) FROM data.bank`).Scan(&rowCount)
if rowCount != int64(numAccounts) {
Expand Down Expand Up @@ -3692,6 +3709,29 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
})
}

// TestBackupComment verifies that database, table, column, and index
// comments survive a BACKUP/RESTORE round trip.
func TestBackupComment(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const numAccounts = 1000
	ctx, tc, sqlDB, _, cleanupFn := backupRestoreTestSetup(t, singleNode, numAccounts, initNone)
	defer cleanupFn()

	// Attach a comment to every kind of object the backup should capture.
	setupStmts := []string{
		`COMMENT ON DATABASE data IS 'Database'`,
		`COMMENT ON TABLE data.bank IS 'Bank'`,
		`COMMENT ON COLUMN data.bank.id IS 'ID'`,
		`CREATE INDEX bank_id_idx ON data.bank (id);`,
		`COMMENT ON INDEX data.bank_id_idx IS 'ID index'`,
	}
	for _, setupStmt := range setupStmts {
		testutils.SucceedsSoon(t, func() error {
			_, err := sqlDB.DB.ExecContext(ctx, setupStmt)
			return err
		})
	}

	// bank_id_idx adds a second secondary index, doubling the expected
	// index-row count to numAccounts*2.
	backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts, numAccounts*2)
}

func getFirstStoreReplica(
t *testing.T, s serverutils.TestServerInterface, key roachpb.Key,
) (*kvserver.Store, *kvserver.Replica) {
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func TestFullClusterBackup(t *testing.T) {
// Note the absence of the jobs table. Jobs are tested by another test as
// jobs are created during the RESTORE process.
systemTablesToVerify := []string{
sqlbase.CommentsTable.Name,
sqlbase.LocationsTable.Name,
sqlbase.RoleMembersTable.Name,
sqlbase.SettingsTable.Name,
Expand Down
17 changes: 17 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,10 @@ func (r *restoreResumer) Resume(
}
}

if err := r.restoreStmts(ctx, backupManifests); err != nil {
return err
}

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
Expand Down Expand Up @@ -1225,6 +1229,19 @@ func (r *restoreResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn
return nil
}

// restoreStmts replays the statements captured in each backup manifest
// (currently the COMMENT ON statements generated at BACKUP time) against
// the restored cluster via the internal executor.
func (r *restoreResumer) restoreStmts(ctx context.Context, backupManifests []BackupManifest) error {
	for i := range backupManifests {
		for _, s := range backupManifests[i].Stmts {
			if _, err := r.execCfg.InternalExecutor.Exec(ctx, "stmt", nil /* txn */, s); err != nil {
				return errors.Wrapf(err, "stmt: %s", s)
			}
		}
	}

	return nil
}

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
Expand Down
Loading

0 comments on commit f47dd60

Please sign in to comment.