Skip to content

Commit

Permalink
Merge #51139
Browse files Browse the repository at this point in the history
51139: kvserver: check start-time above GC threshold r=dt a=dt

checkExecutionCanProceed typically checks that a request that is intending to
operate at a specific time will be doing so at or above the GC threshold.

It previously did this by comparing the ba.Timestamp to the (implied) GC
threshold, as this is the common field on all batches that indicate the
time at which they will operate. However a few special types of requests
operate on *a span* of time, rather than at a particular time. For
example, ExportRequest can ask to export all revisions between its
start and end times, or RevertRange can ask that all revisions between
its TargetTime and EndTime be destoryed. These commands require that
the GC threshold be not just below their end-time -- which they set
in ba.Timestamp, but also below their start-time, to ensure those
revisions they want to operate on are still there.

Previously they each checked this manually during evaluation. However
these checks were inconsistent with the common check that the kvserver
usually does on ba.Timestamp before even calling evaluation: they used
a different error type and, when the common check was extended to be
strict w.r.t. the TTL, they continued to only check the actual GC time.

This change instead updates that common check to read the ealier of the
batch timestamp or, if one of the requests in the batch is known to have
a start time, the earliest of those times. This then means that the
enforcement of 'this batch is operating at a timestamp above the GC TTL'
has the same semantics for batches thatt operate at a single time or on
a span of time.

Release note: none.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Jul 12, 2020
2 parents cc13f46 + 420395b commit 12b58af
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 40 deletions.
21 changes: 6 additions & 15 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,12 @@ func evalExport(
ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
defer tracing.FinishSpan(span)

// If the startTime is zero, then we're doing a full backup and the gc
// threshold is irrelevant for MVCC_Lastest backups. Otherwise, make sure
// startTime is after the gc threshold. If it's not, the mvcc tombstones could
// have been deleted and the resulting RocksDB tombstones compacted, which
// means we'd miss deletions in the incremental backup. For MVCC_All backups
// with no start time, they'll only be capturing the *revisions* since the
// gc threshold, so noting that in the reply allows the BACKUP to correctly
// note the supported time bounds for RESTORE AS OF SYSTEM TIME.
gcThreshold := cArgs.EvalCtx.GetGCThreshold()
if !args.StartTime.IsEmpty() {
if args.StartTime.LessEq(gcThreshold) {
return result.Result{}, errors.Errorf("start timestamp %v must be after replica GC threshold %v", args.StartTime, gcThreshold)
}
} else if args.MVCCFilter == roachpb.MVCCFilter_All {
reply.StartTime = gcThreshold
// For MVCC_All backups with no start time, they'll only be capturing the
// *revisions* since the gc threshold, so noting that in the reply allows the
// BACKUP to correctly note the supported time bounds for RESTORE AS OF SYSTEM
// TIME.
if args.MVCCFilter == roachpb.MVCCFilter_All {
reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
}

if err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_revert_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ func RevertRange(
reply := resp.(*roachpb.RevertRangeResponse)
var pd result.Result

if gc := cArgs.EvalCtx.GetGCThreshold(); args.TargetTime.LessEq(gc) {
return result.Result{}, errors.Errorf("cannot revert before replica GC threshold %v", gc)
}

if empty, err := isEmptyKeyTimeRange(
readWriter, args.Key, args.EndKey, args.TargetTime, cArgs.Header.Timestamp,
); err != nil {
Expand Down
12 changes: 0 additions & 12 deletions pkg/kv/kvserver/batcheval/cmd_revert_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,6 @@ func TestCmdRevertRange(t *testing.T) {
})
}

t.Run("checks gc threshold", func(t *testing.T) {
batch := &wrappedBatch{Batch: eng.NewBatch()}
defer batch.Close()
evalCtx.GCThreshold = tsB
cArgs.Args = &roachpb.RevertRangeRequest{
RequestHeader: roachpb.RequestHeader{Key: startKey, EndKey: endKey}, TargetTime: tsB,
}
if _, err := RevertRange(ctx, batch, cArgs, &roachpb.RevertRangeResponse{}); !testutils.IsError(err, "replica GC threshold") {
t.Fatal(err)
}
})

txn := roachpb.MakeTransaction("test", nil, roachpb.NormalUserPriority, tsC, 1)
if err := storage.MVCCPut(
ctx, eng, &stats, []byte("0012"), tsC, roachpb.MakeValueFromBytes([]byte("i")), &txn,
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,9 @@ func (r *Replica) checkExecutionCanProceed(
return err
} else if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil {
return err
} else if err := r.checkTSAboveGCThresholdRLocked(ba.Timestamp, st, ba.IsAdmin()); err != nil {
} else if err := r.checkTSAboveGCThresholdRLocked(
ba.EarliestActiveTimestamp(), st, ba.IsAdmin(),
); err != nil {
return err
} else if g.HoldingLatches() && st != nil {
// Only check for a pending merge if latches are held and the Range
Expand Down
20 changes: 20 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
return nil
}

// EarliestActiveTimestamp returns the earliest timestamp at which the batch
// would operate, which is nominally ba.Timestamp but could be earlier if a
// request in the batch operates on a time span such as ExportRequest or
// RevertRangeRequest, which both specify the start of that span in their
// arguments while using ba.Timestamp to indicate the upper bound of that span.
func (ba BatchRequest) EarliestActiveTimestamp() hlc.Timestamp {
ts := ba.Timestamp
for _, ru := range ba.Requests {
switch t := ru.GetInner().(type) {
case *ExportRequest:
if !t.StartTime.IsEmpty() {
ts.Backward(t.StartTime)
}
case *RevertRangeRequest:
ts.Backward(t.TargetTime)
}
}
return ts
}

// UpdateTxn updates the batch transaction from the supplied one in
// a copy-on-write fashion, i.e. without mutating an existing
// Transaction struct.
Expand Down
40 changes: 32 additions & 8 deletions pkg/sql/revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql
package sql_test

import (
"context"
Expand All @@ -17,9 +17,15 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand All @@ -34,7 +40,7 @@ func TestRevertTable(t *testing.T) {
s, sqlDB, kv := serverutils.StartServer(
t, base.TestServerArgs{UseDatabase: "test"})
defer s.Stopper().Stop(context.Background())
execCfg := s.ExecutorConfig().(ExecutorConfig)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)

db := sqlutils.MakeSQLRunner(sqlDB)
db.Exec(t, `CREATE DATABASE IF NOT EXISTS test`)
Expand All @@ -50,7 +56,7 @@ func TestRevertTable(t *testing.T) {
var ts string
var before int
db.QueryRow(t, `SELECT cluster_logical_timestamp(), xor_agg(k # rev) FROM test`).Scan(&ts, &before)
targetTime, err := ParseHLC(ts)
targetTime, err := sql.ParseHLC(ts)
require.NoError(t, err)

t.Run("simple", func(t *testing.T) {
Expand All @@ -72,7 +78,7 @@ func TestRevertTable(t *testing.T) {
// Revert the table to ts.
desc := sqlbase.TestingGetTableDescriptor(kv, keys.SystemSQLCodec, "test", "test")
desc.State = sqlbase.TableDescriptor_OFFLINE // bypass the offline check.
require.NoError(t, RevertTables(context.Background(), kv, &execCfg, []*sqlbase.TableDescriptor{desc}, targetTime, 10))
require.NoError(t, sql.RevertTables(context.Background(), kv, &execCfg, []*sqlbase.TableDescriptor{desc}, targetTime, 10))

var reverted int
db.QueryRow(t, `SELECT xor_agg(k # rev) FROM test`).Scan(&reverted)
Expand All @@ -85,7 +91,7 @@ func TestRevertTable(t *testing.T) {
db.Exec(t, `UPDATE child SET rev = 1 WHERE a % 3 = 0`)

db.QueryRow(t, `SELECT cluster_logical_timestamp() FROM test`).Scan(&ts)
targetTime, err = ParseHLC(ts)
targetTime, err = sql.ParseHLC(ts)
require.NoError(t, err)

var beforeChild int
Expand All @@ -101,14 +107,14 @@ func TestRevertTable(t *testing.T) {
child := sqlbase.TestingGetTableDescriptor(kv, keys.SystemSQLCodec, "test", "child")
child.State = sqlbase.TableDescriptor_OFFLINE
t.Run("reject only parent", func(t *testing.T) {
require.Error(t, RevertTables(ctx, kv, &execCfg, []*sqlbase.TableDescriptor{desc}, targetTime, 10))
require.Error(t, sql.RevertTables(ctx, kv, &execCfg, []*sqlbase.TableDescriptor{desc}, targetTime, 10))
})
t.Run("reject only child", func(t *testing.T) {
require.Error(t, RevertTables(ctx, kv, &execCfg, []*sqlbase.TableDescriptor{child}, targetTime, 10))
require.Error(t, sql.RevertTables(ctx, kv, &execCfg, []*sqlbase.TableDescriptor{child}, targetTime, 10))
})

t.Run("rollback parent and child", func(t *testing.T) {
require.NoError(t, RevertTables(ctx, kv, &execCfg, []*sqlbase.TableDescriptor{desc, child}, targetTime, RevertTableDefaultBatchSize))
require.NoError(t, sql.RevertTables(ctx, kv, &execCfg, []*sqlbase.TableDescriptor{desc, child}, targetTime, sql.RevertTableDefaultBatchSize))

var reverted, revertedChild int
db.QueryRow(t, `SELECT xor_agg(k # rev) FROM test`).Scan(&reverted)
Expand All @@ -118,3 +124,21 @@ func TestRevertTable(t *testing.T) {
})
})
}

func TestRevertGCThreshold(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
kvDB := tc.Server(0).DB()

req := &roachpb.RevertRangeRequest{
RequestHeader: roachpb.RequestHeader{Key: keys.UserTableDataMin, EndKey: keys.MaxKey},
TargetTime: hlc.Timestamp{WallTime: -1},
}
_, pErr := kv.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if !testutils.IsPError(pErr, "must be after replica GC threshold") {
t.Fatalf(`expected "must be after replica GC threshold" error got: %+v`, pErr)
}
}

0 comments on commit 12b58af

Please sign in to comment.