backupccl: support RESTORE inside tenant #58908

Merged 2 commits on Feb 16, 2021
22 changes: 21 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -6200,6 +6200,7 @@ func TestBackupRestoreTenant(t *testing.T) {

const numAccounts = 1
ctx, tc, systemDB, dir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
_, _ = tc, systemDB
defer cleanupFn()
srv := tc.Server(0)

@@ -6247,7 +6248,26 @@ func TestBackupRestoreTenant(t *testing.T) {
systemDB.Exec(t, `BACKUP TENANT 20 TO 'nodelocal://1/t20'`)

t.Run("inside-tenant", func(t *testing.T) {
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test'`)
// This test uses this mock HTTP server to pass the backup files between tenants.
httpServer, httpServerCleanup := makeInsecureHTTPServer(t)
defer httpServerCleanup()
httpAddr := httpServer.String() + "/test"

tenant10.Exec(t, `BACKUP DATABASE foo TO $1`, httpAddr)
t.Run("same-tenant", func(t *testing.T) {
tenant10.Exec(t, `CREATE DATABASE foo2`)
tenant10.Exec(t, `RESTORE foo.bar FROM $1 WITH into_db='foo2'`, httpAddr)
tenant10.CheckQueryResults(t, `SELECT * FROM foo2.bar`, tenant10.QueryStr(t, `SELECT * FROM foo.bar`))
})
t.Run("another-tenant", func(t *testing.T) {
tenant11.Exec(t, `RESTORE foo.bar FROM $1`, httpAddr)
tenant11.CheckQueryResults(t, `SELECT * FROM foo.bar`, tenant10.QueryStr(t, `SELECT * FROM foo.bar`))
})
t.Run("system-tenant", func(t *testing.T) {
systemDB.Exec(t, `CREATE DATABASE foo2`)
systemDB.ExpectErr(t, `cannot restore tenant backups into system tenant`,
`RESTORE foo.bar FROM $1 WITH into_db='foo2'`, httpAddr)
})
})

t.Run("non-existent", func(t *testing.T) {
59 changes: 59 additions & 0 deletions pkg/ccl/backupccl/helpers_test.go
@@ -12,6 +12,12 @@ import (
"context"
gosql "database/sql"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
@@ -295,3 +301,56 @@ func injectStatsWithRowCount(
]'`, tableName, columnName, rowCount, rowCount))
return sqlDB.QueryStr(t, getStatsQuery(tableName))
}

func makeInsecureHTTPServer(t *testing.T) (*url.URL, func()) {
t.Helper()

const badHeadResponse = "bad-head-response"

tmp, dirCleanup := testutils.TempDir(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
localfile := filepath.Join(tmp, filepath.Base(r.URL.Path))
switch r.Method {
case "PUT":
f, err := os.Create(localfile)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
defer f.Close()
if _, err := io.Copy(f, r.Body); err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(201)
case "GET", "HEAD":
if filepath.Base(localfile) == badHeadResponse {
http.Error(w, "HEAD not implemented", 500)
return
}
http.ServeFile(w, r, localfile)
case "DELETE":
if err := os.Remove(localfile); err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(204)
default:
http.Error(w, "unsupported method "+r.Method, 400)
}
}))

cleanup := func() {
srv.Close()
dirCleanup()
}

t.Logf("Mock HTTP Storage %q", srv.URL)
uri, err := url.Parse(srv.URL)
if err != nil {
srv.Close()
t.Fatal(err)
}
uri.Path = filepath.Join(uri.Path, "testing")
return uri, cleanup
}
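
Not part of the diff: a minimal sketch of how the helper above could be exercised directly, assuming it stays unexported in the same backupccl test package. The test name and the "/blob" path are hypothetical; the sketch just pushes a payload through the PUT handler and reads it back with GET.

package backupccl

import (
	"io/ioutil"
	"net/http"
	"strings"
	"testing"
)

// TestMockHTTPStorageRoundTrip is a hypothetical test, not part of this PR.
func TestMockHTTPStorageRoundTrip(t *testing.T) {
	uri, cleanup := makeInsecureHTTPServer(t)
	defer cleanup()
	target := uri.String() + "/blob"

	// Store a payload via the PUT branch of the handler.
	req, err := http.NewRequest("PUT", target, strings.NewReader("hello"))
	if err != nil {
		t.Fatal(err)
	}
	putResp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	putResp.Body.Close()

	// Read it back via the GET branch.
	getResp, err := http.Get(target)
	if err != nil {
		t.Fatal(err)
	}
	defer getResp.Body.Close()
	body, err := ioutil.ReadAll(getResp.Body)
	if err != nil {
		t.Fatal(err)
	}
	if got := string(body); got != "hello" {
		t.Fatalf("round trip mismatch: got %q", got)
	}
}

This is the same shape as the BACKUP/RESTORE test above, minus CockroachDB: the tenant taking the backup writes files with PUT and the restoring tenant fetches them with GET.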
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
@@ -64,7 +64,7 @@ func newRestoreDataProcessor(
}

var err error
rd.kr, err = storageccl.MakeKeyRewriterFromRekeys(rd.spec.Rekeys)
rd.kr, err = storageccl.MakeKeyRewriterFromRekeys(flowCtx.Codec(), rd.spec.Rekeys)
if err != nil {
return nil, err
}
33 changes: 29 additions & 4 deletions pkg/ccl/backupccl/restore_job.go
@@ -880,7 +880,11 @@ func spansForAllRestoreTableIndexes(
// createImportingDescriptors create the tables that we will restore into. It also
// fetches the information from the old tables that we need for the restore.
func createImportingDescriptors(
ctx context.Context, p sql.JobExecContext, sqlDescs []catalog.Descriptor, r *restoreResumer,
ctx context.Context,
p sql.JobExecContext,
backupCodec keys.SQLCodec,
sqlDescs []catalog.Descriptor,
r *restoreResumer,
) (tables []catalog.TableDescriptor, oldTableIDs []descpb.ID, spans []roachpb.Span, err error) {
details := r.job.Details().(jobspb.RestoreDetails)

@@ -925,7 +929,7 @@ func createImportingDescriptors(

// We get the spans of the restoring tables _as they appear in the backup_,
// that is, in the 'old' keyspace, before we reassign the table IDs.
spans = spansForAllRestoreTableIndexes(p.ExecCfg().Codec, tables, nil)
spans = spansForAllRestoreTableIndexes(backupCodec, tables, nil)

log.Eventf(ctx, "starting restore for %d tables", len(mutableTables))

@@ -1146,6 +1150,27 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
if err != nil {
return err
}
// backupCodec is the codec that was used to encode the keys in the backup. It
// is the tenant in which the backup was taken.
backupCodec := keys.SystemSQLCodec
if len(sqlDescs) != 0 {
if len(latestBackupManifest.Spans) != 0 && len(latestBackupManifest.Tenants) == 0 {
// If there are no tenant targets, then the entire keyspace covered by
// Spans must lie in 1 tenant.
_, backupTenantID, err := keys.DecodeTenantPrefix(latestBackupManifest.Spans[0].Key)
if err != nil {
return err
}
backupCodec = keys.MakeSQLCodec(backupTenantID)
if backupTenantID != roachpb.SystemTenantID && p.ExecCfg().Codec.ForSystemTenant() {
// TODO(pbardea): This is unsupported for now because the key-rewriter
// cannot distinguish between RESTORE TENANT and table restore from a
// backup taken in a tenant, into the system tenant.
return errors.New("cannot restore tenant backups into system tenant")
}
}
}

lastBackupIndex, err := getBackupIndexAtTime(backupManifests, details.EndTime)
if err != nil {
return err
@@ -1159,7 +1184,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
return err
}

tables, oldTableIDs, spans, err := createImportingDescriptors(ctx, p, sqlDescs, r)
tables, oldTableIDs, spans, err := createImportingDescriptors(ctx, p, backupCodec, sqlDescs, r)
if err != nil {
return err
}
@@ -1215,7 +1240,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error

numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
return err
}
log.Warningf(ctx, "unable to determine cluster node count: %v", err)
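
Not part of the diff: a self-contained sketch of the backupCodec detection added to Resume above, assuming the keys and roachpb APIs used elsewhere in this PR (MakeTenantID, MakeSQLCodec, DecodeTenantPrefix, TablePrefix). Decoding the tenant prefix of a backed-up span key recovers the tenant the backup was taken in, and that ID is turned back into a codec for rewriting.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// Pretend the first span in the backup manifest was written by tenant 10.
	backupTenant := keys.MakeSQLCodec(roachpb.MakeTenantID(10))
	spanKey := backupTenant.TablePrefix(52) // arbitrary table ID for illustration

	// The same decode-then-rebuild step Resume performs to choose backupCodec.
	_, backupTenantID, err := keys.DecodeTenantPrefix(spanKey)
	if err != nil {
		panic(err)
	}
	backupCodec := keys.MakeSQLCodec(backupTenantID)

	fmt.Println(backupTenantID)                           // 10
	fmt.Println(backupCodec.ForSystemTenant())            // false
	fmt.Println(backupTenantID == roachpb.SystemTenantID) // false: would not hit the system-tenant guard
}

The guard in Resume rejects exactly the remaining combination: a backup whose keys carry a non-system tenant prefix being restored by the system tenant, since the key rewriter below cannot yet tell that apart from RESTORE TENANT.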
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
@@ -17,7 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
20 changes: 18 additions & 2 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -37,6 +37,18 @@ type splitAndScatterer interface {
splitAndScatterKey(ctx context.Context, codec keys.SQLCodec, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool) (roachpb.NodeID, error)
}

type noopSplitAndScatterer struct{}

// splitAndScatterKey implements the splitAndScatterer interface.
// It is safe to always return 0 since during processor planning the range
// router has a `DefaultStream` specified in case the range generated by this
// node ID doesn't match any of the result router's spans.
func (n noopSplitAndScatterer) splitAndScatterKey(
_ context.Context, _ keys.SQLCodec, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, _ bool,
) (roachpb.NodeID, error) {
return 0, nil
}

// dbSplitAndScatter is the production implementation of this processor's
// scatterer. It actually issues the split and scatter requests for KV. This is
// mocked out in some tests.
@@ -163,11 +175,15 @@ func newSplitAndScatterProcessor(
numEntries += len(chunk.Entries)
}

var scatterer splitAndScatterer = dbSplitAndScatterer{}
if !flowCtx.Cfg.Codec.ForSystemTenant() {
scatterer = noopSplitAndScatterer{}
}
ssp := &splitAndScatterProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
scatterer: dbSplitAndScatterer{},
scatterer: scatterer,
// Large enough so that it never blocks.
doneScatterCh: make(chan entryNode, numEntries),
routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
@@ -267,7 +283,7 @@ func (ssp *splitAndScatterProcessor) runSplitAndScatter(
scatterer splitAndScatterer,
) error {
db := flowCtx.Cfg.DB
kr, err := storageccl.MakeKeyRewriterFromRekeys(spec.Rekeys)
kr, err := storageccl.MakeKeyRewriterFromRekeys(flowCtx.Codec(), spec.Rekeys)
if err != nil {
return err
}
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/split_and_scatter_processor_test.go
@@ -259,6 +259,7 @@ func TestSplitAndScatterProcessor(t *testing.T) {
Settings: st,
DB: kvDB,
DiskMonitor: testDiskMonitor,
Codec: keys.SystemSQLCodec,
},
EvalCtx: &evalCtx,
}
3 changes: 2 additions & 1 deletion pkg/ccl/storageccl/import.go
@@ -18,6 +18,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
@@ -149,7 +150,7 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
// args.Rekeys could be using table descriptors from either the old or new
// foreign key representation on the table descriptor, but this is fine
// because foreign keys don't matter for the key rewriter.
kr, err := MakeKeyRewriterFromRekeys(args.Rekeys)
kr, err := MakeKeyRewriterFromRekeys(keys.SystemSQLCodec, args.Rekeys)
if err != nil {
return nil, errors.Wrap(err, "make key rewriter")
}
57 changes: 48 additions & 9 deletions pkg/ccl/storageccl/key_rewriter.go
@@ -77,12 +77,16 @@ func (p prefixRewriter) rewriteKey(key []byte) ([]byte, bool) {
// into interleaved keys, and is able to function on partial keys for spans
// and splits.
type KeyRewriter struct {
codec keys.SQLCodec

prefixes prefixRewriter
descs map[descpb.ID]catalog.TableDescriptor
}

// MakeKeyRewriterFromRekeys makes a KeyRewriter from Rekey protos.
func MakeKeyRewriterFromRekeys(rekeys []roachpb.ImportRequest_TableRekey) (*KeyRewriter, error) {
func MakeKeyRewriterFromRekeys(
codec keys.SQLCodec, rekeys []roachpb.ImportRequest_TableRekey,
) (*KeyRewriter, error) {
descs := make(map[descpb.ID]catalog.TableDescriptor)
for _, rekey := range rekeys {
var desc descpb.Descriptor
@@ -95,11 +99,13 @@ func MakeKeyRewriterFromRekeys(rekeys []roachpb.ImportRequest_TableRekey) (*KeyR
}
descs[descpb.ID(rekey.OldID)] = tabledesc.NewImmutable(*table)
}
return MakeKeyRewriter(descs)
return makeKeyRewriter(codec, descs)
}

// MakeKeyRewriter makes a KeyRewriter from a map of descs keyed by original ID.
func MakeKeyRewriter(descs map[descpb.ID]catalog.TableDescriptor) (*KeyRewriter, error) {
// makeKeyRewriter makes a KeyRewriter from a map of descs keyed by original ID.
func makeKeyRewriter(
codec keys.SQLCodec, descs map[descpb.ID]catalog.TableDescriptor,
) (*KeyRewriter, error) {
var prefixes prefixRewriter
seenPrefixes := make(map[string]bool)
for oldID, desc := range descs {
@@ -136,6 +142,7 @@ func MakeKeyRewriter(descs map[descpb.ID]catalog.TableDescriptor) (*KeyRewriter,
return bytes.Compare(prefixes.rewrites[i].OldPrefix, prefixes.rewrites[j].OldPrefix) < 0
})
return &KeyRewriter{
codec: codec,
prefixes: prefixes,
descs: descs,
}, nil
@@ -146,7 +153,7 @@ func MakeKeyRewriter(descs map[descpb.ID]catalog.TableDescriptor) (*KeyRewriter,
// function, but it takes into account interleaved ancestors, which we don't
// want here.
func makeKeyRewriterPrefixIgnoringInterleaved(tableID descpb.ID, indexID descpb.IndexID) []byte {
return keys.TODOSQLCodec.IndexPrefix(uint32(tableID), uint32(indexID))
return keys.SystemSQLCodec.IndexPrefix(uint32(tableID), uint32(indexID))
}

// RewriteKey modifies key (possibly in place), changing all table IDs to their
@@ -166,13 +173,45 @@ func makeKeyRewriterPrefixIgnoringInterleaved(tableID descpb.ID, indexID descpb.
// byte that we're likely at the end anyway and do not need to search for any
// further table IDs to replace.
func (kr *KeyRewriter) RewriteKey(key []byte, isFromSpan bool) ([]byte, bool, error) {
if bytes.HasPrefix(key, keys.TenantPrefix) {
if kr.codec.ForSystemTenant() && bytes.HasPrefix(key, keys.TenantPrefix) {
// If we're rewriting from the system tenant, we don't rewrite tenant keys
// at all since we assume that we're restoring an entire tenant.
return key, true, nil
}

noTenantPrefix, oldTenantID, err := keys.DecodeTenantPrefix(key)
if err != nil {
return nil, false, err
}

rekeyed, ok, err := kr.rewriteTableKey(noTenantPrefix, isFromSpan)
if err != nil {
return nil, false, err
}

oldCodec := keys.MakeSQLCodec(oldTenantID)
oldTenantPrefix := oldCodec.TenantPrefix()
newTenantPrefix := kr.codec.TenantPrefix()
if len(newTenantPrefix) == len(oldTenantPrefix) {
keyTenantPrefix := key[:len(oldTenantPrefix)]
copy(keyTenantPrefix, newTenantPrefix)
rekeyed = append(keyTenantPrefix, rekeyed...)
} else {
rekeyed = append(newTenantPrefix, rekeyed...)
}

return rekeyed, ok, err
}

// rewriteTableKey recursively (in the case of interleaved tables) rewrites the
// table IDs in the key. It assumes that any tenant ID has been stripped from
// the key so it operates with the system codec. It is the responsibility of the
// caller to either remap, or re-prepend any required tenant prefix.
func (kr *KeyRewriter) rewriteTableKey(key []byte, isFromSpan bool) ([]byte, bool, error) {
// Fetch the original table ID for descriptor lookup. Ignore errors because
// they will be caught later on if tableID isn't in descs or kr doesn't
// perform a rewrite.
_, tableID, _ := keys.TODOSQLCodec.DecodeTablePrefix(key)
_, tableID, _ := keys.SystemSQLCodec.DecodeTablePrefix(key)
// Rewrite the first table ID.
key, ok := kr.prefixes.rewriteKey(key)
if !ok {
@@ -183,7 +222,7 @@ func (kr *KeyRewriter) RewriteKey(key []byte, isFromSpan bool) ([]byte, bool, er
return nil, false, errors.Errorf("missing descriptor for table %d", tableID)
}
// Check if this key may have interleaved children.
k, _, indexID, err := keys.TODOSQLCodec.DecodeIndexPrefix(key)
k, _, indexID, err := keys.SystemSQLCodec.DecodeIndexPrefix(key)
if err != nil {
return nil, false, err
}
@@ -257,7 +296,7 @@ func (kr *KeyRewriter) RewriteKey(key []byte, isFromSpan bool) ([]byte, bool, er
return key, true, nil
}
prefix := key[:len(key)-len(k)]
k, ok, err = kr.RewriteKey(k, isFromSpan)
k, ok, err = kr.rewriteTableKey(k, isFromSpan)
if err != nil {
return nil, false, err
}
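
Not part of the diff: a minimal sketch of just the tenant-prefix swap that RewriteKey now performs; the table-ID rewrite done by rewriteTableKey is omitted. The reprefix helper is hypothetical, but the keys and roachpb calls it uses are the same ones that appear in the hunks above.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// reprefix is a hypothetical helper, not CockroachDB code: it strips whatever
// tenant prefix the key carries and prepends the destination tenant's prefix,
// mirroring the prefix handling in KeyRewriter.RewriteKey.
func reprefix(key []byte, dest keys.SQLCodec) ([]byte, error) {
	noTenantPrefix, _, err := keys.DecodeTenantPrefix(key)
	if err != nil {
		return nil, err
	}
	// Copy the destination prefix so we never scribble on the codec's buffer.
	out := append([]byte(nil), dest.TenantPrefix()...)
	return append(out, noTenantPrefix...), nil
}

func main() {
	src := keys.MakeSQLCodec(roachpb.MakeTenantID(10))
	dst := keys.MakeSQLCodec(roachpb.MakeTenantID(11))

	key := src.TablePrefix(53) // arbitrary table ID for illustration
	out, err := reprefix(key, dst)
	if err != nil {
		panic(err)
	}
	// The rewritten key now decodes as belonging to tenant 11.
	_, tenantID, _ := keys.DecodeTenantPrefix(out)
	fmt.Println(tenantID) // 11
}

This is why RewriteKey hands rewriteTableKey a key with the tenant prefix already stripped: table-ID rewriting always happens in system-codec terms, and the destination tenant prefix is reattached afterwards.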