kvserver: make MVCC GC less disruptive to foreground traffic
This commit changes GC requests to no longer declare exclusive latches
at their BatchRequest's timestamp. This was already incorrect as
explained in cockroachdb#55293.

>The first use is broken because we acquire write latches at the batch
header's timestamp, which is set to time.Now(), so we're only
serializing with reads in the future and all other writes [1]. So we're
disruptive to everyone except who we want to serialize with – reads in
the past!
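
To make the old failure mode concrete, here is a minimal sketch of the latch conflict rules in play, assuming the usual spanlatch semantics (readers never conflict with readers, writers always conflict with writers, and a reader conflicts with a writer only when the writer's timestamp is at or below the reader's). The `timestamp`, `maxTimestamp`, and `conflict` names are illustrative stand-ins, not the actual `spanlatch` API:

```
package main

import "fmt"

// timestamp is a simplified stand-in for hlc.Timestamp.
type timestamp int64

// maxTimestamp stands in for hlc.MaxTimestamp.
const maxTimestamp = timestamp(1<<62 - 1)

// conflict reports whether two latches conflict under simplified MVCC rules:
// readers never conflict with each other, writers always conflict with
// writers, and a reader at tsR conflicts with a writer at tsW only when
// tsW <= tsR (i.e. when the write could be visible to the read).
func conflict(aWrite bool, aTS timestamp, bWrite bool, bTS timestamp) bool {
	switch {
	case !aWrite && !bWrite:
		return false
	case aWrite && bWrite:
		return true
	case aWrite:
		return aTS <= bTS
	default: // only b is a writer
		return bTS <= aTS
	}
}

func main() {
	now := timestamp(100)
	// Old scheme: a GC write latch at time.Now() blocks future readers...
	fmt.Println(conflict(true, now, false, now+1)) // true
	// ...but not the readers in the past it actually needed to serialize with.
	fmt.Println(conflict(true, now, false, now-1)) // false
	// A write latch at hlc.MaxTimestamp (used for the range-key latches in
	// this commit) still serializes with all writers...
	fmt.Println(conflict(true, maxTimestamp, true, now)) // true
	// ...while blocking no readers.
	fmt.Println(conflict(true, maxTimestamp, false, now)) // false
}
```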

This commit makes GC requests only declare a non-MVCC exclusive latch
over the `RangeGCThresholdKey`. This is correct because:
```

// 1. We define "correctness" to be the property that a reader reading at /
// around the GC threshold will either see the correct results or receive an
// error.
// 2. Readers perform their command evaluation over a stable snapshot of the
// storage engine. This means that the reader will not see the effects of a
// subsequent GC run as long as it created a Pebble iterator before the GC
// request.
// 3. A reader checks the in-memory GC threshold of a Replica after it has
// created this snapshot (i.e. after a Pebble iterator has been created).
// 4. If the in-memory GC threshold is above the timestamp of the read, the
// reader receives an error. Otherwise, the reader is guaranteed to see a
// state of the storage engine that hasn't been affected by the GC request [5].
// 5. GC requests bump the in-memory GC threshold of a Replica as a pre-apply
// side effect. This means that if a reader checks the in-memory GC threshold
// after it has created a Pebble iterator, it is impossible for the iterator
// to point to a storage engine state that has been affected by the GC
// request.

```
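
The ordering in steps 2-5 is what carries the argument: the reader pins its engine snapshot before consulting the in-memory GC threshold, and GC bumps that threshold before its effects can reach the engine. Below is a minimal, self-contained model of that handshake; `replica`, `engineState`, `read`, and `gc` are illustrative stand-ins for the real Replica/Pebble machinery, with timestamps simplified to integers:

```
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// engineState models a stable Pebble snapshot: once a reader holds a pointer
// to one, a later GC run swaps in a new state rather than mutating this one.
type engineState struct{ contents string }

// replica is a toy model, not the real kvserver.Replica.
type replica struct {
	gcThreshold atomic.Int64                // in-memory GC threshold
	engine      atomic.Pointer[engineState] // current engine state
}

var errBelowGCThreshold = errors.New("batch timestamp below replica GC threshold")

// read follows steps 2-4: pin the snapshot first, then check the threshold.
func (r *replica) read(readTS int64) (string, error) {
	snap := r.engine.Load()             // steps 2-3: snapshot before the check
	if readTS <= r.gcThreshold.Load() { // step 4: at/below threshold => error
		return "", errBelowGCThreshold
	}
	// If the check passed, the threshold bump (which strictly precedes any GC
	// effects, per step 5) had not happened when the snapshot was pinned, so
	// the snapshot cannot reflect the GC.
	return snap.contents, nil
}

// gc follows step 5: bump the in-memory threshold strictly before the engine
// reflects the GC run (the "pre-apply side effect").
func (r *replica) gc(newThreshold int64) {
	r.gcThreshold.Store(newThreshold)
	r.engine.Store(&engineState{contents: "post-GC"})
}

func main() {
	r := &replica{}
	r.engine.Store(&engineState{contents: "pre-GC"})

	fmt.Println(r.read(5)) // "pre-GC" <nil>
	r.gc(10)               // GC everything at or below t=10
	fmt.Println(r.read(5)) // error: below the GC threshold
}
```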

As a result, GC requests should now be much less disruptive to
foreground traffic since they're no longer redundantly declaring
exclusive latches over global keys.

Release note (performance improvement): MVCC garbage collection should
now be much less disruptive to foreground traffic than before.
aayushshah15 committed Aug 4, 2022
1 parent 9bd9d64 commit af7e8d6
Showing 6 changed files with 157 additions and 97 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
@@ -60,6 +60,7 @@ func declareKeysAddSSTable(
l, r := rangeTombstonePeekBounds(args.Key, args.EndKey, rs.GetStartKey().AsRawKey(), nil)
latchSpans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{Key: l, EndKey: r}, header.Timestamp)
}
latchSpans.DisableUndeclaredAccessAssertions()
}

// AddSSTableRewriteConcurrency sets the concurrency of a single SST rewrite.
@@ -349,7 +350,7 @@ func EvalAddSSTable(
// addition, and instead just use this key-only iterator. If a caller actually
// needs to know what data is there, it must issue its own real Scan.
if args.ReturnFollowingLikelyNonEmptySpanStart {
existingIter := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator(
existingIter := readWriter.NewMVCCIterator(
storage.MVCCKeyIterKind, // don't care if it is committed or not, just that it isn't empty.
storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
71 changes: 43 additions & 28 deletions pkg/kv/kvserver/batcheval/cmd_gc.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

@@ -35,35 +36,51 @@ func declareKeysGC(
latchSpans, _ *spanset.SpanSet,
_ time.Duration,
) {
// Intentionally don't call DefaultDeclareKeys: the key range in the header
// is usually the whole range (pending resolution of #7880).
gcr := req.(*roachpb.GCRequest)
for _, key := range gcr.Keys {
if keys.IsLocal(key.Key) {
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key})
} else {
latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}, header.Timestamp)
}
}
// Extend the range key latches by feather to ensure MVCC stats
// computations correctly account for adjacent range keys tombstones if they
// need to be split.
// TODO(oleg): These latches are very broad and will be disruptive to read and
// write operations despite only accessing "stale" data. We should think of
// better integrating it with latchless GC approach.
for _, span := range mergeAdjacentSpans(makeLookupBoundariesForGCRanges(rs.GetStartKey().AsRawKey(),
nil, gcr.RangeKeys)) {
latchSpans.AddMVCC(spanset.SpanReadWrite, span,
header.Timestamp)
}
// Be smart here about blocking on the threshold keys. The MVCC GC queue can
// send an empty request first to bump the thresholds, and then another one
// that actually does work but can avoid declaring these keys below.
if !gcr.Threshold.IsEmpty() {
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeGCThresholdKey(rs.GetRangeID())})
// When GC-ing MVCC range key tombstones or individual range keys, we need to
// serialize with all writes that overlap the MVCC range tombstone, as well as
// the immediate left/right neighboring keys. This is because a range key
// write has non-local effects, i.e. it can fragment or merge other range keys
// at other timestamps and at its boundaries, and this has a non-commutative
// effect on MVCC stats -- if someone writes a new range key while we're GCing
// one below, the stats would come out wrong.
// Note that we only need to serialize with writers (including other GC
// processes) and not with readers (that are guaranteed to be above the GC
// threshold). To achieve this, we declare read-write access at
// hlc.MaxTimestamp which will not block any readers.
for _, span := range mergeAdjacentSpans(makeLookupBoundariesForGCRanges(
rs.GetStartKey().AsRawKey(), nil, gcr.RangeKeys,
)) {
latchSpans.AddMVCC(spanset.SpanReadWrite, span, hlc.MaxTimestamp)
}
// The RangeGCThresholdKey is only written to if the
// req.(*GCRequest).Threshold is set. However, we always declare exclusive
// access over this key in order to serialize with other GC requests.
//
// Correctness:
// It is correct for a GC request to not declare exclusive access over the
// keys being GCed because of the following:
// 1. We define "correctness" to be the property that a reader reading at /
// around the GC threshold will either see the correct results or receive an
// error.
// 2. Readers perform their command evaluation over a stable snapshot of the
// storage engine. This means that the reader will not see the effects of a
// subsequent GC run as long as it created a Pebble iterator before the GC
// request.
// 3. A reader checks the in-memory GC threshold of a Replica after it has
// created this snapshot (i.e. after a Pebble iterator has been created).
// 4. If the in-memory GC threshold is above the timestamp of the read, the
// reader receives an error. Otherwise, the reader is guaranteed to see a
// state of the storage engine that hasn't been affected by the GC request [5].
// 5. GC requests bump the in-memory GC threshold of a Replica as a pre-apply
// side effect. This means that if a reader checks the in-memory GC threshold
// after it has created a Pebble iterator, it is impossible for the iterator
// to point to a storage engine state that has been affected by the GC
// request.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeGCThresholdKey(rs.GetRangeID())})
// Needed for Range bounds checks in calls to EvalContext.ContainsKey.
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())})
latchSpans.DisableUndeclaredAccessAssertions()
}

// Create latches and merge adjacent.
@@ -171,9 +188,7 @@ func GC(
newThreshold := oldThreshold
updated := newThreshold.Forward(args.Threshold)

// Don't write the GC threshold key unless we have to. We also don't
// declare the key unless we have to (to allow the MVCC GC queue to
// batch requests more efficiently), and we must honor what we declare.
// Don't write the GC threshold key unless we have to.
if updated {
if err := MakeStateLoader(cArgs.EvalCtx).SetGCThreshold(
ctx, readWriter, cArgs.Stats, &newThreshold,
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_recompute_stats.go
@@ -53,6 +53,8 @@ func declareKeysRecomputeStats(
rdKey := keys.RangeDescriptorKey(rs.GetStartKey())
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: rdKey})
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(rdKey, uuid.Nil)})
// Disable the assertions which check that all reads were previously declared.
latchSpans.DisableUndeclaredAccessAssertions()
}

// RecomputeStats recomputes the MVCCStats stored for this range and adjust them accordingly,
@@ -69,10 +71,6 @@ func RecomputeStats(

args = nil // avoid accidental use below

// Disable the assertions which check that all reads were previously declared.
// See the comment in `declareKeysRecomputeStats` for details on this.
reader = spanset.DisableReaderAssertions(reader)

actualMS, err := rditer.ComputeStatsForRange(desc, reader, cArgs.Header.Timestamp.WallTime)
if err != nil {
return result.Result{}, err
142 changes: 92 additions & 50 deletions pkg/kv/kvserver/replica_test.go
@@ -1721,6 +1721,14 @@ func gcKey(key roachpb.Key, timestamp hlc.Timestamp) roachpb.GCRequest_GCKey {
}
}

func recomputeStatsArgs(key roachpb.Key) roachpb.RecomputeStatsRequest {
return roachpb.RecomputeStatsRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
}
}

func gcArgs(startKey []byte, endKey []byte, keys ...roachpb.GCRequest_GCKey) roachpb.GCRequest {
return roachpb.GCRequest{
RequestHeader: roachpb.RequestHeader{
@@ -8392,56 +8400,6 @@ func TestReplicaReproposalWithNewLeaseIndexError(t *testing.T) {
}
}

// TestGCWithoutThreshold validates that GCRequest only declares the threshold
// key if it is subject to change, and that it does not access this key if it
// does not declare them.
func TestGCWithoutThreshold(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := &testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

for _, keyThresh := range []hlc.Timestamp{{}, {Logical: 1}} {
t.Run(fmt.Sprintf("thresh=%s", keyThresh), func(t *testing.T) {
var gc roachpb.GCRequest
var spans spanset.SpanSet

gc.Threshold = keyThresh
cmd, _ := batcheval.LookupCommand(roachpb.GC)
cmd.DeclareKeys(tc.repl.Desc(), &roachpb.Header{RangeID: tc.repl.RangeID}, &gc, &spans, nil, 0)

expSpans := 1
if !keyThresh.IsEmpty() {
expSpans++
}
if numSpans := spans.Len(); numSpans != expSpans {
t.Fatalf("expected %d declared keys, found %d", expSpans, numSpans)
}

eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

batch := eng.NewBatch()
defer batch.Close()
rw := spanset.NewBatch(batch, &spans)

var resp roachpb.GCResponse
if _, err := batcheval.GC(ctx, rw, batcheval.CommandArgs{
Args: &gc,
EvalCtx: NewReplicaEvalContext(
ctx, tc.repl, &spans, false, /* requiresClosedTSOlderThanStorageSnap */
),
}, &resp); err != nil {
t.Fatal(err)
}
})
}
}

// Test that, if the Raft command resulting from EndTxn request fails to be
// processed/apply, then the LocalResult associated with that command is
// cleared.
@@ -8512,6 +8470,78 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
}
}

// TestMVCCStatsGCCommutesWithWrites tests that the MVCCStats updates
// corresponding to writes and GCs are commutative.
//
// This test does so by:
// 1. Initially writing N versions of a key.
// 2. Concurrently GC-ing the N-1 versions written in step 1 while writing N-1
// new versions of the key.
// 3. Concurrently recomputing MVCC stats (via RecomputeStatsRequests) in the
// background and ensuring that the stats remain consistent at all times.
func TestMVCCStatsGCCommutesWithWrites(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
key := tc.ScratchRange(t)
store, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID())
require.NoError(t, err)

write := func() hlc.Timestamp {
var ba roachpb.BatchRequest
put := putArgs(key, []byte("0"))
ba.Add(&put)
resp, pErr := store.TestSender().Send(ctx, ba)
require.Nil(t, pErr)
return resp.Timestamp
}

// Write `numIterations` versions for a key.
const numIterations = 100
writeTimestamps := make([]hlc.Timestamp, 0, numIterations)
for i := 0; i < numIterations; i++ {
writeTimestamps = append(writeTimestamps, write())
}

// Now, we GC the first `numIterations-1` versions we wrote above while
// concurrently writing `numIterations-1` new versions.
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
for _, ts := range writeTimestamps[:numIterations-1] {
gcReq := gcArgs(key, key.Next(), gcKey(key, ts))
_, pErr := kv.SendWrapped(ctx, store.TestSender(), &gcReq)
require.Nil(t, pErr)
}
}()
go func() {
defer wg.Done()
for i := 0; i < numIterations-1; i++ {
write()
}
}()
// Also concurrently recompute stats and ensure that they're consistent at all
// times.
go func() {
defer wg.Done()
expDelta := enginepb.MVCCStats{}
for i := 0; i < numIterations; i++ {
recomputeReq := recomputeStatsArgs(key)
resp, pErr := kv.SendWrapped(ctx, store.TestSender(), &recomputeReq)
require.Nil(t, pErr)
delta := enginepb.MVCCStats(resp.(*roachpb.RecomputeStatsResponse).AddedDelta)
delta.AgeTo(expDelta.LastUpdateNanos)
require.Equal(t, expDelta, delta)
}
}()

wg.Wait()
}

// TestBatchTimestampBelowGCThreshold verifies that commands below the replica
// GC threshold fail.
func TestBatchTimestampBelowGCThreshold(t *testing.T) {
@@ -8778,6 +8808,18 @@ func TestGCThresholdRacesWithRead(t *testing.T) {
require.Nil(t, err)
require.Equal(t, va, b)

// Since the GC request does not acquire latches on the keys being GC'ed,
// it is not guaranteed to wait for the above Puts to be applied on the
// leaseholder. See AckCommittedEntriesBeforeApplication() and the comment
// above it for more details. So we separately ensure both Puts have been
// applied by reading the latest value @ ts2. These Get requests do declare
// latches on the keys being read, so by the time they return, subsequent
// GC requests are guaranteed to see the latest keys.
gArgs = getArgs(key)
_, pErr = kv.SendWrappedWith(ctx, reader, h2, &gArgs)
require.Nil(t, pErr)

// Perform two actions concurrently:
// 1. GC up to ts2. This should remove the k@ts1 version.
// 2. Read @ ts1.
13 changes: 0 additions & 13 deletions pkg/kv/kvserver/spanset/batch.go
@@ -814,19 +814,6 @@ func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch
}
}

// DisableReaderAssertions unwraps any storage.Reader implementations that may
// assert access against a given SpanSet.
func DisableReaderAssertions(reader storage.Reader) storage.Reader {
switch v := reader.(type) {
case ReadWriter:
return DisableReaderAssertions(v.r)
case *spanSetBatch:
return DisableReaderAssertions(v.r)
default:
return reader
}
}

// addLockTableSpans adds corresponding lock table spans for the declared
// spans. This is to implicitly allow raw access to separated intents in the
// lock table for any declared keys. Explicitly declaring lock table spans is
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/spanset/spanset.go
@@ -82,7 +82,8 @@ type Span struct {
// The Span slice for a particular access and scope contains non-overlapping
// spans in increasing key order after calls to SortAndDedup.
type SpanSet struct {
spans [NumSpanAccess][NumSpanScope][]Span
spans [NumSpanAccess][NumSpanScope][]Span
allowUndeclared bool
}

var spanSetPool = sync.Pool{
@@ -111,6 +112,7 @@ func (s *SpanSet) Release() {
s.spans[sa][ss] = recycle
}
}
s.allowUndeclared = false
spanSetPool.Put(s)
}

@@ -152,6 +154,7 @@ func (s *SpanSet) Copy() *SpanSet {
n.spans[sa][ss] = append(n.spans[sa][ss], s.spans[sa][ss]...)
}
}
n.allowUndeclared = s.allowUndeclared
return n
}

@@ -204,6 +207,7 @@ func (s *SpanSet) Merge(s2 *SpanSet) {
s.spans[sa][ss] = append(s.spans[sa][ss], s2.spans[sa][ss]...)
}
}
s.allowUndeclared = s2.allowUndeclared
s.SortAndDedup()
}

@@ -335,6 +339,12 @@ func (s *SpanSet) CheckAllowedAt(
func (s *SpanSet) checkAllowed(
access SpanAccess, span roachpb.Span, check func(SpanAccess, Span) bool,
) error {
if s.allowUndeclared {
// If the request has specified that undeclared spans are allowed, do
// nothing.
return nil
}

scope := SpanGlobal
if (span.Key != nil && keys.IsLocal(span.Key)) ||
(span.EndKey != nil && keys.IsLocal(span.EndKey)) {
@@ -387,3 +397,10 @@ func (s *SpanSet) Validate() error {

return nil
}

// DisableUndeclaredAccessAssertions disables the assertions that prevent
// undeclared access to spans. This is generally set by requests that rely on
// other forms of synchronization for correctness (e.g. GCRequest).
func (s *SpanSet) DisableUndeclaredAccessAssertions() {
s.allowUndeclared = true
}
