Skip to content

Commit

Permalink
storage: use raft log size (in bytes) as metric for truncation
Browse files Browse the repository at this point in the history
The raft log currently truncates based on the number of entries (max 10000).
These entries can be of any size, so it makes more sense to use the byte size of
the raft log as the metric for truncation.

See cockroachdb#7065.
  • Loading branch information
d4l3k committed Jun 16, 2016
1 parent ff035c6 commit f947970
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 147 deletions.
2 changes: 2 additions & 0 deletions keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ var (
localRaftLastIndexSuffix = []byte("rfti")
// LocalRaftLogSuffix is the suffix for the raft log.
LocalRaftLogSuffix = []byte("rftl")
// localRaftLogSizeSuffix is the suffix for the size of the raft log.
localRaftLogSizeSuffix = []byte("rfts")
// localRangeLastReplicaGCTimestampSuffix is the suffix for a range's
// last replica GC timestamp (for GC of old replicas).
localRangeLastReplicaGCTimestampSuffix = []byte("rlrt")
Expand Down
5 changes: 5 additions & 0 deletions keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key {
return key
}

// RaftLogSizeKey builds the unreplicated range-ID local key for the given
// range that uses the raft log size suffix. No detail component is appended
// after the suffix.
func RaftLogSizeKey(rangeID roachpb.RangeID) roachpb.Key {
	sizeKey := MakeRangeIDUnreplicatedKey(rangeID, localRaftLogSizeSuffix, nil)
	return sizeKey
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
Expand Down
188 changes: 109 additions & 79 deletions server/serverpb/status.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions server/serverpb/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ message RangeInfo {
PrettySpan span = 1 [(gogoproto.nullable) = false];
string raft_state = 2;
storage.storagebase.RangeInfo state = 4 [(gogoproto.nullable) = false];
int64 raft_log_size = 5;
}

message RangesRequest {
Expand Down
5 changes: 3 additions & 2 deletions server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,9 @@ func (s *statusServer) Ranges(ctx context.Context, req *serverpb.RangesRequest)
StartKey: desc.StartKey.String(),
EndKey: desc.EndKey.String(),
},
RaftState: raftState,
State: state,
RaftState: raftState,
State: state,
RaftLogSize: rep.RaftLogSize(),
})
return false, nil
})
Expand Down
19 changes: 17 additions & 2 deletions storage/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package storage_test

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
Expand All @@ -34,6 +36,13 @@ func TestRaftLogQueue(t *testing.T) {

var mtc multiTestContext

// Set maxBytes to something small so we can trigger the raft log truncation
// without adding 64MB of logs.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Turn off raft elections so the raft leader won't change out from under
// us in this test.
sc := storage.TestStoreContext()
Expand Down Expand Up @@ -61,9 +70,15 @@ func TestRaftLogQueue(t *testing.T) {
t.Fatal(err)
}

// Disable splits since we're increasing the raft log with puts.
for _, store := range mtc.stores {
store.DisableSplitQueue(true)
}

// Write a collection of values to increase the raft log.
for i := 0; i < storage.RaftLogQueueStaleThreshold+1; i++ {
pArgs = putArgs([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
value := bytes.Repeat([]byte("a"), 1000) // 1KB
for size := int64(0); size < 2*maxBytes; size += int64(len(value)) {
pArgs = putArgs([]byte(fmt.Sprintf("key-%d", size)), value)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &pArgs); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
config.TestingSetupZoneConfigHook(stopper)
defer stopper.Stop()

maxBytes := int64(1 << 16)
const maxBytes = 1 << 16
// Set max bytes.
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes})
Expand Down Expand Up @@ -597,7 +597,7 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
origRng := store.LookupReplica(roachpb.RKeyMin, nil)

// Set max bytes.
maxBytes := int64(1 << 16)
const maxBytes = 1 << 16
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes})

Expand Down
5 changes: 5 additions & 0 deletions storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (s *Store) DisableRaftLogQueue(disabled bool) {
s.raftLogQueue.SetDisabled(disabled)
}

// DisableSplitQueue disables or enables the replica split queue.
// Passing true disables the queue and passing false re-enables it; the flag
// is forwarded unchanged to the split queue's SetDisabled method.
func (s *Store) DisableSplitQueue(disabled bool) {
s.splitQueue.SetDisabled(disabled)
}

// ForceRaftLogScanAndProcess iterates over all ranges and enqueues any that
// need their raft logs truncated and then process each of them.
func (s *Store) ForceRaftLogScanAndProcess() {
Expand Down
73 changes: 49 additions & 24 deletions storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ import (
"math"
"time"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

const (
// raftLogQueueMaxSize is the max size of the queue.
raftLogQueueMaxSize = 100
// raftLogPadding is the number of log entries that should be kept for
// lagging replicas.
raftLogPadding = 10000
// RaftLogQueueTimerDuration is the duration between truncations. This needs
// to be relatively short so that truncations can keep up with raft log entry
// creation.
Expand Down Expand Up @@ -86,12 +88,30 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
return 0, 0, nil
}

// Calculate the quorum matched index and adjust based on padding.
oldestIndex := getQuorumMatchedIndex(raftStatus, raftLogPadding)

r.mu.Lock()
defer r.mu.Unlock()
firstIndex, err := r.FirstIndex()
raftLogSize := r.mu.raftLogSize
targetSize := r.mu.maxBytes
r.mu.Unlock()

// Truncate raft logs that are older than the most behind range.
oldestIndex := getMinimumMatchedIndex(raftStatus)

// Truncate raft logs if the log is too big.
if raftLogSize > targetSize {
oldestSizeIndex, err := getLogIndexForSize(r.store.Engine(), r.RangeID, raftLogSize, targetSize)
if err != nil {
return 0, 0, err
}
if oldestSizeIndex > oldestIndex {
oldestIndex = oldestSizeIndex
}
}

if oldestIndex > raftStatus.Commit {
oldestIndex = raftStatus.Commit
}

firstIndex, err := r.GetFirstIndex()
if err != nil {
return 0, 0, util.Errorf("error retrieving first index for range %d: %s", rangeID, err)
}
Expand Down Expand Up @@ -155,26 +175,31 @@ func (*raftLogQueue) purgatoryChan() <-chan struct{} {
return nil
}

// getQuorumMatchedIndex returns the index which a quorum of the nodes have
// committed. The returned value is adjusted by padding to allow retaining
// additional entries, but this adjustment is limited so that we won't keep
// entries which all nodes have matched.
func getQuorumMatchedIndex(raftStatus *raft.Status, padding uint64) uint64 {
index := raftStatus.Commit
if index >= padding {
index -= padding
} else {
index = 0
}

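// getMinimumMatchedIndex returns the smallest Match index found in the raft
// status progress map, i.e. the highest log index that every tracked node has
// matched. If the progress map is empty, math.MaxUint64 is returned.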
func getMinimumMatchedIndex(raftStatus *raft.Status) uint64 {
smallestMatch := uint64(math.MaxUint64)
for _, progress := range raftStatus.Progress {
if smallestMatch > progress.Match {
smallestMatch = progress.Match
}
}
if index < smallestMatch {
index = smallestMatch
}
return index
return smallestMatch
}

// getLogIndexForSize walks the raft log of the given range in ascending key
// order, subtracting the size of each visited key/value pair from currentSize.
// The walk stops at the first entry whose subtraction brings the running size
// below targetSize, and that entry's index is returned.
//
// If the log has no entries, or the running size never drops below
// targetSize, the returned index is 0. Any iteration or decoding error is
// returned alongside whatever index was decoded.
func getLogIndexForSize(eng engine.Engine, rangeID roachpb.RangeID, currentSize, targetSize int64) (uint64, error) {
	var boundary raftpb.Entry

	// visit is invoked once per raft log key/value pair; returning true
	// terminates the iteration.
	visit := func(kv roachpb.KeyValue) (bool, error) {
		currentSize -= int64(kv.Size())
		if currentSize >= targetSize {
			// Still at or above the target; keep scanning.
			return false, nil
		}
		return true, kv.Value.GetProto(&boundary)
	}

	logStart := keys.RaftLogPrefix(rangeID)
	logEnd := logStart.PrefixEnd()
	_, err := engine.MVCCIterate(context.Background(), eng, logStart, logEnd,
		hlc.ZeroTimestamp, true /* consistent */, nil /* txn */, false /* reverse */, visit)
	return boundary.Index, err
}
32 changes: 9 additions & 23 deletions storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,32 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

func TestGetQuorumMatchedIndex(t *testing.T) {
func TestGetMinimumMatchedIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
progress []uint64
padding uint64
expected uint64
}{
// Basic cases.
{1, []uint64{1}, 0, 1},
{1, []uint64{1, 2}, 0, 1},
{2, []uint64{1, 2, 3}, 0, 2},
{2, []uint64{1, 2, 3, 4}, 0, 2},
{3, []uint64{1, 2, 3, 4, 5}, 0, 3},
// Sorting.
{3, []uint64{5, 4, 3, 2, 1}, 0, 3},
// Padding.
{3, []uint64{1, 3, 3}, 1, 2},
{3, []uint64{1, 3, 3}, 2, 1},
{3, []uint64{1, 3, 3}, 3, 1},
// Minimum progress value limits padding.
{3, []uint64{2, 3, 3}, 3, 2},
{[]uint64{1}, 1},
{[]uint64{1, 2}, 1},
{[]uint64{2, 3, 4}, 2},
// sorting.
{[]uint64{5, 4, 3, 2, 1}, 1},
}
for i, c := range testCases {
status := &raft.Status{
HardState: raftpb.HardState{
Commit: c.commit,
},
Progress: make(map[uint64]raft.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
quorumMatchedIndex := getQuorumMatchedIndex(status, c.padding)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
minimumMatchedIndex := getMinimumMatchedIndex(status)
if c.expected != minimumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, minimumMatchedIndex)
}
}
}
Expand Down
29 changes: 28 additions & 1 deletion storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ type Replica struct {
cmdQ *CommandQueue
// Last index persisted to the raft log (not necessarily committed).
lastIndex uint64
// The size of the persisted raft log.
raftLogSize int64
// Max bytes before split.
maxBytes int64
// pendingCmds stores the Raft in-flight commands which
Expand Down Expand Up @@ -365,6 +367,22 @@ func (r *Replica) newReplicaInner(desc *roachpb.RangeDescriptor, clock *hlc.Cloc
if err != nil {
return err
}

var found bool
r.mu.raftLogSize, found, err = loadRaftLogSize(r.store.Engine(), r.RangeID)
if err != nil {
return err
}
if !found {
if log.V(3) {
log.Infof("raft log size not found; computing for range %s", r.RangeID)
}
r.mu.raftLogSize, err = computeRaftLogSize(r.store.Engine(), r.RangeID, clock.PhysicalNow())
if err != nil {
return err
}
}

if r.isInitializedLocked() && replicaID != 0 {
return util.Errorf("replicaID must be 0 when creating an initialized replica")
}
Expand Down Expand Up @@ -409,6 +427,13 @@ func (r *Replica) Destroy(origDesc roachpb.RangeDescriptor) error {
return r.store.destroyReplicaData(desc)
}

// RaftLogSize reports the replica's tracked raft log size in bytes. The value
// is read while holding r.mu.
func (r *Replica) RaftLogSize() int64 {
	r.mu.Lock()
	size := r.mu.raftLogSize
	r.mu.Unlock()
	return size
}

func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down Expand Up @@ -1515,6 +1540,7 @@ func (r *Replica) handleRaftReady() error {
var rd raft.Ready
r.mu.Lock()
lastIndex := r.mu.lastIndex // used for append below
raftLogSize := r.mu.raftLogSize
err := r.withRaftGroupLocked(func(raftGroup *raft.RawNode) error {
if hasReady = raftGroup.HasReady(); hasReady {
rd = raftGroup.Ready()
Expand Down Expand Up @@ -1559,13 +1585,14 @@ func (r *Replica) handleRaftReady() error {
// All of the entries are appended to distinct keys, returning a new
// last index.
var err error
if lastIndex, err = r.append(writer, lastIndex, rd.Entries); err != nil {
if lastIndex, raftLogSize, err = r.append(writer, lastIndex, raftLogSize, rd.Entries); err != nil {
return err
}
batch.Defer(func() {
// Update last index on commit.
r.mu.Lock()
r.mu.lastIndex = lastIndex
r.mu.raftLogSize = raftLogSize
r.mu.Unlock()
})

Expand Down
16 changes: 12 additions & 4 deletions storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1441,12 +1441,19 @@ func (r *Replica) TruncateLog(
}
start := keys.RaftLogKey(r.RangeID, 0)
end := keys.RaftLogKey(r.RangeID, args.Index)
if err = batch.Iterate(engine.MakeMVCCMetadataKey(start), engine.MakeMVCCMetadataKey(end),
func(kv engine.MVCCKeyValue) (bool, error) {
return false, batch.Clear(kv.Key)
}); err != nil {
var diff enginepb.MVCCStats
// Passing zero timestamp to MVCCDeleteRange is equivalent to a ranged clear
// but it also computes stats.
if _, err := engine.MVCCDeleteRange(ctx, batch, &diff, start, end, 0, /* max */
hlc.ZeroTimestamp, nil /* txn */, false /* returnKeys */); err != nil {
return reply, err
}

raftLogSize := r.mu.raftLogSize + diff.SysBytes
if err := setRaftLogSize(batch, r.RangeID, raftLogSize); err != nil {
return reply, err
}

tState := &roachpb.RaftTruncatedState{
Index: args.Index - 1,
Term: term,
Expand All @@ -1455,6 +1462,7 @@ func (r *Replica) TruncateLog(
batch.(engine.Batch).Defer(func() {
r.mu.Lock()
r.mu.state.TruncatedState = tState
r.mu.raftLogSize = raftLogSize
r.mu.Unlock()
})
return reply, engine.MVCCPutProto(ctx, batch, ms, keys.RaftTruncatedStateKey(r.RangeID), hlc.ZeroTimestamp, nil, tState)
Expand Down
Loading

0 comments on commit f947970

Please sign in to comment.