Skip to content

Commit

Permalink
backupccl: ignore ProtectionPolicy for exclude_data_from_backup
Browse files Browse the repository at this point in the history
This is the last of the changes needed to achieve #73536.
It teaches the helper used to read PTS records that apply to a
replica, to ignore ProtectionPolicies that were written by a backup
if the replica has been marked as `exclude_data_from_backup`.

From a users point of view, this allows them to mark a table whose
row data will be excluded from backup, and to set that tables gc.ttl
to a very low value. Backups that write PTS records will no longer
holdup GC on such low GC TTL tables.

Fixes: #73536

Release note: None

Release justification: low risk update to new functionality
  • Loading branch information
adityamaru committed Mar 8, 2022
1 parent 36f3550 commit 5fb8960
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 51 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
MaxRetries: 5,
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint("backup.before_flow"); err != nil {
return err
}

// We want to retry a backup if there are transient failures (i.e. worker nodes
// dying), so if we receive a retryable error, re-plan and retry the backup.
var res roachpb.RowCount
Expand Down
28 changes: 10 additions & 18 deletions pkg/ccl/backupccl/backup_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl_test
package backupccl

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand All @@ -41,6 +44,7 @@ func TestBackupTenantImportingTable(t *testing.T) {
TestingKnobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
})
defer tSQL.Close()
runner := sqlutils.MakeSQLRunner(tSQL)

if _, err := tSQL.Exec("SET CLUSTER SETTING jobs.debug.pausepoints = 'import.after_ingest';"); err != nil {
t.Fatal(err)
Expand All @@ -54,23 +58,11 @@ func TestBackupTenantImportingTable(t *testing.T) {
if _, err := tSQL.Exec("IMPORT INTO x CSV DATA ('workload:///csv/bank/bank?rows=100&version=1.0.0')"); !testutils.IsError(err, "pause") {
t.Fatal(err)
}
var jobID int
if err := tSQL.QueryRow(`SELECT job_id FROM [show jobs] WHERE job_type = 'IMPORT'`).Scan(&jobID); err != nil {
t.Fatal(err)
}
tc.Servers[0].JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
// wait for it to pause

testutils.SucceedsSoon(t, func() error {
var status string
if err := tSQL.QueryRow(`SELECT status FROM [show jobs] WHERE job_id = $1`, jobID).Scan(&status); err != nil {
t.Fatal(err)
}
if status == string(jobs.StatusPaused) {
return nil
}
return errors.Newf("%s", status)
})
var jobID jobspb.JobID
err := tSQL.QueryRow(fmt.Sprintf(
`SELECT job_id FROM [show jobs] WHERE job_type = 'IMPORT'`)).Scan(&jobID)
require.NoError(t, err)
jobutils.WaitForJobToPause(t, runner, jobID)

// tenant now has a fully ingested, paused import, so back them up.
const dst = "userfile:///t"
Expand Down
150 changes: 127 additions & 23 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1790,7 +1791,7 @@ func createAndWaitForJob(
t, `INSERT INTO system.jobs (created, status, payload, progress) VALUES ($1, $2, $3, $4) RETURNING id`,
timeutil.FromUnixMicros(now), jobs.StatusRunning, payload, progressBytes,
).Scan(&jobID)
jobutils.WaitForJob(t, db, jobID)
jobutils.WaitForJobToSucceed(t, db, jobID)
}

// TestBackupRestoreResume tests whether backup and restore jobs are properly
Expand Down Expand Up @@ -2015,7 +2016,7 @@ func TestBackupRestoreControlJob(t *testing.T) {
t.Fatalf("%d: expected 'job paused' error, but got %+v", i, err)
}
sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID))
jobutils.WaitForJob(t, sqlDB, jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
}

sqlDB.CheckQueryResults(t,
Expand Down Expand Up @@ -2051,7 +2052,7 @@ func TestBackupRestoreControlJob(t *testing.T) {
sqlDB.CheckQueryResults(t, fmt.Sprintf("SHOW BACKUP '%s'", noOfflineDir), [][]string{})
}
sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, jobID))
jobutils.WaitForJob(t, sqlDB, jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)
}
sqlDB.CheckQueryResults(t,
`SELECT count(*) FROM pause.bank`,
Expand All @@ -2071,7 +2072,7 @@ func TestBackupRestoreControlJob(t *testing.T) {
if err != nil {
t.Fatalf("error while running backup %+v", err)
}
jobutils.WaitForJob(t, sqlDB, backupJobID)
jobutils.WaitForJobToSucceed(t, sqlDB, backupJobID)

sqlDB.Exec(t, `DROP DATABASE data`)

Expand Down Expand Up @@ -9051,7 +9052,7 @@ func TestBackupWorkerFailure(t *testing.T) {
}

// But the job should be restarted and succeed eventually.
jobutils.WaitForJob(t, sqlDB, jobID)
jobutils.WaitForJobToSucceed(t, sqlDB, jobID)

// Drop database and restore to ensure that the backup was successful.
sqlDB.Exec(t, `DROP DATABASE data`)
Expand Down Expand Up @@ -9400,7 +9401,7 @@ DROP INDEX foo@bar;
close(allowGC)

// Wait for the GC to complete.
jobutils.WaitForJob(t, sqlRunner, gcJobID)
jobutils.WaitForJobToSucceed(t, sqlRunner, gcJobID)
waitForTableSplit(t, conn, "foo", "test")

// This backup should succeed since the spans being backed up have a default
Expand Down Expand Up @@ -9642,7 +9643,7 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
}
args.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
args.ServerArgs.ExternalIODir = localExternalDir
tc := testcluster.StartTestCluster(t, 3, args)
tc := testcluster.StartTestCluster(t, 1, args)
defer tc.Stopper().Stop(ctx)

tc.WaitForNodeLiveness(t)
Expand Down Expand Up @@ -9684,21 +9685,6 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
require.NoError(t, err)
}
}
const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
processedRegexp := regexp.MustCompile(processedPattern)

gcSoon := func() {
testutils.SucceedsSoon(t, func() error {
upsertUntilBackpressure()
s, repl := getStoreAndReplica(t, tc, conn, "foo", "defaultdb")
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
require.NoError(t, err)
if !processedRegexp.MatchString(trace.String()) {
return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
}
return nil
})
}

waitForTableSplit(t, conn, "foo", "defaultdb")
waitForReplicaFieldToBeSet(t, tc, conn, "foo", "defaultdb", func(r *kvserver.Replica) (bool, error) {
Expand All @@ -9710,7 +9696,15 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {

var tsBefore string
require.NoError(t, conn.QueryRow("SELECT cluster_logical_timestamp()").Scan(&tsBefore))
gcSoon()
upsertUntilBackpressure()
runGCAndCheckTrace(ctx, t, tc, conn, false /* skipShouldQueue */, "foo", "defaultdb", func(traceStr string) error {
const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
processedRegexp := regexp.MustCompile(processedPattern)
if !processedRegexp.MatchString(traceStr) {
return errors.Errorf("%q does not match %q", traceStr, processedRegexp)
}
return nil
})

_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), localFoo)
testutils.IsError(err, "must be after replica GC threshold")
Expand All @@ -9728,6 +9722,116 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
require.NoError(t, err)
}

// TestExcludeDataFromBackupDoesNotHoldupGC tests that a table marked as
// `exclude_data_from_backup` and with a protected timestamp record covering it
// does not holdup GC, since its data is not going to be backed up.
func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
DisableGCQueue: true,
DisableLastProcessedCheck: true,
}
params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{
EnableProtectedTimestampForMultiTenant: true}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(ctx)

tc.WaitForNodeLiveness(t)
require.NoError(t, tc.WaitForFullReplication())

conn := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(conn)
runner.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
// speeds up the test
runner.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
runner.Exec(t, `SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'`)

runner.Exec(t, `CREATE DATABASE test;`)
runner.Exec(t, `CREATE TABLE test.foo (k INT PRIMARY KEY, v BYTES)`)

// Exclude the table from backup so that it does not hold up GC.
runner.Exec(t, `ALTER TABLE test.foo SET (exclude_data_from_backup = true)`)

const tableRangeMaxBytes = 1 << 18
runner.Exec(t, "ALTER TABLE test.foo CONFIGURE ZONE USING "+
"gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)

rRand, _ := randutil.NewTestRand()
upsertUntilBackpressure := func() {
for {
_, err := conn.Exec("UPSERT INTO test.foo VALUES (1, $1)",
randutil.RandBytes(rRand, 1<<15))
if testutils.IsError(err, "backpressure") {
break
}
require.NoError(t, err)
}
}

// Wait for the span config fields to apply.
waitForTableSplit(t, conn, "foo", "test")
waitForReplicaFieldToBeSet(t, tc, conn, "foo", "test", func(r *kvserver.Replica) (bool, error) {
if !r.ExcludeDataFromBackup() {
return false, errors.New("waiting for exclude_data_from_backup to be applied")
}
conf := r.SpanConfig()
if conf.TTL() != 1*time.Second {
return false, errors.New("waiting for gc.ttlseconds to be applied")
}
if r.GetMaxBytes() != tableRangeMaxBytes {
return false, errors.New("waiting for range_max_bytes to be applied")
}
return true, nil
})

runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'backup.before_flow'`)
if _, err := conn.Exec(`BACKUP DATABASE test INTO $1`, localFoo); !testutils.IsError(err, "pause") {
t.Fatal(err)
}
// We pause the backup resumer before it plans its flow so this timestamp
// should be very close to the timestamp protected by the record written by
// the backup.
afterBackup := tc.Server(0).Clock().Now()
var jobID jobspb.JobID
err := conn.QueryRow(fmt.Sprintf(
`SELECT job_id FROM [show jobs] WHERE job_type = 'BACKUP'`)).Scan(&jobID)
require.NoError(t, err)
jobutils.WaitForJobToPause(t, runner, jobID)

// Ensure that the replica sees the ProtectionPolicies.
waitForReplicaFieldToBeSet(t, tc, conn, "foo", "test", func(r *kvserver.Replica) (bool, error) {
if len(r.SpanConfig().GCPolicy.ProtectionPolicies) == 0 {
return false, errors.New("no protection policy applied to replica")
}
return true, nil
})

// Now that the backup has written a PTS record protecting the database, we
// check that the replica corresponding to `test.foo` continue to GC data
// since it has been marked as `exclude_data_from_backup`.
upsertUntilBackpressure()
runGCAndCheckTrace(ctx, t, tc, conn, false /* skipShouldQueue */, "foo", "test", func(traceStr string) error {
const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
processedRegexp := regexp.MustCompile(processedPattern)
if !processedRegexp.MatchString(traceStr) {
return errors.Errorf("%q does not match %q", traceStr, processedRegexp)
}
thresh := thresholdFromTrace(t, traceStr)
require.Truef(t, afterBackup.Less(thresh), "%v >= %v", afterBackup, thresh)
return nil
})
}

// TestBackupRestoreSystemUsers tests RESTORE SYSTEM USERS feature which allows user to
// restore users from a backup into current cluster and regrant roles.
func TestBackupRestoreSystemUsers(t *testing.T) {
Expand Down
43 changes: 43 additions & 0 deletions pkg/ccl/backupccl/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"reflect"
"regexp"
"strings"
"testing"

Expand All @@ -33,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/workload/bank"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
Expand Down Expand Up @@ -485,3 +487,44 @@ func waitForReplicaFieldToBeSet(
return nil
})
}

func thresholdFromTrace(t *testing.T, traceString string) hlc.Timestamp {
t.Helper()
thresholdRE := regexp.MustCompile(`(?s).*Threshold:(?P<threshold>[^\s]*)`)
threshStr := string(thresholdRE.ExpandString(nil, "$threshold",
traceString, thresholdRE.FindStringSubmatchIndex(traceString)))
thresh, err := hlc.ParseTimestamp(threshStr)
require.NoError(t, err)
return thresh
}

// runGCAndCheckTrace manually enqueues the replica corresponding to
// `databaseName.tableName`, and runs `checkGCTrace` until it succeeds.
func runGCAndCheckTrace(
ctx context.Context,
t *testing.T,
tc *testcluster.TestCluster,
conn *gosql.DB,
skipShouldQueue bool,
tableName, databaseName string,
checkGCTrace func(traceStr string) error,
) {
t.Helper()
var startKey roachpb.Key
err := conn.QueryRow("SELECT start_key"+
" FROM crdb_internal.ranges_no_leases"+
" WHERE table_name = $1"+
" AND database_name = $2"+
" ORDER BY start_key ASC", tableName, databaseName).Scan(&startKey)
require.NoError(t, err)
r := tc.LookupRangeOrFatal(t, startKey)
l, _, err := tc.FindRangeLease(r, nil)
require.NoError(t, err)
lhServer := tc.Server(int(l.Replica.NodeID) - 1)
s, repl := getFirstStoreReplica(t, lhServer, startKey)
testutils.SucceedsSoon(t, func() error {
trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, skipShouldQueue)
require.NoError(t, err)
return checkGCTrace(trace.String())
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ INSERT INTO d.t2 VALUES (2);
`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
ingestionJobID, cutoverTime)

jobutils.WaitForJob(t, destSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))

query := "SELECT * FROM d.t1"
sourceData := sourceSQL.QueryStr(t, query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestPartitionedTenantStreamingEndToEnd(t *testing.T) {

// Cut over the ingestion job and the job will stop eventually.
destSysSQL.Exec(t, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, ingestionJobID, cutoverTime)
jobutils.WaitForJob(t, destSysSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, destSysSQL, jobspb.JobID(ingestionJobID))
// TODO(casper): Make producer job exit normally in the cutover scenario.
sourceSysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status, error FROM [SHOW JOBS] WHERE job_id = %d", streamProducerJobID),
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func (r *Replica) ExcludeDataFromBackup() bool {
return r.mu.conf.ExcludeDataFromBackup
}

func (r *Replica) exludeReplicaFromBackupRLocked() bool {
func (r *Replica) excludeReplicaFromBackupRLocked() bool {
return r.mu.conf.ExcludeDataFromBackup
}

Expand Down Expand Up @@ -1540,7 +1540,7 @@ func (r *Replica) checkTSAboveGCThresholdRLocked(
return &roachpb.BatchTimestampBeforeGCError{
Timestamp: ts,
Threshold: threshold,
DataExcludedFromBackup: r.exludeReplicaFromBackupRLocked(),
DataExcludedFromBackup: r.excludeReplicaFromBackupRLocked(),
}
}

Expand Down
Loading

0 comments on commit 5fb8960

Please sign in to comment.