diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index aac31229ff18..1eeb19d3c4f0 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -42,6 +42,27 @@ import ( "go.etcd.io/etcd/raft/v3/tracker" ) +var ( + // raftLogTruncationClearRangeThreshold is the number of entries at which Raft + // log truncation uses a Pebble range tombstone rather than point deletes. It + // is set high enough to avoid writing too many range tombstones to Pebble, + // but low enough that we don't do too many point deletes either (in + // particular, we don't want to overflow the Pebble write batch). + // + // In the steady state, Raft log truncation occurs when RaftLogQueueStaleSize + // (64 KB) or RaftLogQueueStaleThreshold (100 entries) is exceeded, so + // truncations are generally small. If followers are lagging, we let the log + // grow to RaftLogTruncationThreshold (16 MB) before truncating. + // + // 100k was chosen because it is unlikely to be hit in most common cases, + // keeping the number of range tombstones low, but will trigger when Raft logs + // have grown abnormally large. RaftLogTruncationThreshold will typically not + // trigger it, unless the average log entry is <= 160 bytes. The key size is + // ~16 bytes, so Pebble point deletion batches will be bounded at ~1.6MB. + raftLogTruncationClearRangeThreshold = uint64(util.ConstantWithMetamorphicTestRange( + "raft-log-truncation-clearrange-threshold", 100000 /* default */, 1 /* min */, 1e6 /* max */)) +) + func makeIDKey() kvserverbase.CmdIDKey { idKeyBuf := make([]byte, 0, raftCommandIDLen) idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63())) @@ -1782,21 +1803,28 @@ func handleTruncatedStateBelowRaft( // truncation index to the new truncation index. This is performed // atomically with the raft command application so that the // TruncatedState index is always consistent with the state of the - // Raft log itself. 
We can use the distinct writer because we know - // all writes will be to distinct keys. - // - // Intentionally don't use range deletion tombstones (ClearRange()) - // due to performance concerns connected to having many range - // deletion tombstones. There is a chance that ClearRange will - // perform well here because the tombstones could be "collapsed", - // but it is hardly worth the risk at this point. + // Raft log itself. + var numTruncatedEntries uint64 + if newTruncatedState.Index > oldTruncatedState.Index { + numTruncatedEntries = newTruncatedState.Index - oldTruncatedState.Index + } prefixBuf := &loader.RangeIDPrefixBuf - for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ { - // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to - // avoid allocating when constructing Raft log keys (16 bytes). - unsafeKey := prefixBuf.RaftLogKey(idx) - if err := readWriter.ClearUnversioned(unsafeKey); err != nil { - return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) + if numTruncatedEntries >= raftLogTruncationClearRangeThreshold { + start := prefixBuf.RaftLogKey(oldTruncatedState.Index + 1).Clone() + end := prefixBuf.RaftLogKey(newTruncatedState.Index + 1).Clone() // end is exclusive + if err := readWriter.ClearRawRange(start, end); err != nil { + return false, errors.Wrapf(err, + "unable to clear truncated Raft entries for %+v between indexes %d-%d", + newTruncatedState, oldTruncatedState.Index+1, newTruncatedState.Index) + } + } else { + for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ { + // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to + // avoid allocating when constructing Raft log keys (16 bytes). 
+ unsafeKey := prefixBuf.RaftLogKey(idx) + if err := readWriter.ClearUnversioned(unsafeKey); err != nil { + return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) + } } } diff --git a/pkg/kv/kvserver/replica_raft_truncation_test.go b/pkg/kv/kvserver/replica_raft_truncation_test.go index 4867384b1c10..fc8725dc2ee9 100644 --- a/pkg/kv/kvserver/replica_raft_truncation_test.go +++ b/pkg/kv/kvserver/replica_raft_truncation_test.go @@ -13,6 +13,7 @@ package kvserver import ( "bytes" "context" + "encoding/binary" "fmt" "testing" @@ -20,11 +21,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/datadriven" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestHandleTruncatedStateBelowRaft(t *testing.T) { @@ -46,6 +49,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) { datadriven.Walk(t, "testdata/truncated_state_migration", func(t *testing.T, path string) { const rangeID = 12 loader := stateloader.Make(rangeID) + prefixBuf := &loader.RangeIDPrefixBuf eng := storage.NewDefaultInMemForTesting() defer eng.Close() @@ -55,9 +59,9 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) { d.ScanArgs(t, "index", &prevTruncatedState.Index) d.ScanArgs(t, "term", &prevTruncatedState.Term) return "" + case "put": - var index uint64 - var term uint64 + var index, term uint64 var legacy bool d.ScanArgs(t, "index", &index) d.ScanArgs(t, "term", &term) @@ -69,16 +73,15 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) { } if legacy { - assert.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, 
eng, nil, truncState)) + require.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState)) } else { - assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState)) + require.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState)) } return "" + case "handle": var buf bytes.Buffer - - var index uint64 - var term uint64 + var index, term uint64 d.ScanArgs(t, "index", &index) d.ScanArgs(t, "term", &term) @@ -87,30 +90,60 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) { Term: term, } - apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false) - if err != nil { - return err.Error() + // Write log entries at start, middle, end, and above the truncated interval. + if newTruncatedState.Index > prevTruncatedState.Index { + indexes := []uint64{ + prevTruncatedState.Index + 1, // start + (newTruncatedState.Index + prevTruncatedState.Index + 1) / 2, // middle + newTruncatedState.Index, // end + newTruncatedState.Index + 1, // new head + } + for _, idx := range indexes { + meta := enginepb.MVCCMetadata{RawBytes: make([]byte, 8)} + binary.BigEndian.PutUint64(meta.RawBytes, idx) + value, err := protoutil.Marshal(&meta) + require.NoError(t, err) + require.NoError(t, eng.PutUnversioned(prefixBuf.RaftLogKey(idx), value)) + } } + + // Apply truncation. + apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false) + require.NoError(t, err) fmt.Fprintf(&buf, "apply: %t\n", apply) + // Check the truncated state. 
for _, key := range []roachpb.Key{ keys.RaftTruncatedStateLegacyKey(rangeID), keys.RaftTruncatedStateKey(rangeID), } { var truncatedState roachpb.RaftTruncatedState ok, err := storage.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, storage.MVCCGetOptions{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !ok { continue } - fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term) + fmt.Fprintf(&buf, "state: %s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term) + } + + // Find the first untruncated log entry (the log head). + res, err := storage.MVCCScan(ctx, eng, + prefixBuf.RaftLogPrefix().Clone(), + prefixBuf.RaftLogPrefix().PrefixEnd(), + hlc.Timestamp{}, + storage.MVCCScanOptions{MaxKeys: 1}) + require.NoError(t, err) + var head roachpb.Key + if len(res.KVs) > 0 { + head = res.KVs[0].Key } + fmt.Fprintf(&buf, "head: %s\n", head) + return buf.String() + default: + return fmt.Sprintf("unsupported: %s", d.Cmd) } - return fmt.Sprintf("unsupported: %s", d.Cmd) }) }) } diff --git a/pkg/kv/kvserver/testdata/truncated_state_migration/migration b/pkg/kv/kvserver/testdata/truncated_state_migration/migration index d2c6ea1e6d83..4234ef0e3feb 100644 --- a/pkg/kv/kvserver/testdata/truncated_state_migration/migration +++ b/pkg/kv/kvserver/testdata/truncated_state_migration/migration @@ -8,18 +8,28 @@ prev index=100 term=9 handle index=150 term=9 ---- apply: true -/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9 +state: /Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:151 # Simulate another truncation that moves forward. handle index=170 term=9 ---- apply: true -/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 +state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:171 # ... and one that moves backwards and should not take effect. 
handle index=150 term=9 ---- apply: false -/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 +state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:151 + +# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works. +handle index=12345678901234567890 term=9 +---- +apply: true +state: /Local/RangeID/12/u/RaftTruncatedState -> index=12345678901234567890 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891 diff --git a/pkg/kv/kvserver/testdata/truncated_state_migration/pre_migration b/pkg/kv/kvserver/testdata/truncated_state_migration/pre_migration index e84177bd0b51..0ecf300eb08c 100644 --- a/pkg/kv/kvserver/testdata/truncated_state_migration/pre_migration +++ b/pkg/kv/kvserver/testdata/truncated_state_migration/pre_migration @@ -10,7 +10,8 @@ put legacy=true index=100 term=9 handle index=100 term=9 ---- apply: true -/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +head: /Min # Note that the below aren't actually possible in practice # as a divergence won't happen before the migration. @@ -18,9 +19,18 @@ apply: true handle index=150 term=9 ---- apply: true -/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:151 handle index=60 term=9 ---- apply: true -/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:151 + +# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works. 
+handle index=12345678901234567890 term=9 +---- +apply: true +state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9 +head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891 diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 82a46cc49673..bb90903c102c 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -175,6 +175,16 @@ func bytesPrefixEnd(b []byte) []byte { return b } +// Clone returns a copy of the key. +func (k Key) Clone() Key { + if k == nil { + return nil + } + c := make(Key, len(k)) + copy(c, k) + return c +} + // Next returns the next key in lexicographic sort order. The method may only // take a shallow copy of the Key, so both the receiver and the return // value should be treated as immutable after. diff --git a/pkg/roachpb/data_test.go b/pkg/roachpb/data_test.go index a32f73129749..c1439d1b622b 100644 --- a/pkg/roachpb/data_test.go +++ b/pkg/roachpb/data_test.go @@ -62,6 +62,17 @@ func makeSynTS(walltime int64, logical int32) hlc.Timestamp { } } +func TestKeyClone(t *testing.T) { + k := Key{0x01, 0x02, 0x03} + c := k.Clone() + require.Equal(t, k, c) + + k[0] = 0xff + require.NotEqual(t, k, c) + + require.Nil(t, Key(nil).Clone()) +} + // TestKeyNext tests that the method for creating lexicographic // successors to byte slices works as expected. func TestKeyNext(t *testing.T) {