Skip to content

Commit

Permalink
kvserver: use ClearRawRange to truncate very large Raft logs
Browse files Browse the repository at this point in the history
Raft log truncation was done using point deletes in a single Pebble
batch. If the number of entries to truncate was very large, this could
result in overflowing the Pebble batch, causing the node to panic. This
has been seen to happen e.g. when the snapshot rate was set very low,
effectively stalling snapshot transfers which in turn held up log
truncations for extended periods of time.

This patch uses a Pebble range tombstone if the number of entries to
truncate is very large (>100k). In most common cases, point deletes are
still used, to avoid writing too many range tombstones to Pebble.

Release note (bug fix): Fixed a bug which could cause nodes to crash
when truncating abnormally large Raft logs.
  • Loading branch information
erikgrinaker committed Feb 3, 2022
1 parent d471ca8 commit 431212c
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 36 deletions.
56 changes: 42 additions & 14 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ import (
"go.etcd.io/etcd/raft/v3/tracker"
)

var (
// raftLogTruncationClearRangeThreshold is the number of entries at which Raft
// log truncation uses a Pebble range tombstone rather than point deletes: a
// truncation covering at least this many entries is cleared with a single
// ClearRawRange call, while smaller truncations clear each entry's key
// individually. It is set high enough to avoid writing too many range
// tombstones to Pebble, but low enough that we don't do too many point
// deletes either (in particular, we don't want to overflow the Pebble write
// batch).
//
// In the steady state, Raft log truncation occurs when RaftLogQueueStaleSize
// (64 KB) or RaftLogQueueStaleThreshold (100 entries) is exceeded, so
// truncations are generally small. If followers are lagging, we let the log
// grow to RaftLogTruncationThreshold (16 MB) before truncating.
//
// 100k was chosen because it is unlikely to be hit in most common cases,
// keeping the number of range tombstones low, but will trigger when Raft logs
// have grown abnormally large. RaftLogTruncationThreshold will typically not
// trigger it, unless the average log entry is <= 160 bytes. The key size is
// ~16 bytes, so Pebble point deletion batches will be bounded at ~1.6MB.
//
// The value is produced by util.ConstantWithMetamorphicTestRange with a
// default of 100000, a minimum of 1 and a maximum of 1e6.
// NOTE(review): presumably metamorphic test builds pick a value from that
// range so that both deletion paths are exercised -- confirm against the
// util package.
raftLogTruncationClearRangeThreshold = uint64(util.ConstantWithMetamorphicTestRange(
"raft-log-truncation-clearrange-threshold", 100000 /* default */, 1 /* min */, 1e6 /* max */))
)

func makeIDKey() kvserverbase.CmdIDKey {
idKeyBuf := make([]byte, 0, raftCommandIDLen)
idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63()))
Expand Down Expand Up @@ -1782,21 +1803,28 @@ func handleTruncatedStateBelowRaft(
// truncation index to the new truncation index. This is performed
// atomically with the raft command application so that the
// TruncatedState index is always consistent with the state of the
// Raft log itself. We can use the distinct writer because we know
// all writes will be to distinct keys.
//
// Intentionally don't use range deletion tombstones (ClearRange())
// due to performance concerns connected to having many range
// deletion tombstones. There is a chance that ClearRange will
// perform well here because the tombstones could be "collapsed",
// but it is hardly worth the risk at this point.
// Raft log itself.
var numTruncatedEntries uint64
if newTruncatedState.Index > oldTruncatedState.Index {
numTruncatedEntries = newTruncatedState.Index - oldTruncatedState.Index
}
prefixBuf := &loader.RangeIDPrefixBuf
for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
if numTruncatedEntries >= raftLogTruncationClearRangeThreshold {
start := prefixBuf.RaftLogKey(oldTruncatedState.Index + 1).Clone()
end := prefixBuf.RaftLogKey(newTruncatedState.Index + 1).Clone() // end is exclusive
if err := readWriter.ClearRawRange(start, end); err != nil {
return false, errors.Wrapf(err,
"unable to clear truncated Raft entries for %+v between indexes %d-%d",
oldTruncatedState, oldTruncatedState.Index+1, newTruncatedState.Index+1)
}
} else {
for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
}
}
}

Expand Down
65 changes: 49 additions & 16 deletions pkg/kv/kvserver/replica_raft_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ package kvserver
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Expand All @@ -46,6 +49,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
datadriven.Walk(t, "testdata/truncated_state_migration", func(t *testing.T, path string) {
const rangeID = 12
loader := stateloader.Make(rangeID)
prefixBuf := &loader.RangeIDPrefixBuf
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

Expand All @@ -55,9 +59,9 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
d.ScanArgs(t, "index", &prevTruncatedState.Index)
d.ScanArgs(t, "term", &prevTruncatedState.Term)
return ""

case "put":
var index uint64
var term uint64
var index, term uint64
var legacy bool
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)
Expand All @@ -69,16 +73,15 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
}

if legacy {
assert.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState))
require.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState))
} else {
assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
require.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
}
return ""

case "handle":
var buf bytes.Buffer

var index uint64
var term uint64
var index, term uint64
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)

Expand All @@ -87,30 +90,60 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Term: term,
}

apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
if err != nil {
return err.Error()
// Write log entries at start, middle, end, and above the truncated interval.
if newTruncatedState.Index > prevTruncatedState.Index {
indexes := []uint64{
prevTruncatedState.Index + 1, // start
(newTruncatedState.Index + prevTruncatedState.Index + 1) / 2, // middle
newTruncatedState.Index, // end
newTruncatedState.Index + 1, // new head
}
for _, idx := range indexes {
meta := enginepb.MVCCMetadata{RawBytes: make([]byte, 8)}
binary.BigEndian.PutUint64(meta.RawBytes, idx)
value, err := protoutil.Marshal(&meta)
require.NoError(t, err)
require.NoError(t, eng.PutUnversioned(prefixBuf.RaftLogKey(idx), value))
}
}

// Apply truncation.
apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
require.NoError(t, err)
fmt.Fprintf(&buf, "apply: %t\n", apply)

// Check the truncated state.
for _, key := range []roachpb.Key{
keys.RaftTruncatedStateLegacyKey(rangeID),
keys.RaftTruncatedStateKey(rangeID),
} {
var truncatedState roachpb.RaftTruncatedState
ok, err := storage.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if !ok {
continue
}
fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
fmt.Fprintf(&buf, "state: %s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
}

// Find the first untruncated log entry (the log head).
res, err := storage.MVCCScan(ctx, eng,
prefixBuf.RaftLogPrefix().Clone(),
prefixBuf.RaftLogPrefix().PrefixEnd(),
hlc.Timestamp{},
storage.MVCCScanOptions{MaxKeys: 1})
require.NoError(t, err)
var head roachpb.Key
if len(res.KVs) > 0 {
head = res.KVs[0].Key
}
fmt.Fprintf(&buf, "head: %s\n", head)

return buf.String()

default:
return fmt.Sprintf("unsupported: %s", d.Cmd)
}
return fmt.Sprintf("unsupported: %s", d.Cmd)
})
})
}
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/testdata/truncated_state_migration/migration
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@ prev index=100 term=9
handle index=150 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# Simulate another truncation that moves forward.

handle index=170 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:171

# ... and one that moves backwards and should not take effect.

handle index=150 term=9
----
apply: false
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/u/RaftTruncatedState -> index=12345678901234567890 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/testdata/truncated_state_migration/pre_migration
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,27 @@ put legacy=true index=100 term=9
handle index=100 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Min

# Note that the below aren't actually possible in practice
# as a divergence won't happen before the migration.

handle index=150 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

handle index=60 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
10 changes: 10 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ func bytesPrefixEnd(b []byte) []byte {
return b
}

// Clone returns a copy of the key that is backed by its own freshly
// allocated storage, so later writes to either key are not visible through
// the other. A nil key clones to nil; an empty non-nil key clones to an
// empty non-nil key.
func (k Key) Clone() Key {
	if k == nil {
		return nil
	}
	// Appending into an empty, exactly-sized buffer copies the bytes without
	// sharing k's backing array.
	return append(make(Key, 0, len(k)), k...)
}

// Next returns the next key in lexicographic sort order. The method may only
// take a shallow copy of the Key, so both the receiver and the return
// value should be treated as immutable after.
Expand Down
11 changes: 11 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func makeSynTS(walltime int64, logical int32) hlc.Timestamp {
}
}

// TestKeyClone verifies that Key.Clone returns an equal key that does not
// share storage with the receiver, that a nil key clones to nil, and that an
// empty non-nil key clones to an empty non-nil key.
func TestKeyClone(t *testing.T) {
	k := Key{0x01, 0x02, 0x03}
	c := k.Clone()
	require.Equal(t, k, c)

	// Mutating the original must leave the clone untouched. Checking the
	// clone's exact contents, rather than only that the two keys now differ,
	// pins down that the clone still holds the original bytes.
	k[0] = 0xff
	require.NotEqual(t, k, c)
	require.Equal(t, Key{0x01, 0x02, 0x03}, c)

	// Clone distinguishes nil from empty: nil stays nil, while an empty but
	// non-nil key yields an empty, non-nil copy.
	require.Nil(t, Key(nil).Clone())
	empty := Key{}.Clone()
	require.NotNil(t, empty)
	require.Empty(t, empty)
}

// TestKeyNext tests that the method for creating lexicographic
// successors to byte slices works as expected.
func TestKeyNext(t *testing.T) {
Expand Down

0 comments on commit 431212c

Please sign in to comment.