Skip to content

Commit

Permalink
Merge pull request #75980 from erikgrinaker/backport21.1-75793
Browse files Browse the repository at this point in the history
release-21.1: kvserver: use `ClearRawRange` to truncate very large Raft logs
  • Loading branch information
erikgrinaker authored Feb 9, 2022
2 parents eb9d87c + 431212c commit 0490498
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 36 deletions.
56 changes: 42 additions & 14 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ import (
"go.etcd.io/etcd/raft/v3/tracker"
)

// raftLogTruncationClearRangeThreshold is the entry count at which a Raft log
// truncation switches from per-entry point deletes to a single Pebble range
// tombstone.
//
// The value is a trade-off:
//
//   - it must be high enough that we do not litter Pebble with range
//     tombstones, and
//   - it must be low enough that we do not issue an excessive number of point
//     deletes (in particular, we must not overflow the Pebble write batch).
//
// Steady-state truncations are generally small, since they are triggered as
// soon as RaftLogQueueStaleSize (64 KB) or RaftLogQueueStaleThreshold (100
// entries) is exceeded. When followers are lagging, the log is instead allowed
// to grow to RaftLogTruncationThreshold (16 MB) before it is truncated.
//
// The default of 100k is unlikely to be reached in most common cases, which
// keeps the number of range tombstones low, yet it will kick in once a Raft
// log has grown abnormally large. RaftLogTruncationThreshold will typically
// not reach it unless the average log entry is <= 160 bytes. With keys of ~16
// bytes, Pebble point deletion batches are bounded at ~1.6MB.
var raftLogTruncationClearRangeThreshold = uint64(
	util.ConstantWithMetamorphicTestRange(
		"raft-log-truncation-clearrange-threshold",
		100000,  /* default */
		1,       /* min */
		1000000, /* max */
	),
)

func makeIDKey() kvserverbase.CmdIDKey {
idKeyBuf := make([]byte, 0, raftCommandIDLen)
idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63()))
Expand Down Expand Up @@ -1782,21 +1803,28 @@ func handleTruncatedStateBelowRaft(
// truncation index to the new truncation index. This is performed
// atomically with the raft command application so that the
// TruncatedState index is always consistent with the state of the
// Raft log itself. We can use the distinct writer because we know
// all writes will be to distinct keys.
//
// Intentionally don't use range deletion tombstones (ClearRange())
// due to performance concerns connected to having many range
// deletion tombstones. There is a chance that ClearRange will
// perform well here because the tombstones could be "collapsed",
// but it is hardly worth the risk at this point.
// Raft log itself.
var numTruncatedEntries uint64
if newTruncatedState.Index > oldTruncatedState.Index {
numTruncatedEntries = newTruncatedState.Index - oldTruncatedState.Index
}
prefixBuf := &loader.RangeIDPrefixBuf
for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
if numTruncatedEntries >= raftLogTruncationClearRangeThreshold {
start := prefixBuf.RaftLogKey(oldTruncatedState.Index + 1).Clone()
end := prefixBuf.RaftLogKey(newTruncatedState.Index + 1).Clone() // end is exclusive
if err := readWriter.ClearRawRange(start, end); err != nil {
return false, errors.Wrapf(err,
"unable to clear truncated Raft entries for %+v between indexes %d-%d",
oldTruncatedState, oldTruncatedState.Index+1, newTruncatedState.Index+1)
}
} else {
for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
}
}
}

Expand Down
65 changes: 49 additions & 16 deletions pkg/kv/kvserver/replica_raft_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ package kvserver
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Expand All @@ -46,6 +49,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
datadriven.Walk(t, "testdata/truncated_state_migration", func(t *testing.T, path string) {
const rangeID = 12
loader := stateloader.Make(rangeID)
prefixBuf := &loader.RangeIDPrefixBuf
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

Expand All @@ -55,9 +59,9 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
d.ScanArgs(t, "index", &prevTruncatedState.Index)
d.ScanArgs(t, "term", &prevTruncatedState.Term)
return ""

case "put":
var index uint64
var term uint64
var index, term uint64
var legacy bool
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)
Expand All @@ -69,16 +73,15 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
}

if legacy {
assert.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState))
require.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState))
} else {
assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
require.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
}
return ""

case "handle":
var buf bytes.Buffer

var index uint64
var term uint64
var index, term uint64
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)

Expand All @@ -87,30 +90,60 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Term: term,
}

apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
if err != nil {
return err.Error()
// Write log entries at start, middle, end, and above the truncated interval.
if newTruncatedState.Index > prevTruncatedState.Index {
indexes := []uint64{
prevTruncatedState.Index + 1, // start
(newTruncatedState.Index + prevTruncatedState.Index + 1) / 2, // middle
newTruncatedState.Index, // end
newTruncatedState.Index + 1, // new head
}
for _, idx := range indexes {
meta := enginepb.MVCCMetadata{RawBytes: make([]byte, 8)}
binary.BigEndian.PutUint64(meta.RawBytes, idx)
value, err := protoutil.Marshal(&meta)
require.NoError(t, err)
require.NoError(t, eng.PutUnversioned(prefixBuf.RaftLogKey(idx), value))
}
}

// Apply truncation.
apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
require.NoError(t, err)
fmt.Fprintf(&buf, "apply: %t\n", apply)

// Check the truncated state.
for _, key := range []roachpb.Key{
keys.RaftTruncatedStateLegacyKey(rangeID),
keys.RaftTruncatedStateKey(rangeID),
} {
var truncatedState roachpb.RaftTruncatedState
ok, err := storage.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if !ok {
continue
}
fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
fmt.Fprintf(&buf, "state: %s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
}

// Find the first untruncated log entry (the log head).
res, err := storage.MVCCScan(ctx, eng,
prefixBuf.RaftLogPrefix().Clone(),
prefixBuf.RaftLogPrefix().PrefixEnd(),
hlc.Timestamp{},
storage.MVCCScanOptions{MaxKeys: 1})
require.NoError(t, err)
var head roachpb.Key
if len(res.KVs) > 0 {
head = res.KVs[0].Key
}
fmt.Fprintf(&buf, "head: %s\n", head)

return buf.String()

default:
return fmt.Sprintf("unsupported: %s", d.Cmd)
}
return fmt.Sprintf("unsupported: %s", d.Cmd)
})
})
}
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/testdata/truncated_state_migration/migration
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@ prev index=100 term=9
handle index=150 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# Simulate another truncation that moves forward.

handle index=170 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:171

# ... and one that moves backwards and should not take effect.

handle index=150 term=9
----
apply: false
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/u/RaftTruncatedState -> index=12345678901234567890 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/testdata/truncated_state_migration/pre_migration
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,27 @@ put legacy=true index=100 term=9
handle index=100 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Min

# Note that the below aren't actually possible in practice
# as a divergence won't happen before the migration.

handle index=150 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

handle index=60 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
10 changes: 10 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ func bytesPrefixEnd(b []byte) []byte {
return b
}

// Clone produces an independent copy of the key: the result holds the same
// bytes as k but does not share its backing array, so later writes to either
// slice are not visible through the other. A nil key clones to nil.
func (k Key) Clone() Key {
	if k == nil {
		return nil
	}
	// Reserve exactly len(k) bytes up front so the append below fills the new
	// buffer without reallocating.
	return append(make(Key, 0, len(k)), k...)
}

// Next returns the next key in lexicographic sort order. The method may only
// take a shallow copy of the Key, so both the receiver and the return
// value should be treated as immutable after.
Expand Down
11 changes: 11 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func makeSynTS(walltime int64, logical int32) hlc.Timestamp {
}
}

// TestKeyClone verifies that Key.Clone returns an equal but independent copy
// of a key, and that cloning a nil key yields nil.
func TestKeyClone(t *testing.T) {
	original := Key{0x01, 0x02, 0x03}
	clone := original.Clone()
	require.Equal(t, original, clone)

	// Mutating the source must not be visible through the clone.
	original[0] = 0xff
	require.NotEqual(t, original, clone)

	// A nil key stays nil when cloned.
	var nilKey Key
	require.Nil(t, nilKey.Clone())
}

// TestKeyNext tests that the method for creating lexicographic
// successors to byte slices works as expected.
func TestKeyNext(t *testing.T) {
Expand Down

0 comments on commit 0490498

Please sign in to comment.