Skip to content

Commit

Permalink
backupccl: RESTORE TENANT x .. WITH tenant = y
Browse files Browse the repository at this point in the history
This adds support for restoring a tenant with a new tenant ID, allowing restoring
into a cluster that already has an existing tenant (perhaps the same one) at the
ID of the backed up tenant.

Release note: none.
  • Loading branch information
dt committed Feb 22, 2022
1 parent 2b9fef1 commit 3a321e8
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 22 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -2237,6 +2237,7 @@ restore_options ::=
| 'DEBUG_PAUSE_ON' '=' string_or_placeholder
| 'NEW_DB_NAME' '=' string_or_placeholder
| 'INCREMENTAL_LOCATION' '=' string_or_placeholder_opt_list
| 'TENANT' '=' string_or_placeholder

scrub_option_list ::=
( scrub_option ) ( ( ',' scrub_option ) )*
Expand Down
21 changes: 21 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7414,6 +7414,27 @@ func TestBackupRestoreTenant(t *testing.T) {

// Check the all-tenant override.
restoreTenant11.CheckQueryResults(t, `SHOW CLUSTER SETTING tenant_cost_model.kv_read_cost_per_megabyte`, [][]string{{"123"}})

restoreDB.Exec(t, `SELECT crdb_internal.destroy_tenant(20, true)`)

restoreDB.Exec(t, `RESTORE TENANT 11 FROM 'nodelocal://1/clusterwide' WITH tenant = '20'`)
_, restoreConn20 := serverutils.StartTenant(
t, restoreTC.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(20), Existing: true},
)
defer restoreConn20.Close()
restoreTenant20 := sqlutils.MakeSQLRunner(restoreConn20)

// Tenant 20 gets results that matched the backed up tenant 11.
restoreTenant20.CheckQueryResults(t, `select * from foo.baz`, tenant11.QueryStr(t, `select * from foo.baz`))
// Check the all-tenant override.
restoreTenant20.CheckQueryResults(t, `SHOW CLUSTER SETTING tenant_cost_model.kv_read_cost_per_megabyte`, [][]string{{"123"}})

// Remove tenant 11, then confirm restoring 11 over 10 fails.
restoreDB.Exec(t, `SELECT crdb_internal.destroy_tenant(11, true)`)
restoreDB.ExpectErr(t, `exists`, `RESTORE TENANT 11 FROM 'nodelocal://1/clusterwide' WITH tenant = '10'`)

// Verify tenant 20 is still unaffected.
restoreTenant20.CheckQueryResults(t, `select * from foo.baz`, tenant11.QueryStr(t, `select * from foo.baz`))
})

t.Run("restore-tenant10-to-ts1", func(t *testing.T) {
Expand Down
13 changes: 5 additions & 8 deletions pkg/ccl/backupccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func makeKeyRewriter(
) (*KeyRewriter, error) {
var prefixes prefixRewriter
var tenantPrefixes prefixRewriter
tenantPrefixes.rewrites = make([]prefixRewrite, len(tenants))
tenantPrefixes.rewrites = make([]prefixRewrite, 0, len(tenants))

seenPrefixes := make(map[string]bool)
for oldID, desc := range descs {
Expand Down Expand Up @@ -179,13 +179,10 @@ func makeKeyRewriter(
fromSystemTenant = true
continue
}
tenantPrefixes.rewrites[i] = prefixRewrite{
OldPrefix: keys.MakeSQLCodec(tenants[i].OldID).TenantPrefix(),
NewPrefix: keys.MakeSQLCodec(tenants[i].NewID).TenantPrefix(),
}
tenantPrefixes.rewrites[i].noop = bytes.Equal(
tenantPrefixes.rewrites[i].OldPrefix, tenantPrefixes.rewrites[i].NewPrefix,
)
from, to := keys.MakeSQLCodec(tenants[i].OldID).TenantPrefix(), keys.MakeSQLCodec(tenants[i].NewID).TenantPrefix()
tenantPrefixes.rewrites = append(tenantPrefixes.rewrites, prefixRewrite{
OldPrefix: from, NewPrefix: to, noop: bytes.Equal(from, to),
})
}
sort.Slice(tenantPrefixes.rewrites, func(i, j int) bool {
return bytes.Compare(tenantPrefixes.rewrites[i].OldPrefix, tenantPrefixes.rewrites[j].OldPrefix) < 0
Expand Down
18 changes: 10 additions & 8 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ func WriteDescriptors(
func rewriteBackupSpanKey(
codec keys.SQLCodec, kr *KeyRewriter, key roachpb.Key,
) (roachpb.Key, error) {
// TODO(dt): support rewriting tenant keys.
if bytes.HasPrefix(key, keys.TenantPrefix) {
return key, nil
}

newKey, rewritten, err := kr.RewriteKey(append([]byte(nil), key...))
if err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err,
Expand All @@ -288,6 +283,10 @@ func rewriteBackupSpanKey(
return nil, errors.AssertionFailedf(
"no rewrite for span start key: %s", key)
}
if bytes.HasPrefix(newKey, keys.TenantPrefix) {
return newKey, nil
}

// Modify all spans that begin at the primary index to instead begin at the
// start of the table. That is, change a span start key from /Table/51/1 to
// /Table/51. Otherwise a permanently empty span at /Table/51-/Table/51/1
Expand Down Expand Up @@ -1520,9 +1519,12 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}

for _, tenant := range details.Tenants {
// TODO(dt): extend the job to keep track of new ID.
id := roachpb.MakeTenantID(tenant.ID)
mainData.addTenant(id, id)
to := roachpb.MakeTenantID(tenant.ID)
from := to
if details.PreRewriteTenantId != nil {
from = *details.PreRewriteTenantId
}
mainData.addTenant(from, to)
}

numNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
Expand Down
68 changes: 64 additions & 4 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
restoreOptSkipMissingViews = "skip_missing_views"
restoreOptSkipLocalitiesCheck = "skip_localities_check"
restoreOptDebugPauseOn = "debug_pause_on"
restoreOptAsTenant = "tenant"

// The temporary database system tables will be restored into for full
// cluster backups.
Expand Down Expand Up @@ -1656,6 +1657,36 @@ func restorePlanHook(
}
}

var newTenantIDFn func() (*roachpb.TenantID, error)
if restoreStmt.Options.AsTenant != nil {
if restoreStmt.DescriptorCoverage == tree.AllDescriptors || !restoreStmt.Targets.Tenant.IsSet() {
err := errors.Errorf("%q can only be used when running RESTORE TENANT for a single tenant", restoreOptAsTenant)
return nil, nil, nil, false, err
}
// TODO(dt): it'd be nice to have TypeAsInt or TypeAsTenantID and then in
// sql.y an int_or_placeholder, but right now the hook view of planner only
// has TypeAsString so we'll just atoi it.
fn, err := p.TypeAsString(ctx, restoreStmt.Options.AsTenant, "RESTORE")
if err != nil {
return nil, nil, nil, false, err
}
newTenantIDFn = func() (*roachpb.TenantID, error) {
s, err := fn()
if err != nil {
return nil, err
}
x, err := strconv.Atoi(s)
if err != nil {
return nil, err
}
if x < int(roachpb.MinTenantID.ToUint64()) {
return nil, errors.New("invalid tenant ID value")
}
id := roachpb.MakeTenantID(uint64(x))
return &id, nil
}
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
Expand Down Expand Up @@ -1722,6 +1753,13 @@ func restorePlanHook(
return err
}
}
var newTenantID *roachpb.TenantID
if newTenantIDFn != nil {
newTenantID, err = newTenantIDFn()
if err != nil {
return err
}
}

// incFrom will contain the directory URIs for incremental backups (i.e.
// <prefix>/<subdir>) iff len(From)==1, regardless of the
Expand Down Expand Up @@ -1772,7 +1810,7 @@ func restorePlanHook(
}

return doRestorePlan(ctx, restoreStmt, p, from, incFrom, passphrase, kms, intoDB,
newDBName, endTime, resultsCh)
newDBName, newTenantID, endTime, resultsCh)
}

if restoreStmt.Options.Detached {
Expand Down Expand Up @@ -1896,6 +1934,7 @@ func doRestorePlan(
kms []string,
intoDB string,
newDBName string,
newTenantID *roachpb.TenantID,
endTime hlc.Timestamp,
resultsCh chan<- tree.Datums,
) error {
Expand Down Expand Up @@ -2087,20 +2126,40 @@ func doRestorePlan(
}
}

var oldTenantID *roachpb.TenantID
if len(tenants) > 0 {
if !p.ExecCfg().Codec.ForSystemTenant() {
return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can restore other tenants")
}
for _, i := range tenants {
if newTenantID != nil {
if len(tenants) != 1 {
return errors.Errorf("%q option can only be used when restoring a single tenant", restoreOptAsTenant)
}
res, err := p.ExecCfg().InternalExecutor.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
`SELECT active FROM system.tenants WHERE id = $1`, newTenantID.ToUint64(),
)
if err != nil {
return err
}
if res != nil {
return errors.Errorf("tenant %d already exists", i.ID)
return errors.Errorf("tenant %s already exists", newTenantID)
}
old := roachpb.MakeTenantID(tenants[0].ID)
tenants[0].ID = newTenantID.ToUint64()
oldTenantID = &old
} else {
for _, i := range tenants {
res, err := p.ExecCfg().InternalExecutor.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
)
if err != nil {
return err
}
if res != nil {
return errors.Errorf("tenant %d already exists", i.ID)
}
}
}
}
Expand Down Expand Up @@ -2260,6 +2319,7 @@ func doRestorePlan(
DatabaseModifiers: databaseModifiers,
DebugPauseOn: debugPauseOn,
RestoreSystemUsers: restoreStmt.SystemUsers,
PreRewriteTenantId: oldTenantID,
},
Progress: jobspb.RestoreProgress{},
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,11 @@ message RestoreDetails {
string debug_pause_on = 20;
bool restore_system_users = 22;

// NEXT ID: 23.
// PreRewrittenTenantID is the ID of tenants[0] in the backup, aka its old ID;
// it is only valid to set this if len(tenants) == 1.
roachpb.TenantID pre_rewrite_tenant_id = 23;

// NEXT ID: 24.
}

message RestoreProgress {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -3221,6 +3221,11 @@ restore_options:
{
$$.val = &tree.RestoreOptions{IncrementalStorage: $3.stringOrPlaceholderOptList()}
}
| TENANT '=' string_or_placeholder
{
$$.val = &tree.RestoreOptions{AsTenant: $3.expr()}
}

import_format:
name
{
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/parser/testdata/backup_restore
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,14 @@ RESTORE TENANT 36 FROM (($1), ($2)) AS OF SYSTEM TIME ('1') -- fully parenthesiz
RESTORE TENANT 36 FROM ($1, $2) AS OF SYSTEM TIME '_' -- literals removed
RESTORE TENANT 36 FROM ($1, $2) AS OF SYSTEM TIME '1' -- identifiers removed

parse
RESTORE TENANT 36 FROM ($1, $2) WITH tenant = '5'
----
RESTORE TENANT 36 FROM ($1, $2) WITH tenant = '5'
RESTORE TENANT 36 FROM (($1), ($2)) WITH tenant = ('5') -- fully parenthesized
RESTORE TENANT 36 FROM ($1, $2) WITH tenant = '_' -- literals removed
RESTORE TENANT 36 FROM ($1, $2) WITH tenant = '5' -- identifiers removed

parse
RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'
----
Expand Down
17 changes: 16 additions & 1 deletion pkg/sql/sem/tree/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type RestoreOptions struct {
DebugPauseOn Expr
NewDBName Expr
IncrementalStorage StringOrPlaceholderOptList
AsTenant Expr
}

var _ NodeFormatter = &RestoreOptions{}
Expand Down Expand Up @@ -391,11 +392,18 @@ func (o *RestoreOptions) Format(ctx *FmtCtx) {
ctx.WriteString("new_db_name = ")
ctx.FormatNode(o.NewDBName)
}

if o.IncrementalStorage != nil {
maybeAddSep()
ctx.WriteString("incremental_location = ")
ctx.FormatNode(&o.IncrementalStorage)
}

if o.AsTenant != nil {
maybeAddSep()
ctx.WriteString("tenant = ")
ctx.FormatNode(o.AsTenant)
}
}

// CombineWith merges other backup options into this backup options struct.
Expand Down Expand Up @@ -485,6 +493,12 @@ func (o *RestoreOptions) CombineWith(other *RestoreOptions) error {
return errors.New("incremental_location option specified multiple times")
}

if o.AsTenant == nil {
o.AsTenant = other.AsTenant
} else if other.AsTenant != nil {
return errors.New("tenant option specified multiple times")
}

return nil
}

Expand All @@ -502,5 +516,6 @@ func (o RestoreOptions) IsDefault() bool {
o.SkipLocalitiesCheck == options.SkipLocalitiesCheck &&
o.DebugPauseOn == options.DebugPauseOn &&
o.NewDBName == options.NewDBName &&
cmp.Equal(o.IncrementalStorage, options.IncrementalStorage)
cmp.Equal(o.IncrementalStorage, options.IncrementalStorage) &&
o.AsTenant == options.AsTenant
}

0 comments on commit 3a321e8

Please sign in to comment.