diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 2289fdf8c5f9..3221ca58d747 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -142,6 +142,7 @@ go_test(
         "backup_destination_test.go",
         "backup_intents_test.go",
         "backup_rand_test.go",
+        "backup_tenant_test.go",
         "backup_test.go",
         "bench_covering_test.go",
         "bench_test.go",
@@ -167,6 +168,7 @@ go_test(
     deps = [
         "//pkg/base",
         "//pkg/blobs",
+        "//pkg/ccl/importccl",
        "//pkg/ccl/kvccl",
         "//pkg/ccl/multiregionccl",
         "//pkg/ccl/multitenantccl",
diff --git a/pkg/ccl/backupccl/backup_tenant_test.go b/pkg/ccl/backupccl/backup_tenant_test.go
new file mode 100644
index 000000000000..9ee4aeafb85a
--- /dev/null
+++ b/pkg/ccl/backupccl/backup_tenant_test.go
@@ -0,0 +1,121 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package backupccl_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	_ "github.com/cockroachdb/cockroach/pkg/ccl/importccl"
+	_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBackupTenantImportingTable(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
+	defer tc.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])
+
+	tSrv, tSQL := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
+		TenantID:     roachpb.MakeTenantID(10),
+		TestingKnobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
+	})
+	defer tSQL.Close()
+
+	if _, err := tSQL.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := tSQL.Exec("CREATE TABLE x (id INT PRIMARY KEY, n INT, s STRING)"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := tSQL.Exec("INSERT INTO x VALUES (1000, 1, 'existing')"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := tSQL.Exec("IMPORT INTO x CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')"); !testutils.IsError(err, "pause") {
+		t.Fatal(err)
+	}
+	var jobID int
+	if err := tSQL.QueryRow(`SELECT job_id FROM [show jobs] WHERE job_type = 'IMPORT'`).Scan(&jobID); err != nil {
+		t.Fatal(err)
+	}
+	tc.Servers[0].JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
+	// Wait for the import job to hit the pausepoint and pause.
+
+	testutils.SucceedsSoon(t, func() error {
+		var status string
+		if err := tSQL.QueryRow(`SELECT status FROM [show jobs] WHERE job_id = $1`, jobID).Scan(&status); err != nil {
+			t.Fatal(err)
+		}
+		if status == string(jobs.StatusPaused) {
+			return nil
+		}
+		return errors.Newf("%s", status)
+	})
+
+	// The tenant now has a fully ingested, paused import, so back it up.
+	const dst = "userfile:///t"
+	if _, err := sqlDB.DB.ExecContext(ctx, `BACKUP TENANT 10 TO $1`, dst); err != nil {
+		t.Fatal(err)
+	}
+	// Destroy the tenant, then restore it.
+	tSrv.Stopper().Stop(ctx)
+	if _, err := sqlDB.DB.ExecContext(ctx, "SELECT crdb_internal.destroy_tenant(10, true)"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := sqlDB.DB.ExecContext(ctx, "RESTORE TENANT 10 FROM $1", dst); err != nil {
+		t.Fatal(err)
+	}
+	tSrv, tSQL = serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
+		TenantID:     roachpb.MakeTenantID(10),
+		Existing:     true,
+		TestingKnobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
+	})
+	defer tSQL.Close()
+
+	if _, err := tSQL.Exec(`UPDATE system.jobs SET claim_session_id = NULL, claim_instance_id = NULL WHERE id = $1`, jobID); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := tSQL.Exec(`DELETE FROM system.lease`); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := tSQL.Exec(`CANCEL JOB $1`, jobID); err != nil {
+		t.Fatal(err)
+	}
+	tSrv.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
+	testutils.SucceedsSoon(t, func() error {
+		var status string
+		if err := tSQL.QueryRow(`SELECT status FROM [show jobs] WHERE job_id = $1`, jobID).Scan(&status); err != nil {
+			t.Fatal(err)
+		}
+		if status == string(jobs.StatusCanceled) {
+			return nil
+		}
+		return errors.Newf("%s", status)
+	})
+
+	var rowCount int
+	if err := tSQL.QueryRow(`SELECT count(*) FROM x`).Scan(&rowCount); err != nil {
+		t.Fatal(err)
+	}
+	require.Equal(t, 1, rowCount)
+}
diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index 591f21f9161c..87e516c433b7 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -9,12 +9,14 @@
 package backupccl
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
 	"github.com/cockroachdb/cockroach/pkg/cloud"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/bulk"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -105,7 +107,7 @@ var restoreAtNow = settings.RegisterBoolSetting(
 	settings.TenantWritable,
 	"bulkio.restore_at_current_time.enabled",
 	"write restored data at the current timestamp",
-	false,
+	true,
 )
 
 func newRestoreDataProcessor(
@@ -387,6 +389,16 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 		)
 	}
 
+	// If the system tenant is restoring a guest tenant's span, we don't want to
+	// forward all the restored data to now: that span may contain importing
+	// tables, which rely on the difference in timestamps between restored
+	// existing keys and importing keys in order to roll back.
+	if writeAtBatchTS && kr.fromSystemTenant &&
+		(bytes.HasPrefix(entry.Span.Key, keys.TenantPrefix) || bytes.HasPrefix(entry.Span.EndKey, keys.TenantPrefix)) {
+		log.Warningf(ctx, "restoring span %s at its original timestamps because it is a tenant span", entry.Span)
+		writeAtBatchTS = false
+	}
+
 	// "disallowing" shadowing of anything older than logical=1 is i.e. allow all
 	// shadowing. We must allow shadowing in case the RESTORE has to retry any
 	// ingestions, but setting a (permissive) disallow like this serves to force
diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go
index 3c3fc7f2bbbf..dde017820eb3 100644
--- a/pkg/ccl/backupccl/restore_data_processor_test.go
+++ b/pkg/ccl/backupccl/restore_data_processor_test.go
@@ -402,6 +402,11 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
 			t.Fatalf("%+v", err)
 		}
 		kvs := clientKVsToEngineKVs(clientKVs)
+		for i := range kvs {
+			if i < len(expectedKVs) {
+				expectedKVs[i].Key.Timestamp = kvs[i].Key.Timestamp
+			}
+		}
 		if !reflect.DeepEqual(kvs, expectedKVs) {
 			for i := 0; i < len(kvs) || i < len(expectedKVs); i++ {
diff --git a/pkg/ccl/importccl/import_job.go b/pkg/ccl/importccl/import_job.go
index 32ae5cc66f53..22a3583c708c 100644
--- a/pkg/ccl/importccl/import_job.go
+++ b/pkg/ccl/importccl/import_job.go
@@ -283,6 +283,9 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
 			return err
 		}
 	}
+	if err := p.ExecCfg().JobRegistry.CheckPausepoint("import.after_ingest"); err != nil {
+		return err
+	}
 
 	// If the table being imported into referenced UDTs, ensure that a concurrent
 	// schema change on any of the typeDescs has not modified the type descriptor. If
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 5d9b7af44f2b..ffbfd03c8ba0 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -3417,7 +3417,11 @@ func TestPausepoints(t *testing.T) {
 				return registry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, rec)
 			}))
 			require.NoError(t, sj.Start(ctx))
-			require.NoError(t, sj.AwaitCompletion(ctx))
+			if tc.expected == jobs.StatusSucceeded {
+				require.NoError(t, sj.AwaitCompletion(ctx))
+			} else {
+				require.Error(t, sj.AwaitCompletion(ctx))
+			}
 			status, err := sj.TestingCurrentStatus(ctx, nil)
 			// Map pause-requested to paused to avoid races.
 			if status == jobs.StatusPauseRequested {
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index ab1033b914be..c002a4361a66 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -1141,7 +1141,10 @@ func (r *Registry) stepThroughStateMachine(
 		}
 
 		if errors.Is(err, errPauseSelfSentinel) {
-			return r.PauseRequested(ctx, nil, job.ID(), err.Error())
+			if err := r.PauseRequested(ctx, nil, job.ID(), err.Error()); err != nil {
+				return err
+			}
+			return errPauseSelfSentinel
 		}
 
 		// TODO(spaskob): enforce a limit on retries.
diff --git a/pkg/util/constants.go b/pkg/util/constants.go
index 6150a1925e94..f82dca2a9172 100644
--- a/pkg/util/constants.go
+++ b/pkg/util/constants.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 )
 
 // IsMetamorphicBuild returns whether this build is metamorphic. By build being
@@ -67,7 +68,9 @@ const (
 // The given name is used for logging.
 func ConstantWithMetamorphicTestValue(name string, defaultValue, metamorphicValue int) int {
 	if metamorphicBuild {
-		if rng.Float64() < metamorphicValueProbability {
+		rng.Lock()
+		defer rng.Unlock()
+		if rng.r.Float64() < metamorphicValueProbability {
 			logMetamorphicValue(name, metamorphicValue)
 			return metamorphicValue
 		}
@@ -76,7 +79,10 @@ func ConstantWithMetamorphicTestValue(name string, defaultValue, metamorphicValu
 }
 
 // rng is initialized to a rand.Rand if crdbTestBuild is enabled.
-var rng *rand.Rand
+var rng struct {
+	r *rand.Rand
+	syncutil.Mutex
+}
 
 // DisableMetamorphicEnvVar can be used to disable metamorhpic tests for
 // sub-processes. If it exists and is set to something truthy as defined by
@@ -87,8 +93,8 @@ func init() {
 	if buildutil.CrdbTestBuild {
 		disabled := envutil.EnvOrDefaultBool(DisableMetamorphicEnvVar, false)
 		if !disabled {
-			rng, _ = randutil.NewTestRand()
-			metamorphicBuild = rng.Float64() < metamorphicBuildProbability
+			rng.r, _ = randutil.NewTestRand()
+			metamorphicBuild = rng.r.Float64() < metamorphicBuildProbability
 		}
 	}
 }
@@ -100,10 +106,12 @@ func init() {
 // The given name is used for logging.
 func ConstantWithMetamorphicTestRange(name string, defaultValue, min, max int) int {
 	if metamorphicBuild {
-		if rng.Float64() < metamorphicValueProbability {
+		rng.Lock()
+		defer rng.Unlock()
+		if rng.r.Float64() < metamorphicValueProbability {
 			ret := min
 			if max > min {
-				ret = int(rng.Int31())%(max-min) + min
+				ret = int(rng.r.Int31())%(max-min) + min
 			}
 			logMetamorphicValue(name, ret)
 			return ret
@@ -118,7 +126,9 @@ func ConstantWithMetamorphicTestRange(name string, defaultValue, min, max int) i
 // The given name is used for logging.
 func ConstantWithMetamorphicTestBool(name string, defaultValue bool) bool {
 	if metamorphicBuild {
-		if rng.Float64() < metamorphicBoolProbability {
+		rng.Lock()
+		defer rng.Unlock()
+		if rng.r.Float64() < metamorphicBoolProbability {
 			ret := !defaultValue
 			logMetamorphicValue(name, ret)
 			return ret