backupccl,spanconfig,kvserver: ExportRequest noops on ranges excluded from backup

This change is the first of two that get us to the goal of having backups
ignore certain table row data, and not hold up GC on those ranges.

This change does a few things:

- It plumbs the exclude_data_from_backup bit set on a table descriptor
through to the span configuration applied in KV.

- It teaches ExportRequest evaluation on a range marked as excluded to return
an empty ExportResponse. This way, a backup processor receives no row
data to back up for an ephemeral table.

- A follow-up change will also teach the SQLTranslator
to not populate the protected timestamp field on the SpanConfig for such
tables. This way, a long-running backup will not hold up GC on such high-churn
tables. With no protection on these ranges, it is possible for an
ExportRequest targeting the range to have a StartTime
below the range's GCThreshold. To avoid the returned BatchTimestampBeforeGCError
failing the backup, we decorate the error with information about the
range being excluded from backup and handle the error in the backup processor;
a toy model of this flow follows below.
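
To make these pieces concrete, here is a self-contained toy model of the flow the bullets describe. It is a sketch for illustration only: every name in it (rangeState, gcError, export) is a stand-in invented here, not CockroachDB's actual API, and the real GC-threshold check and span-config plumbing live in the KV layer.

package main

import "fmt"

// rangeState stands in for the state a replica derives from its span config.
type rangeState struct {
    excludedFromBackup bool
    gcThreshold        int64
}

// gcError models BatchTimestampBeforeGCError, decorated with the excluded bit
// so that a backup processor can decide whether to ignore the failure.
type gcError struct{ dataExcludedFromBackup bool }

func (e *gcError) Error() string { return "batch timestamp must be after replica GC threshold" }

// export models ExportRequest evaluation. The GC-threshold check runs first,
// so an excluded range with an aggressive GC TTL can still surface the
// (decorated) error; otherwise an excluded range no-ops with no row data.
func export(r rangeState, startTime int64) ([]string, error) {
    if startTime < r.gcThreshold {
        return nil, &gcError{dataExcludedFromBackup: r.excludedFromBackup}
    }
    if r.excludedFromBackup {
        return nil, nil // empty ExportResponse: nothing for the backup to write
    }
    return []string{"row"}, nil
}

func main() {
    ephemeral := rangeState{excludedFromBackup: true, gcThreshold: 100}

    rows, err := export(ephemeral, 200) // read above the GC threshold: the noop case
    fmt.Println(len(rows), err)         // 0 <nil>

    _, err = export(ephemeral, 50) // read below the GC threshold
    if gcErr, ok := err.(*gcError); ok && gcErr.dataExcludedFromBackup {
        fmt.Println("GC error on an excluded range: safe for the backup to ignore")
    }
}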

Informs: #73536

Release note (sql change): BACKUP of a table marked with `exclude_data_from_backup`
via `ALTER TABLE ... SET (exclude_data_from_backup = true)` will no longer back up
that table's row data. The backup will continue to back up the table's descriptor
and related metadata, so a restore will produce an empty version of the
backed-up table.
adityamaru committed Feb 9, 2022
1 parent 90629ae commit 52472d2
Showing 15 changed files with 814 additions and 438 deletions.
8 changes: 6 additions & 2 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -75,6 +75,7 @@ go_library(
        "//pkg/sql/catalog/descidgen",
        "//pkg/sql/catalog/descpb",
        "//pkg/sql/catalog/descs",
        "//pkg/sql/catalog/desctestutils",
        "//pkg/sql/catalog/multiregion",
        "//pkg/sql/catalog/resolver",
        "//pkg/sql/catalog/schemadesc",
@@ -103,6 +104,9 @@ go_library(
        "//pkg/sql/types",
        "//pkg/storage",
        "//pkg/testutils",
        "//pkg/testutils/serverutils",
        "//pkg/testutils/sqlutils",
        "//pkg/testutils/testcluster",
        "//pkg/util",
        "//pkg/util/admission",
        "//pkg/util/contextutil",
@@ -123,6 +127,8 @@ go_library(
        "//pkg/util/timeutil",
        "//pkg/util/tracing",
        "//pkg/util/uuid",
        "//pkg/workload/bank",
        "//pkg/workload/workloadsql",
        "@com_github_cockroachdb_errors//:errors",
        "@com_github_cockroachdb_logtags//:logtags",
        "@com_github_gogo_protobuf//jsonpb",
@@ -147,7 +153,6 @@ go_test(
        "bench_test.go",
        "create_scheduled_backup_test.go",
        "full_cluster_backup_restore_test.go",
        "helpers_test.go",
        "insert_missing_public_schema_namespace_entry_restore_test.go",
        "key_rewriter_test.go",
        "main_test.go",
@@ -245,7 +250,6 @@ go_test(
        "//pkg/util/timeutil",
        "//pkg/util/uuid",
        "//pkg/workload/bank",
        "@com_github_aws_aws_sdk_go//aws/credentials",
        "@com_github_cockroachdb_cockroach_go_v2//crdb",
        "@com_github_cockroachdb_datadriven//:datadriven",
11 changes: 11 additions & 0 deletions pkg/ccl/backupccl/backup_processor.go
@@ -451,6 +451,17 @@ func runBackupProcessor(
    if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
        return errors.Wrap(exportRequestErr, "export request timeout")
    }
    // BatchTimestampBeforeGCError is returned if the ExportRequest
    // attempts to read below the range's GC threshold.
    if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok {
        // If the range we are exporting is marked to be excluded from
        // backup, it is safe to ignore the error. It is likely that the
        // table has been configured with a low GC TTL, and so the data
        // the backup is targeting has already been gc'ed.
        if batchTimestampBeforeGCError.DataExcludedFromBackup {
            continue
        }
    }
    return errors.Wrapf(exportRequestErr, "exporting %s", span.span)
}

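The skip above relies on extracting the typed detail from CockroachDB's roachpb.Error wrapper via pErr.GetDetail(). For readers outside that plumbing, the same skip-on-typed-error decision looks like this in plain Go with the stdlib's errors.As; a minimal standalone sketch in which gcThresholdError and exportSpan are invented stand-ins:

package main

import (
    "errors"
    "fmt"
)

// gcThresholdError stands in for roachpb.BatchTimestampBeforeGCError.
type gcThresholdError struct{ DataExcludedFromBackup bool }

func (e *gcThresholdError) Error() string {
    return "batch timestamp must be after replica GC threshold"
}

// exportSpan pretends the first table is excluded from backup and has already
// been gc'ed, so reading it below the GC threshold fails with a decorated error.
func exportSpan(span string) error {
    if span == "/Table/56" {
        return &gcThresholdError{DataExcludedFromBackup: true}
    }
    return nil
}

func main() {
    for _, span := range []string{"/Table/56", "/Table/57"} {
        err := exportSpan(span)
        var gcErr *gcThresholdError
        if errors.As(err, &gcErr) && gcErr.DataExcludedFromBackup {
            continue // same decision as the processor: safe to skip this span
        }
        if err != nil {
            fmt.Printf("exporting %s: %v\n", span, err)
            return
        }
    }
    fmt.Println("backup completed without row data from the excluded span")
}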
198 changes: 164 additions & 34 deletions pkg/ccl/backupccl/backup_test.go
@@ -54,6 +54,7 @@ import (
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/security"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    "github.com/cockroachdb/cockroach/pkg/spanconfig"
    "github.com/cockroachdb/cockroach/pkg/sql"
    "github.com/cockroachdb/cockroach/pkg/sql/catalog"
    "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
@@ -6691,23 +6692,6 @@ func TestPublicIndexTableSpans(t *testing.T) {
    }
}

func getFirstStoreReplica(
    t *testing.T, s serverutils.TestServerInterface, key roachpb.Key,
) (*kvserver.Store, *kvserver.Replica) {
    t.Helper()
    store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
    require.NoError(t, err)
    var repl *kvserver.Replica
    testutils.SucceedsSoon(t, func() error {
        repl = store.LookupReplica(roachpb.RKey(key))
        if repl == nil {
            return errors.New(`could not find replica`)
        }
        return nil
    })
    return store, repl
}

// TestRestoreErrorPropagates ensures that errors from creating the job
// record propagate correctly.
func TestRestoreErrorPropagates(t *testing.T) {
@@ -9061,23 +9045,7 @@ DROP INDEX foo@bar;

    // Wait for the GC to complete.
    jobutils.WaitForJob(t, sqlRunner, gcJobID)

    waitForTableSplit := func() {
        testutils.SucceedsSoon(t, func() error {
            count := 0
            sqlRunner.QueryRow(t,
                "SELECT count(*) "+
                    "FROM crdb_internal.ranges_no_leases "+
                    "WHERE table_name = $1 "+
                    "AND database_name = $2",
                "foo", "test").Scan(&count)
            if count == 0 {
                return errors.New("waiting for table split")
            }
            return nil
        })
    }
    waitForTableSplit()
    waitForTableSplit(t, sqlRunner, "foo", "test")

    // This backup should succeed since the spans being backed up have a default
    // GC TTL of 25 hours.
@@ -9239,3 +9207,165 @@ func TestBackupRestoreSeparateIncrementalPrefix(t *testing.T) {
        sqlDB.Exec(t, "DROP DATABASE inc_fkdb;")
    }
}

func TestExcludeDataFromBackupAndRestore(t *testing.T) {
    defer leaktest.AfterTest(t)()
    defer log.Scope(t).Close(t)

    tc, sqlDB, iodir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 10,
        InitManualReplication, base.TestClusterArgs{
            ServerArgs: base.TestServerArgs{
                Knobs: base.TestingKnobs{
                    JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
                    SpanConfig: &spanconfig.TestingKnobs{
                        SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
                    },
                },
            },
        })
    defer cleanupFn()

    _, restoreDB, cleanup := backupRestoreTestSetupEmpty(t, singleNode, iodir, InitManualReplication,
        base.TestClusterArgs{
            ServerArgs: base.TestServerArgs{
                Knobs: base.TestingKnobs{
                    JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
                },
            },
        })
    defer cleanup()

    sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
    sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
    sqlRunner := sqlutils.MakeSQLRunner(tc.Conns[0])

    sqlDB.Exec(t, `CREATE TABLE data.foo (id INT, INDEX bar(id))`)
    sqlDB.Exec(t, `INSERT INTO data.foo select * from generate_series(1,10)`)

    // Create another table.
    sqlDB.Exec(t, `CREATE TABLE data.bar (id INT, INDEX bar(id))`)
    sqlDB.Exec(t, `INSERT INTO data.bar select * from generate_series(1,10)`)

    // Set foo to exclude_data_from_backup and back it up. The ExportRequest
    // should be a noop and back up no data.
    sqlDB.Exec(t, `ALTER TABLE data.foo SET (exclude_data_from_backup = true)`)
    waitForTableSplit(t, sqlRunner, "foo", "data")
    waitForTableSplit(t, sqlRunner, "bar", "data")
    waitForReplicaFieldToBeSet(t, tc, sqlRunner, "foo", "data", func(r *kvserver.Replica) (bool, error) {
        if !r.ExcludeDataFromBackup() {
            return false, errors.New("waiting for exclude_data_from_backup to be applied")
        }
        return true, nil
    })
    sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, LocalFoo)

    restoreDB.Exec(t, `RESTORE DATABASE data FROM $1`, LocalFoo)
    require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.foo`), 0)
    require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.bar`), 10)
}
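
The test above calls waitForTableSplit with table and database arguments, while the single-purpose closure removed earlier in this file took none; the helper has evidently been generalized and hoisted out of this file (note helpers_test.go leaving the go_test srcs in the BUILD file). A plausible reconstruction, inferred from the removed closure and the call sites here; the exact signature and home of the real helper are assumptions:

// waitForTableSplit blocks until the given table in the given database is
// served by at least one range of its own. Reconstructed from the closure this
// commit removes; the real helper may differ in detail.
func waitForTableSplit(t *testing.T, sqlRunner *sqlutils.SQLRunner, tableName, databaseName string) {
    t.Helper()
    testutils.SucceedsSoon(t, func() error {
        count := 0
        sqlRunner.QueryRow(t,
            "SELECT count(*) "+
                "FROM crdb_internal.ranges_no_leases "+
                "WHERE table_name = $1 "+
                "AND database_name = $2",
            tableName, databaseName).Scan(&count)
        if count == 0 {
            return errors.New("waiting for table split")
        }
        return nil
    })
}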

// TestExportRequestBelowGCThresholdOnDataExcludedFromBackup tests that a
// `BatchTimestampBeforeGCError` on an ExportRequest targeting a table that has
// been marked as excluded from backup does not cause the backup to fail.
func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
    defer leaktest.AfterTest(t)()
    defer log.Scope(t).Close(t)

    skip.UnderStressRace(t, "test is too slow to run under race")

    ctx := context.Background()
    localExternalDir, cleanup := testutils.TempDir(t)
    defer cleanup()
    args := base.TestClusterArgs{}
    args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
        DisableGCQueue:            true,
        DisableLastProcessedCheck: true,
    }
    args.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
    args.ServerArgs.ExternalIODir = localExternalDir
    tc := testcluster.StartTestCluster(t, 3, args)
    defer tc.Stopper().Stop(ctx)

    tc.WaitForNodeLiveness(t)
    require.NoError(t, tc.WaitForFullReplication())

    for _, server := range tc.Servers {
        registry := server.JobRegistry().(*jobs.Registry)
        registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
            jobspb.TypeBackup: func(raw jobs.Resumer) jobs.Resumer {
                r := raw.(*backupResumer)
                r.testingKnobs.ignoreProtectedTimestamps = true
                return r
            },
        }
    }
    conn := tc.ServerConn(0)
    sqlRunner := sqlutils.MakeSQLRunner(tc.Conns[0])
    _, err := conn.Exec("CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
    require.NoError(t, err)

    _, err = conn.Exec("SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
    require.NoError(t, err)

    _, err = conn.Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
    require.NoError(t, err)

    const tableRangeMaxBytes = 1 << 18
    _, err = conn.Exec("ALTER TABLE foo CONFIGURE ZONE USING "+
        "gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)
    require.NoError(t, err)

    rRand, _ := randutil.NewTestRand()
    upsertUntilBackpressure := func() {
        for {
            _, err := conn.Exec("UPSERT INTO foo VALUES (1, $1)",
                randutil.RandBytes(rRand, 1<<15))
            if testutils.IsError(err, "backpressure") {
                break
            }
            require.NoError(t, err)
        }
    }
    const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
    processedRegexp := regexp.MustCompile(processedPattern)

    gcSoon := func() {
        testutils.SucceedsSoon(t, func() error {
            upsertUntilBackpressure()
            s, repl := getStoreAndReplica(t, tc, sqlRunner, "foo", "defaultdb")
            trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
            require.NoError(t, err)
            if !processedRegexp.MatchString(trace.String()) {
                return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
            }
            return nil
        })
    }

    waitForTableSplit(t, sqlRunner, "foo", "defaultdb")
    waitForReplicaFieldToBeSet(t, tc, sqlRunner, "foo", "defaultdb", func(r *kvserver.Replica) (bool, error) {
        if r.GetMaxBytes() != tableRangeMaxBytes {
            return false, errors.New("waiting for range_max_bytes to be applied")
        }
        return true, nil
    })

    var tsBefore string
    require.NoError(t, conn.QueryRow("SELECT cluster_logical_timestamp()").Scan(&tsBefore))
    gcSoon()

    _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), LocalFoo)
    require.True(t, testutils.IsError(err, "must be after replica GC threshold"))

    _, err = conn.Exec(`ALTER TABLE foo SET (exclude_data_from_backup = true)`)
    require.NoError(t, err)
    waitForReplicaFieldToBeSet(t, tc, sqlRunner, "foo", "defaultdb", func(r *kvserver.Replica) (bool, error) {
        if !r.ExcludeDataFromBackup() {
            return false, errors.New("waiting for exclude_data_from_backup to be applied")
        }
        return true, nil
    })

    _, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), LocalFoo)
    require.NoError(t, err)
}
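
Likewise, getStoreAndReplica and waitForReplicaFieldToBeSet are defined elsewhere in the package. The sketch below generalizes the removed getFirstStoreReplica, resolving the table's start key via the desctestutils dependency this commit adds to the BUILD file. The signatures are inferred from the call sites above; treat everything here as an assumption rather than the commit's actual code.

// getStoreAndReplica returns the first store and the replica serving the start
// of the given table's span.
func getStoreAndReplica(
    t *testing.T, tc *testcluster.TestCluster, sqlRunner *sqlutils.SQLRunner,
    tableName, databaseName string,
) (*kvserver.Store, *kvserver.Replica) {
    t.Helper()
    s := tc.Server(0)
    desc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, databaseName, tableName)
    tableStartKey := keys.SystemSQLCodec.TablePrefix(uint32(desc.GetID()))
    // Same store/replica lookup the removed getFirstStoreReplica performed.
    store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
    require.NoError(t, err)
    var repl *kvserver.Replica
    testutils.SucceedsSoon(t, func() error {
        repl = store.LookupReplica(roachpb.RKey(tableStartKey))
        if repl == nil {
            return errors.New("could not find replica")
        }
        return nil
    })
    return store, repl
}

// waitForReplicaFieldToBeSet retries until the predicate observes the expected
// span-config-derived state on the table's replica.
func waitForReplicaFieldToBeSet(
    t *testing.T, tc *testcluster.TestCluster, sqlRunner *sqlutils.SQLRunner,
    tableName, databaseName string,
    isReplicaFieldSet func(r *kvserver.Replica) (bool, error),
) {
    t.Helper()
    testutils.SucceedsSoon(t, func() error {
        _, repl := getStoreAndReplica(t, tc, sqlRunner, tableName, databaseName)
        if ok, err := isReplicaFieldSet(repl); !ok {
            return err
        }
        return nil
    })
}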
