Skip to content

Commit

Permalink
storage: use raft log size (in bytes) as metric for truncation
Browse files Browse the repository at this point in the history
The raft log currently truncates based of the number of entries (max 10000).
These entries can be of any size so it makes more sense to use the byte size of
the raft log as a metric for truncation.

See cockroachdb#7065.
  • Loading branch information
d4l3k committed Jun 16, 2016
1 parent ff035c6 commit d39705c
Show file tree
Hide file tree
Showing 15 changed files with 299 additions and 110 deletions.
2 changes: 2 additions & 0 deletions keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ var (
localRaftLastIndexSuffix = []byte("rfti")
// LocalRaftLogSuffix is the suffix for the raft log.
LocalRaftLogSuffix = []byte("rftl")
// LocalRaftLogSizeSuffix is the suffix for the size of the raft log.
localRaftLogSizeSuffix = []byte("rfts")
// localRangeLastReplicaGCTimestampSuffix is the suffix for a range's
// last replica GC timestamp (for GC of old replicas).
localRangeLastReplicaGCTimestampSuffix = []byte("rlrt")
Expand Down
5 changes: 5 additions & 0 deletions keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key {
return key
}

// RaftLogSizeKey returns a system-local key for the size of the Raft log.
func RaftLogSizeKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDUnreplicatedKey(rangeID, localRaftLogSizeSuffix, nil)
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
Expand Down
19 changes: 17 additions & 2 deletions storage/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package storage_test

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
Expand All @@ -34,6 +36,13 @@ func TestRaftLogQueue(t *testing.T) {

var mtc multiTestContext

// Set maxBytes to something small so we can trigger the raft log truncation
// without adding 64MB of logs.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Turn off raft elections so the raft leader won't change out from under
// us in this test.
sc := storage.TestStoreContext()
Expand Down Expand Up @@ -61,9 +70,15 @@ func TestRaftLogQueue(t *testing.T) {
t.Fatal(err)
}

// Disable splits since we're increasing the raft log with puts.
for _, store := range mtc.stores {
store.DisableSplitQueue(true)
}

// Write a collection of values to increase the raft log.
for i := 0; i < storage.RaftLogQueueStaleThreshold+1; i++ {
pArgs = putArgs([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
value := bytes.Repeat([]byte("a"), 1000) // 1KB
for size := int64(0); size < 2*maxBytes; size += int64(len(value)) {
pArgs = putArgs([]byte(fmt.Sprintf("key-%d", size)), value)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &pArgs); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
config.TestingSetupZoneConfigHook(stopper)
defer stopper.Stop()

maxBytes := int64(1 << 16)
const maxBytes = 1 << 16
// Set max bytes.
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes})
Expand Down Expand Up @@ -597,7 +597,7 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
origRng := store.LookupReplica(roachpb.RKeyMin, nil)

// Set max bytes.
maxBytes := int64(1 << 16)
const maxBytes = 1 << 16
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes})

Expand Down
5 changes: 5 additions & 0 deletions storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (s *Store) DisableRaftLogQueue(disabled bool) {
s.raftLogQueue.SetDisabled(disabled)
}

// DisableSplitQueue disables or enables the replica split queue.
func (s *Store) DisableSplitQueue(disabled bool) {
s.splitQueue.SetDisabled(disabled)
}

// ForceRaftLogScanAndProcess iterates over all ranges and enqueues any that
// need their raft logs truncated and then process each of them.
func (s *Store) ForceRaftLogScanAndProcess() {
Expand Down
89 changes: 61 additions & 28 deletions storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,24 @@ import (
"math"
"time"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

const (
// raftLogQueueMaxSize is the max size of the queue.
raftLogQueueMaxSize = 100
// raftLogPadding is the number of log entries that should be kept for
// lagging replicas.
raftLogPadding = 10000
// RaftLogQueueTimerDuration is the duration between truncations. This needs
// to be relatively short so that truncations can keep up with raft log entry
// creation.
Expand Down Expand Up @@ -67,8 +69,10 @@ func newRaftLogQueue(db *client.DB, gossip *gossip.Gossip) *raftLogQueue {
}

// getTruncatableIndexes returns the total number of stale raft log entries that
// can be truncated and the oldest index that cannot be pruned.
func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
// can be truncated and the oldest index that cannot be pruned. If estimate is
// true, it returns a resource cheap estimate that can be used for scheduling
// purposes.
func getTruncatableIndexes(r *Replica, estimate bool) (uint64, uint64, error) {
rangeID := r.RangeID
// TODO(bram): r.store.RaftStatus(rangeID) differs from r.RaftStatus() in
// tests, causing TestGetTruncatableIndexes to fail. Figure out why and fix.
Expand All @@ -86,12 +90,36 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
return 0, 0, nil
}

// Calculate the quorum matched index and adjust based on padding.
oldestIndex := getQuorumMatchedIndex(raftStatus, raftLogPadding)

r.mu.Lock()
defer r.mu.Unlock()
firstIndex, err := r.FirstIndex()
raftLogSize := r.mu.raftLogSize
targetSize := r.mu.maxBytes
r.mu.Unlock()

// Always truncate the oldest raft log entry which has been committed by all replicas.
oldestIndex := getMaximumMatchedIndex(raftStatus)

// Truncate raft logs if the log is too big.
if raftLogSize > targetSize {
if estimate {
// When estimating assume that the truncate threshold has been reached.
oldestIndex += RaftLogQueueStaleThreshold
} else {
oldestSizeIndex, err := getLogIndexForSize(r.store.Engine(), r.RangeID, raftLogSize, targetSize)
if err != nil {
return 0, 0, err
}
if oldestSizeIndex > oldestIndex {
oldestIndex = oldestSizeIndex
}
}
}

// Never truncate uncommitted logs.
if oldestIndex > raftStatus.Commit {
oldestIndex = raftStatus.Commit
}

firstIndex, err := r.GetFirstIndex()
if err != nil {
return 0, 0, util.Errorf("error retrieving first index for range %d: %s", rangeID, err)
}
Expand All @@ -111,7 +139,7 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
func (*raftLogQueue) shouldQueue(
now hlc.Timestamp, r *Replica, _ config.SystemConfig,
) (shouldQ bool, priority float64) {
truncatableIndexes, _, err := getTruncatableIndexes(r)
truncatableIndexes, _, err := getTruncatableIndexes(r, true /* estimate */)
if err != nil {
log.Warning(err)
return false, 0
Expand All @@ -124,7 +152,7 @@ func (*raftLogQueue) shouldQueue(
// leader and if the total number of the range's raft log's stale entries
// exceeds RaftLogQueueStaleThreshold.
func (rlq *raftLogQueue) process(now hlc.Timestamp, r *Replica, _ config.SystemConfig) error {
truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r)
truncatableIndexes, oldestIndex, err := getTruncatableIndexes(r, false /* estimate */)
if err != nil {
return err
}
Expand Down Expand Up @@ -155,26 +183,31 @@ func (*raftLogQueue) purgatoryChan() <-chan struct{} {
return nil
}

// getQuorumMatchedIndex returns the index which a quorum of the nodes have
// committed. The returned value is adjusted by padding to allow retaining
// additional entries, but this adjustment is limited so that we won't keep
// entries which all nodes have matched.
func getQuorumMatchedIndex(raftStatus *raft.Status, padding uint64) uint64 {
index := raftStatus.Commit
if index >= padding {
index -= padding
} else {
index = 0
}

// getMaximumMatchedIndex returns the index that has been committed by every
// node in the raft group.
func getMaximumMatchedIndex(raftStatus *raft.Status) uint64 {
smallestMatch := uint64(math.MaxUint64)
for _, progress := range raftStatus.Progress {
if smallestMatch > progress.Match {
smallestMatch = progress.Match
}
}
if index < smallestMatch {
index = smallestMatch
}
return index
return smallestMatch
}

// getLogIndexForSize returns the oldest raft log index that keeps the log size
// under the target size. If there are no log entries 0 is returned.
func getLogIndexForSize(eng engine.Engine, rangeID roachpb.RangeID, currentSize, targetSize int64) (uint64, error) {
prefix := keys.RaftLogPrefix(rangeID)
var entry raftpb.Entry
_, err := engine.MVCCIterate(context.Background(), eng, prefix, prefix.PrefixEnd(),
hlc.ZeroTimestamp, true /* consistent */, nil /* txn */, false, /* reverse */
func(kv roachpb.KeyValue) (bool, error) {
currentSize -= int64(kv.Size())
if currentSize < targetSize {
return true, kv.Value.GetProto(&entry)
}
return false, nil
})
return entry.Index, err
}
38 changes: 12 additions & 26 deletions storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,32 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

func TestGetQuorumMatchedIndex(t *testing.T) {
func TestGetMaximumMatchedIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
progress []uint64
padding uint64
expected uint64
}{
// Basic cases.
{1, []uint64{1}, 0, 1},
{1, []uint64{1, 2}, 0, 1},
{2, []uint64{1, 2, 3}, 0, 2},
{2, []uint64{1, 2, 3, 4}, 0, 2},
{3, []uint64{1, 2, 3, 4, 5}, 0, 3},
// Sorting.
{3, []uint64{5, 4, 3, 2, 1}, 0, 3},
// Padding.
{3, []uint64{1, 3, 3}, 1, 2},
{3, []uint64{1, 3, 3}, 2, 1},
{3, []uint64{1, 3, 3}, 3, 1},
// Minimum progress value limits padding.
{3, []uint64{2, 3, 3}, 3, 2},
{[]uint64{1}, 1},
{[]uint64{1, 2}, 1},
{[]uint64{2, 3, 4}, 2},
// sorting.
{[]uint64{5, 4, 3, 2, 1}, 1},
}
for i, c := range testCases {
status := &raft.Status{
HardState: raftpb.HardState{
Commit: c.commit,
},
Progress: make(map[uint64]raft.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
quorumMatchedIndex := getQuorumMatchedIndex(status, c.padding)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
index := getMaximumMatchedIndex(status)
if c.expected != index {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, index)
}
}
}
Expand All @@ -84,7 +70,7 @@ func TestGetTruncatableIndexes(t *testing.T) {

// Test on a new range which should not have a raft group yet.
rngNew := createRange(store, 100, roachpb.RKey("a"), roachpb.RKey("c"))
truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew)
truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew, false)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -116,7 +102,7 @@ func TestGetTruncatableIndexes(t *testing.T) {
}
}

truncatableIndexes, oldestIndex, err = getTruncatableIndexes(r)
truncatableIndexes, oldestIndex, err = getTruncatableIndexes(r, false)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
Expand Down Expand Up @@ -152,7 +138,7 @@ func TestGetTruncatableIndexes(t *testing.T) {
// client_raft_log_queue_test.
util.SucceedsSoon(t, func() error {
store.ForceRaftLogScanAndProcess()
truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew)
truncatableIndexes, oldestIndex, err := getTruncatableIndexes(rngNew, false)
if err != nil {
return util.Errorf("expected no error, got %s", err)
}
Expand Down
23 changes: 22 additions & 1 deletion storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ type Replica struct {
cmdQ *CommandQueue
// Last index persisted to the raft log (not necessarily committed).
lastIndex uint64
// The size of the persisted raft log.
raftLogSize int64
// Max bytes before split.
maxBytes int64
// pendingCmds stores the Raft in-flight commands which
Expand Down Expand Up @@ -365,6 +367,22 @@ func (r *Replica) newReplicaInner(desc *roachpb.RangeDescriptor, clock *hlc.Cloc
if err != nil {
return err
}

var found bool
r.mu.raftLogSize, found, err = loadRaftLogSize(r.store.Engine(), r.RangeID)
if err != nil {
return err
}
if !found {
if log.V(3) {
log.Infof("raft log size not found; computing for range %s", r.RangeID)
}
r.mu.raftLogSize, err = computeRaftLogSize(r.store.Engine(), r.RangeID, clock.PhysicalNow())
if err != nil {
return err
}
}

if r.isInitializedLocked() && replicaID != 0 {
return util.Errorf("replicaID must be 0 when creating an initialized replica")
}
Expand Down Expand Up @@ -907,6 +925,7 @@ func (r *Replica) State() storagebase.RangeInfo {
ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagebase.ReplicaState)
ri.LastIndex = r.mu.lastIndex
ri.NumPending = uint64(len(r.mu.pendingCmds))
ri.RaftLogSize = r.mu.raftLogSize
return ri
}

Expand Down Expand Up @@ -1515,6 +1534,7 @@ func (r *Replica) handleRaftReady() error {
var rd raft.Ready
r.mu.Lock()
lastIndex := r.mu.lastIndex // used for append below
raftLogSize := r.mu.raftLogSize
err := r.withRaftGroupLocked(func(raftGroup *raft.RawNode) error {
if hasReady = raftGroup.HasReady(); hasReady {
rd = raftGroup.Ready()
Expand Down Expand Up @@ -1559,13 +1579,14 @@ func (r *Replica) handleRaftReady() error {
// All of the entries are appended to distinct keys, returning a new
// last index.
var err error
if lastIndex, err = r.append(writer, lastIndex, rd.Entries); err != nil {
if lastIndex, raftLogSize, err = r.append(writer, lastIndex, raftLogSize, rd.Entries); err != nil {
return err
}
batch.Defer(func() {
// Update last index on commit.
r.mu.Lock()
r.mu.lastIndex = lastIndex
r.mu.raftLogSize = raftLogSize
r.mu.Unlock()
})

Expand Down
Loading

0 comments on commit d39705c

Please sign in to comment.