Skip to content

Commit

Permalink
Merge #76271
Browse files Browse the repository at this point in the history
76271: backup: restore using mvcc at-now addsstable r=dt a=dt

Release note (sql change): Restored data now appears to have been written at the time it was restored, rather than the time at which it was backed up, when reading the lower-level write timestamps from the rows themselves. This affects various internal operations and the result of crdb_internal_mvcc_timestamp.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Feb 12, 2022
2 parents 7a65c42 + 831d856 commit 212ddab
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ go_test(
"backup_destination_test.go",
"backup_intents_test.go",
"backup_rand_test.go",
"backup_tenant_test.go",
"backup_test.go",
"bench_covering_test.go",
"bench_test.go",
Expand All @@ -167,6 +168,7 @@ go_test(
deps = [
"//pkg/base",
"//pkg/blobs",
"//pkg/ccl/importccl",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/multitenantccl",
Expand Down
121 changes: 121 additions & 0 deletions pkg/ccl/backupccl/backup_tenant_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/importccl"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestBackupTenantImportingTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

tSrv, tSQL := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10),
TestingKnobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
})
defer tSQL.Close()

if _, err := tSQL.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';"); err != nil {
t.Fatal(err)
}
if _, err := tSQL.Exec("CREATE TABLE x (id INT PRIMARY KEY, n INT, s STRING)"); err != nil {
t.Fatal(err)
}
if _, err := tSQL.Exec("INSERT INTO x VALUES (1000, 1, 'existing')"); err != nil {
t.Fatal(err)
}
if _, err := tSQL.Exec("IMPORT INTO x CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')"); !testutils.IsError(err, "pause") {
t.Fatal(err)
}
var jobID int
if err := tSQL.QueryRow(`SELECT job_id FROM [show jobs] WHERE job_type = 'IMPORT'`).Scan(&jobID); err != nil {
t.Fatal(err)
}
tc.Servers[0].JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
// wait for it to pause

testutils.SucceedsSoon(t, func() error {
var status string
if err := tSQL.QueryRow(`SELECT status FROM [show jobs] WHERE job_id = $1`, jobID).Scan(&status); err != nil {
t.Fatal(err)
}
if status == string(jobs.StatusPaused) {
return nil
}
return errors.Newf("%s", status)
})

// tenant now has a fully ingested, paused import, so back them up.
const dst = "userfile:///t"
if _, err := sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO $1`, dst); err != nil {
t.Fatal(err)
}
// Destroy the tenant, then restore it.
tSrv.Stopper().Stop(ctx)
if _, err := sqlDB.DB.ExecContext(ctx, "SELECT crdb_internal.destroy_tenant(10, true)"); err != nil {
t.Fatal(err)
}
if _, err := sqlDB.DB.ExecContext(ctx, "RESTORE TENANT 10 FROM $1", dst); err != nil {
t.Fatal(err)
}
tSrv, tSQL = serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10),
Existing: true,
TestingKnobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
})
defer tSQL.Close()

if _, err := tSQL.Exec(`UPDATE system.jobs SET claim_session_id = NULL, claim_instance_id = NULL WHERE id = $1`, jobID); err != nil {
t.Fatal(err)
}
if _, err := tSQL.Exec(`DELETE FROM system.lease`); err != nil {
t.Fatal(err)
}
if _, err := tSQL.Exec(`CANCEL JOB $1`, jobID); err != nil {
t.Fatal(err)
}
tSrv.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
testutils.SucceedsSoon(t, func() error {
var status string
if err := tSQL.QueryRow(`SELECT status FROM [show jobs] WHERE job_id = $1`, jobID).Scan(&status); err != nil {
t.Fatal(err)
}
if status == string(jobs.StatusCanceled) {
return nil
}
return errors.Newf("%s", status)
})

var rowCount int
if err := tSQL.QueryRow(`SELECT count(*) FROM x`).Scan(&rowCount); err != nil {
t.Fatal(err)
}
require.Equal(t, 1, rowCount)
}
14 changes: 13 additions & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
package backupccl

import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -105,7 +107,7 @@ var restoreAtNow = settings.RegisterBoolSetting(
settings.TenantWritable,
"bulkio.restore_at_current_time.enabled",
"write restored data at the current timestamp",
false,
true,
)

func newRestoreDataProcessor(
Expand Down Expand Up @@ -387,6 +389,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
)
}

// If the system tenant is restoring a guest tenant span, we don't want to
// forward all the restored data to now, as there may be importing tables in
// that span, that depend on the difference in timestamps on restored existing
// vs importing keys to rollback.
if writeAtBatchTS && kr.fromSystemTenant &&
(bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) {
log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
writeAtBatchTS = false
}

// "disallowing" shadowing of anything older than logical=1 is i.e. allow all
// shadowing. We must allow shadowing in case the RESTORE has to retry any
// ingestions, but setting a (permissive) disallow like this serves to force
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
t.Fatalf("%+v", err)
}
kvs := clientKVsToEngineKVs(clientKVs)
for i := range kvs {
if i < len(expectedKVs) {
expectedKVs[i].Key.Timestamp = kvs[i].Key.Timestamp
}
}

if !reflect.DeepEqual(kvs, expectedKVs) {
for i := 0; i < len(kvs) || i < len(expectedKVs); i++ {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/importccl/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}
}
if err := p.ExecCfg().JobRegistry.CheckPausepoint("import.after_ingest"); err != nil {
return err
}

// If the table being imported into referenced UDTs, ensure that a concurrent
// schema change on any of the typeDescs has not modified the type descriptor. If
Expand Down
6 changes: 5 additions & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3417,7 +3417,11 @@ func TestPausepoints(t *testing.T) {
return registry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, rec)
}))
require.NoError(t, sj.Start(ctx))
require.NoError(t, sj.AwaitCompletion(ctx))
if tc.expected == jobs.StatusSucceeded {
require.NoError(t, sj.AwaitCompletion(ctx))
} else {
require.Error(t, sj.AwaitCompletion(ctx))
}
status, err := sj.TestingCurrentStatus(ctx, nil)
// Map pause-requested to paused to avoid races.
if status == jobs.StatusPauseRequested {
Expand Down
5 changes: 4 additions & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,10 @@ func (r *Registry) stepThroughStateMachine(
}

if errors.Is(err, errPauseSelfSentinel) {
return r.PauseRequested(ctx, nil, job.ID(), err.Error())
if err := r.PauseRequested(ctx, nil, job.ID(), err.Error()); err != nil {
return err
}
return errPauseSelfSentinel
}
// TODO(spaskob): enforce a limit on retries.

Expand Down
24 changes: 17 additions & 7 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// IsMetamorphicBuild returns whether this build is metamorphic. By build being
Expand Down Expand Up @@ -67,7 +68,9 @@ const (
// The given name is used for logging.
func ConstantWithMetamorphicTestValue(name string, defaultValue, metamorphicValue int) int {
if metamorphicBuild {
if rng.Float64() < metamorphicValueProbability {
rng.Lock()
defer rng.Unlock()
if rng.r.Float64() < metamorphicValueProbability {
logMetamorphicValue(name, metamorphicValue)
return metamorphicValue
}
Expand All @@ -76,7 +79,10 @@ func ConstantWithMetamorphicTestValue(name string, defaultValue, metamorphicValu
}

// rng is initialized to a rand.Rand if crdbTestBuild is enabled.
var rng *rand.Rand
var rng struct {
r *rand.Rand
syncutil.Mutex
}

// DisableMetamorphicEnvVar can be used to disable metamorhpic tests for
// sub-processes. If it exists and is set to something truthy as defined by
Expand All @@ -87,8 +93,8 @@ func init() {
if buildutil.CrdbTestBuild {
disabled := envutil.EnvOrDefaultBool(DisableMetamorphicEnvVar, false)
if !disabled {
rng, _ = randutil.NewTestRand()
metamorphicBuild = rng.Float64() < metamorphicBuildProbability
rng.r, _ = randutil.NewTestRand()
metamorphicBuild = rng.r.Float64() < metamorphicBuildProbability
}
}
}
Expand All @@ -100,10 +106,12 @@ func init() {
// The given name is used for logging.
func ConstantWithMetamorphicTestRange(name string, defaultValue, min, max int) int {
if metamorphicBuild {
if rng.Float64() < metamorphicValueProbability {
rng.Lock()
defer rng.Unlock()
if rng.r.Float64() < metamorphicValueProbability {
ret := min
if max > min {
ret = int(rng.Int31())%(max-min) + min
ret = int(rng.r.Int31())%(max-min) + min
}
logMetamorphicValue(name, ret)
return ret
Expand All @@ -118,7 +126,9 @@ func ConstantWithMetamorphicTestRange(name string, defaultValue, min, max int) i
// The given name is used for logging.
func ConstantWithMetamorphicTestBool(name string, defaultValue bool) bool {
if metamorphicBuild {
if rng.Float64() < metamorphicBoolProbability {
rng.Lock()
defer rng.Unlock()
if rng.r.Float64() < metamorphicBoolProbability {
ret := !defaultValue
logMetamorphicValue(name, ret)
return ret
Expand Down

0 comments on commit 212ddab

Please sign in to comment.