Skip to content

Commit

Permalink
backupccl,spanconfig,kvserver: ExportRequest noops on ranges excluded…
Browse files Browse the repository at this point in the history
… from backup

This change is the first of two changes that get us to the goal of backup
ignoring certain table row data, and not holding up GC on these ranges.

This change does a few things:

- It sets up the transport of the exclude_data_from_backup bit set on a
table descriptor, to the span configuration applied in KV.

- It teaches ExportRequest on a range marked as excluded to return
an empty ExportResponse. In this way, a backup processor will receive no row
data to back up for an ephemeral table.

- A follow-up change will also teach the SQLTranslator
to not populate the protected timestamp field on the SpanConfig for such
tables. This way, a long-running backup will not hold up GC on such high-churn
tables. With no protection on such ranges, it is possible that an
ExportRequest targeting the range has a StartTime
below the range's GCThreshold. To avoid the returned BatchTimestampBeforeGCError
from failing the backup, we decorate the error with information about the
range being excluded from backup and handle the error in the backup processor.

Informs: #73536

Release note (sql change): BACKUP of a table marked with `exclude_data_from_backup`
via `ALTER TABLE ... SET (exclude_data_from_backup = true)` will no longer back up
that table's row data. The backup will continue to back up the table's descriptor
and related metadata, and so on restore we will end up with an empty version of
the backed-up table.
  • Loading branch information
adityamaru committed Feb 3, 2022
1 parent 7bd7ec0 commit 1fd9303
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 3 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,17 @@ func runBackupProcessor(
if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
return errors.Wrap(exportRequestErr, "export request timeout")
}
// BatchTimestampBeforeGCError is returned if the ExportRequest
// attempts to read below the range's GC threshold.
if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok {
// If the range we are exporting is marked to be excluded from
// backup, it is safe to ignore the error. It is likely that the
// table has been configured with a low GC TTL, and so the data
// the backup is targeting has already been gc'ed.
if batchTimestampBeforeGCError.ExcludeDataFromBackup {
continue
}
}
return errors.Wrapf(exportRequestErr, "exporting %s", span.span)
}

Expand Down
265 changes: 265 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
Expand Down Expand Up @@ -9214,3 +9215,267 @@ func TestBackupRestoreSeparateIncrementalPrefix(t *testing.T) {
sqlDB.Exec(t, "DROP DATABASE inc_fkdb;")
}
}

// TestExcludeDataFromBackupAndRestore creates two tables, marks one of them
// with exclude_data_from_backup, backs up the database, restores it into a
// second cluster, and checks that the marked table comes back empty while the
// unmarked table keeps all ten of its rows.
func TestExcludeDataFromBackupAndRestore(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Cluster that the backup is taken from.
	backupClusterArgs := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
				SpanConfig: &spanconfig.TestingKnobs{
					SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond,
				},
			},
		},
	}
	tc, sqlDB, iodir, cleanupFn := backupRestoreTestSetupWithParams(
		t, singleNode, 10, InitManualReplication, backupClusterArgs)
	defer cleanupFn()

	// Empty cluster, sharing the same IO directory, that the backup is
	// restored into.
	restoreClusterArgs := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
			},
		},
	}
	_, restoreDB, cleanup := backupRestoreTestSetupEmpty(
		t, singleNode, iodir, InitManualReplication, restoreClusterArgs)
	defer cleanup()

	sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
	sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

	const (
		// Counts the ranges belonging to the table named by $1.
		rangeCountQuery = "SELECT count(*) " +
			"FROM crdb_internal.ranges_no_leases " +
			"WHERE table_name = $1 " +
			"AND database_name = current_database()"
		// Returns the smallest range start key of the table named by $1.
		firstStartKeyQuery = "SELECT start_key " +
			"FROM crdb_internal.ranges_no_leases " +
			"WHERE table_name = $1 " +
			"AND database_name = current_database() " +
			"ORDER BY start_key ASC " +
			"LIMIT 1"
	)

	conn := tc.ServerConn(0)

	// waitForTableSplit retries until at least one range is listed for the
	// given table.
	waitForTableSplit := func(tableName string) {
		testutils.SucceedsSoon(t, func() error {
			var numRanges int
			err := conn.QueryRow(rangeCountQuery, tableName).Scan(&numRanges)
			if err != nil {
				return err
			}
			if numRanges == 0 {
				return errors.New("waiting for table split")
			}
			return nil
		})
	}

	// fooReplica looks up the first range of table foo, finds the lease for
	// that range, and returns the replica from the first store of the server
	// whose index is the lease's replica NodeID minus one.
	fooReplica := func() *kvserver.Replica {
		var tableStart roachpb.Key
		require.NoError(t, conn.QueryRow(firstStartKeyQuery, "foo").Scan(&tableStart))

		rangeDesc := tc.LookupRangeOrFatal(t, tableStart)
		lease, _, err := tc.FindRangeLease(rangeDesc, nil)
		require.NoError(t, err)

		leaseServer := tc.Server(int(lease.Replica.NodeID) - 1)
		_, repl := getFirstStoreReplica(t, leaseServer, tableStart)
		return repl
	}

	// waitForExcludeDataFromBackup retries until foo's replica reports
	// ExcludeReplicaFromBackup.
	waitForExcludeDataFromBackup := func() {
		testutils.SucceedsSoon(t, func() error {
			if fooReplica().ExcludeReplicaFromBackup() {
				return nil
			}
			return errors.New("waiting for exclude_data_from_backup to be applied")
		})
	}

	sqlDB.Exec(t, `CREATE TABLE data.foo (id INT, INDEX bar(id))`)
	sqlDB.Exec(t, `INSERT INTO data.foo select * from generate_series(1,10)`)

	// A second table that keeps the default setting, used as the control.
	sqlDB.Exec(t, `CREATE TABLE data.bar (id INT, INDEX bar(id))`)
	sqlDB.Exec(t, `INSERT INTO data.bar select * from generate_series(1,10)`)

	// Mark foo as excluded and take the backup once the setting has been
	// observed on its replica. The ExportRequest against foo should be a noop
	// and back up no data.
	sqlDB.Exec(t, `ALTER TABLE data.foo SET (exclude_data_from_backup = true)`)
	for _, tableName := range []string{"foo", "bar"} {
		waitForTableSplit(tableName)
	}
	waitForExcludeDataFromBackup()
	sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, LocalFoo)

	restoreDB.Exec(t, `RESTORE DATABASE data FROM $1`, LocalFoo)

	restoredFoo := restoreDB.QueryStr(t, `SELECT * FROM data.foo`)
	require.Len(t, restoredFoo, 0)
	restoredBar := restoreDB.QueryStr(t, `SELECT * FROM data.bar`)
	require.Len(t, restoredBar, 10)
}

// TestExportRequestBelowGCThresholdOnDataExcludedFromBackup tests that a
// `BatchTimestampBeforeGCError` on an ExportRequest targeting a table that has
// been marked as excluded from backup, does not cause the backup to fail.
//
// The test records a timestamp, forces an MVCC GC run on the table's range,
// and then issues two backups AS OF that timestamp: the first, taken before
// the table is marked with exclude_data_from_backup, must fail with the GC
// threshold error; the second, taken after the setting has been applied to
// the replica, must succeed.
func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderStressRace(t, "test is too slow to run under race")

	ctx := context.Background()
	localExternalDir, cleanup := testutils.TempDir(t)
	defer cleanup()
	args := base.TestClusterArgs{}
	// The GC queue is disabled via the store knobs; GC is instead triggered
	// explicitly through ManuallyEnqueue in gcSoon below.
	args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
		DisableGCQueue:            true,
		DisableLastProcessedCheck: true,
	}
	args.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
	args.ServerArgs.ExternalIODir = localExternalDir
	tc := testcluster.StartTestCluster(t, 3, args)
	defer tc.Stopper().Stop(ctx)

	tc.WaitForNodeLiveness(t)
	require.NoError(t, tc.WaitForFullReplication())

	// Set the ignoreProtectedTimestamps testing knob on every backup resumer
	// created on any server in the cluster.
	for _, server := range tc.Servers {
		registry := server.JobRegistry().(*jobs.Registry)
		registry.TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
			jobspb.TypeBackup: func(raw jobs.Resumer) jobs.Resumer {
				r := raw.(*backupResumer)
				r.testingKnobs.ignoreProtectedTimestamps = true
				return r
			},
		}
	}
	conn := tc.ServerConn(0)
	_, err := conn.Exec("CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
	require.NoError(t, err)

	_, err = conn.Exec("SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
	require.NoError(t, err)

	_, err = conn.Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
	require.NoError(t, err)

	// Give the table a 1s GC TTL and a small maximum range size so that the
	// upserts below run into backpressure quickly.
	const tableRangeMaxBytes = 1 << 18
	_, err = conn.Exec("ALTER TABLE foo CONFIGURE ZONE USING "+
		"gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)
	require.NoError(t, err)

	rRand, _ := randutil.NewTestRand()
	// upsertUntilBackpressure overwrites the same row with random 32KiB values
	// until an upsert fails with a "backpressure" error. Any other error fails
	// the test.
	upsertUntilBackpressure := func() {
		for {
			_, err := conn.Exec("UPSERT INTO foo VALUES (1, $1)",
				randutil.RandBytes(rRand, 1<<15))
			if testutils.IsError(err, "backpressure") {
				break
			}
			require.NoError(t, err)
		}
	}
	// Pattern the GC queue trace must match for a GC run to count as having
	// processed the replica.
	const processedPattern = `(?s)shouldQueue=true.*processing replica.*GC score after GC`
	processedRegexp := regexp.MustCompile(processedPattern)

	// waitForTableSplit retries until at least one range is listed for foo.
	waitForTableSplit := func() {
		testutils.SucceedsSoon(t, func() error {
			count := 0
			if err := conn.QueryRow(
				"SELECT count(*) "+
					"FROM crdb_internal.ranges_no_leases "+
					"WHERE table_name = $1 "+
					"AND database_name = current_database()",
				"foo").Scan(&count); err != nil {
				return err
			}
			if count == 0 {
				return errors.New("waiting for table split")
			}
			return nil
		})
	}

	// getTableStartKey returns the smallest range start key listed for foo.
	getTableStartKey := func() roachpb.Key {
		row := conn.QueryRow(
			"SELECT start_key "+
				"FROM crdb_internal.ranges_no_leases "+
				"WHERE table_name = $1 "+
				"AND database_name = current_database() "+
				"ORDER BY start_key ASC "+
				"LIMIT 1",
			"foo")

		var startKey roachpb.Key
		require.NoError(t, row.Scan(&startKey))
		return startKey
	}

	// getStoreAndReplica looks up the range containing foo's start key, finds
	// its lease, and returns the first store and replica of the server whose
	// index is the lease's replica NodeID minus one.
	getStoreAndReplica := func() (*kvserver.Store, *kvserver.Replica) {
		startKey := getTableStartKey()
		// Okay great now we have a key and can go find replicas and stores and what not.
		r := tc.LookupRangeOrFatal(t, startKey)
		l, _, err := tc.FindRangeLease(r, nil)
		require.NoError(t, err)

		lhServer := tc.Server(int(l.Replica.NodeID) - 1)
		return getFirstStoreReplica(t, lhServer, startKey)
	}

	// waitForExcludeDataFromBackup retries until foo's replica reports
	// ExcludeReplicaFromBackup.
	waitForExcludeDataFromBackup := func() {
		testutils.SucceedsSoon(t, func() error {
			_, r := getStoreAndReplica()
			if !r.ExcludeReplicaFromBackup() {
				return errors.New("waiting for exclude_data_from_backup to be applied")
			}
			return nil
		})
	}

	// waitForRangeMaxBytes retries until foo's replica reports the given
	// maximum size.
	waitForRangeMaxBytes := func(maxBytes int64) {
		testutils.SucceedsSoon(t, func() error {
			_, r := getStoreAndReplica()
			if r.GetMaxBytes() != maxBytes {
				return errors.New("waiting for range_max_bytes to be applied")
			}
			return nil
		})
	}

	// gcSoon repeatedly generates garbage and manually enqueues foo's replica
	// into the "mvccGC" queue until the resulting trace matches
	// processedRegexp.
	gcSoon := func() {
		testutils.SucceedsSoon(t, func() error {
			upsertUntilBackpressure()
			s, repl := getStoreAndReplica()
			trace, _, err := s.ManuallyEnqueue(ctx, "mvccGC", repl, false)
			require.NoError(t, err)
			if !processedRegexp.MatchString(trace.String()) {
				return errors.Errorf("%q does not match %q", trace.String(), processedRegexp)
			}
			return nil
		})
	}

	waitForTableSplit()
	waitForRangeMaxBytes(tableRangeMaxBytes)

	// Record a timestamp before GC runs; both backups below read as of it.
	var tsBefore string
	require.NoError(t, conn.QueryRow("SELECT cluster_logical_timestamp()").Scan(&tsBefore))
	gcSoon()

	// Without the exclusion, the backup must fail because it reads below the
	// GC threshold. The result of IsError has to be asserted; calling it on
	// its own checks nothing.
	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), LocalFoo)
	require.True(t, testutils.IsError(err, "must be after replica GC threshold"),
		"expected GC threshold error from backup, got: %v", err)

	_, err = conn.Exec(`ALTER TABLE foo SET (exclude_data_from_backup = true)`)
	require.NoError(t, err)
	waitForExcludeDataFromBackup()

	// With the exclusion applied, the same backup must succeed.
	_, err = conn.Exec(fmt.Sprintf("BACKUP TABLE foo TO $1 AS OF SYSTEM TIME '%s'", tsBefore), LocalFoo)
	require.NoError(t, err)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
exec-sql
CREATE DATABASE db;
CREATE TABLE db.t1();
CREATE TABLE db.t2();
----

query-sql
SELECT id FROM system.namespace WHERE name='t1'
----
56

query-sql
SELECT id FROM system.namespace WHERE name='t2'
----
57

# We only expect there to be span config entries for tables t1 and t2.
translate database=db
----
/Table/5{6-7} range default
/Table/5{7-8} range default

# Alter table t1 to mark its data ephemeral.
exec-sql
ALTER TABLE db.t1 SET (exclude_data_from_backup = true)
----

translate database=db
----
/Table/5{6-7} exclude_data_from_backup=true
/Table/5{7-8} range default

# Translating the tables in the database individually should result in the same
# config as above.

translate database=db table=t1
----
/Table/5{6-7} exclude_data_from_backup=true

translate database=db table=t2
----
/Table/5{7-8} range default

# Alter table t1 to unmark its data ephemeral.
exec-sql
ALTER TABLE db.t1 SET (exclude_data_from_backup = false);
----

translate database=db
----
/Table/5{6-7} range default
/Table/5{7-8} range default
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
exec-sql
CREATE DATABASE db;
CREATE TABLE db.t1();
CREATE TABLE db.t2();
----

query-sql
SELECT id FROM system.namespace WHERE name='t1'
----
56

query-sql
SELECT id FROM system.namespace WHERE name='t2'
----
57

# We only expect there to be span config entries for tables t1 and t2.
translate database=db
----
/Tenant/10/Table/5{6-7} range default
/Tenant/10/Table/5{7-8} range default

# Alter table t1 to mark its data ephemeral.
exec-sql
ALTER TABLE db.t1 SET (exclude_data_from_backup = true)
----

translate database=db
----
/Tenant/10/Table/5{6-7} exclude_data_from_backup=true
/Tenant/10/Table/5{7-8} range default

# Translating the tables in the database individually should result in the same
# config as above.

translate database=db table=t1
----
/Tenant/10/Table/5{6-7} exclude_data_from_backup=true

translate database=db table=t2
----
/Tenant/10/Table/5{7-8} range default

# Alter table t1 to unmark its data ephemeral.
exec-sql
ALTER TABLE db.t1 SET (exclude_data_from_backup = false);
----

translate database=db
----
/Tenant/10/Table/5{6-7} range default
/Tenant/10/Table/5{7-8} range default
Loading

0 comments on commit 1fd9303

Please sign in to comment.