Skip to content

Commit

Permalink
backupccl: support RESTORE inside tenant
Browse files Browse the repository at this point in the history
This commit adds support for RESTORE inside tenants. A RESTORE performed
by a tenant will produce a DistSQL plan that is planned on a single
node.

Release note: None
  • Loading branch information
pbardea committed Feb 3, 2021
1 parent f677742 commit b3d2cfb
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 9 deletions.
63 changes: 62 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -6198,8 +6199,64 @@ func TestBackupRestoreTenant(t *testing.T) {

defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

tmp, dirCleanup := testutils.TempDir(t)
defer dirCleanup()
const badHeadResponse = "bad-head-response"
// This test uses this mock HTTP server to pass the backup files between tenants.
makeServer := func() (*url.URL, func() int, func()) {
var files int
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
localfile := filepath.Join(tmp, filepath.Base(r.URL.Path))
switch r.Method {
case "PUT":
f, err := os.Create(localfile)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
defer f.Close()
if _, err := io.Copy(f, r.Body); err != nil {
http.Error(w, err.Error(), 500)
return
}
files++
w.WriteHeader(201)
case "GET", "HEAD":
if filepath.Base(localfile) == badHeadResponse {
http.Error(w, "HEAD not implemented", 500)
return
}
http.ServeFile(w, r, localfile)
case "DELETE":
if err := os.Remove(localfile); err != nil {
http.Error(w, err.Error(), 500)
return
}
w.WriteHeader(204)
default:
http.Error(w, "unsupported method "+r.Method, 400)
}
}))

cleanup := func() {
srv.Close()
}

t.Logf("Mock HTTP Storage %q", srv.URL)
uri, err := url.Parse(srv.URL)
if err != nil {
srv.Close()
t.Fatal(err)
}
uri.Path = filepath.Join(uri.Path, "testing")
return uri, func() int { return files }, cleanup
}
httpServer, _, cleanup := makeServer()
defer cleanup()

const numAccounts = 1
ctx, tc, systemDB, dir, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
_, _ = tc, systemDB
defer cleanupFn()
srv := tc.Server(0)

Expand Down Expand Up @@ -6247,7 +6304,11 @@ func TestBackupRestoreTenant(t *testing.T) {
systemDB.Exec(t, `BACKUP TENANT 20 TO 'nodelocal://1/t20'`)

t.Run("inside-tenant", func(t *testing.T) {
tenant10.Exec(t, `BACKUP DATABASE foo TO 'userfile://defaultdb.myfililes/test'`)
httpAddr := httpServer.String() + "/test"
tenant10.Exec(t, `BACKUP DATABASE foo TO $1`, httpAddr)
tenant11.Exec(t, `CREATE DATABASE foo2`)
tenant11.Exec(t, `RESTORE foo.bar FROM $1 WITH into_db='foo2'`, httpAddr)
tenant11.CheckQueryResults(t, `SELECT * FROM foo2.bar`, tenant10.QueryStr(t, `SELECT * FROM foo.bar`))
})

t.Run("non-existent", func(t *testing.T) {
Expand Down
25 changes: 21 additions & 4 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,11 @@ func spansForAllRestoreTableIndexes(
// createImportingDescriptors create the tables that we will restore into. It also
// fetches the information from the old tables that we need for the restore.
func createImportingDescriptors(
ctx context.Context, p sql.JobExecContext, sqlDescs []catalog.Descriptor, r *restoreResumer,
ctx context.Context,
p sql.JobExecContext,
backupCodec keys.SQLCodec,
sqlDescs []catalog.Descriptor,
r *restoreResumer,
) (tables []catalog.TableDescriptor, oldTableIDs []descpb.ID, spans []roachpb.Span, err error) {
details := r.job.Details().(jobspb.RestoreDetails)

Expand Down Expand Up @@ -925,7 +929,7 @@ func createImportingDescriptors(

// We get the spans of the restoring tables _as they appear in the backup_,
// that is, in the 'old' keyspace, before we reassign the table IDs.
spans = spansForAllRestoreTableIndexes(p.ExecCfg().Codec, tables, nil)
spans = spansForAllRestoreTableIndexes(backupCodec, tables, nil)

log.Eventf(ctx, "starting restore for %d tables", len(mutableTables))

Expand Down Expand Up @@ -1146,6 +1150,19 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
if err != nil {
return err
}
// backupCodec is the codec that was used to encode the keys in the backup. It
// is the tenant in which the backup was taken.
backupCodec := keys.SystemSQLCodec
if len(sqlDescs) != 0 {
if len(latestBackupManifest.Spans) != 0 {
_, tenantID, err := keys.DecodeTenantPrefix(latestBackupManifest.Spans[0].Key)
if err != nil {
return err
}
backupCodec = keys.MakeSQLCodec(tenantID)
}
}

lastBackupIndex, err := getBackupIndexAtTime(backupManifests, details.EndTime)
if err != nil {
return err
Expand All @@ -1159,7 +1176,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
return err
}

tables, oldTableIDs, spans, err := createImportingDescriptors(ctx, p, sqlDescs, r)
tables, oldTableIDs, spans, err := createImportingDescriptors(ctx, p, backupCodec, sqlDescs, r)
if err != nil {
return err
}
Expand Down Expand Up @@ -1215,7 +1232,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error

numClusterNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
return err
}
log.Warningf(ctx, "unable to determine cluster node count: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down
18 changes: 17 additions & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ type splitAndScatterer interface {
splitAndScatterKey(ctx context.Context, codec keys.SQLCodec, db *kv.DB, kr *storageccl.KeyRewriter, key roachpb.Key, randomizeLeases bool) (roachpb.NodeID, error)
}

type noopSplitAndScatterer struct{}

// splitAndScatterKey implements the splitAndScatterer interface.
// It is safe to always return 0 since during processor planning the range
// router has a `DefaultStream` specified in case the range generated by this
// node ID doesn't match any of the result router's spans.
func (n noopSplitAndScatterer) splitAndScatterKey(
_ context.Context, _ keys.SQLCodec, _ *kv.DB, _ *storageccl.KeyRewriter, _ roachpb.Key, _ bool,
) (roachpb.NodeID, error) {
return 0, nil
}

// dbSplitAndScatter is the production implementation of this processor's
// scatterer. It actually issues the split and scatter requests for KV. This is
// mocked out in some tests.
Expand Down Expand Up @@ -163,11 +175,15 @@ func newSplitAndScatterProcessor(
numEntries += len(chunk.Entries)
}

var scatterer splitAndScatterer = dbSplitAndScatterer{}
if !flowCtx.Cfg.Codec.ForSystemTenant() {
scatterer = noopSplitAndScatterer{}
}
ssp := &splitAndScatterProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
scatterer: dbSplitAndScatterer{},
scatterer: scatterer,
// Large enough so that it never blocks.
doneScatterCh: make(chan entryNode, numEntries),
routingDatumCache: make(map[roachpb.NodeID]rowenc.EncDatum),
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func MakeKeyRewriterFromRekeys(
return makeKeyRewriter(codec, descs)
}

// MakeKeyRewriter makes a KeyRewriter from a map of descs keyed by original ID.
func MakeKeyRewriter(
// makeKeyRewriter makes a KeyRewriter from a map of descs keyed by original ID.
func makeKeyRewriter(
codec keys.SQLCodec, descs map[descpb.ID]catalog.TableDescriptor,
) (*KeyRewriter, error) {
var prefixes prefixRewriter
Expand Down

0 comments on commit b3d2cfb

Please sign in to comment.