Skip to content

Commit

Permalink
storage: use raft log size (in bytes) as metric for truncation
Browse files Browse the repository at this point in the history
The raft log currently truncates based of the number of entries (max 10000).
These entries can be of any size so it makes more sense to use the byte size of
the raft log as a metric for truncation.

If the raft log is greater than 64MB and there is a behind node, it will
truncate the log to the quorum committed index.

The in-memory size is the approximate size in bytes of the persisted raft log.
On server restart, this value is assumed to be zero to avoid costly scans of the
raft log. After the first raft log truncation it will be correct.

See cockroachdb#7065.
  • Loading branch information
d4l3k committed Jul 5, 2016
1 parent a8b5286 commit 3ec0c8b
Show file tree
Hide file tree
Showing 14 changed files with 288 additions and 126 deletions.
19 changes: 17 additions & 2 deletions storage/client_raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package storage_test

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/util"
Expand All @@ -35,6 +37,13 @@ func TestRaftLogQueue(t *testing.T) {

var mtc multiTestContext

// Set maxBytes to something small so we can trigger the raft log truncation
// without adding 64MB of logs.
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
})()

// Turn off raft elections so the raft leader won't change out from under
// us in this test.
sc := storage.TestStoreContext()
Expand Down Expand Up @@ -62,9 +71,15 @@ func TestRaftLogQueue(t *testing.T) {
t.Fatal(err)
}

// Disable splits since we're increasing the raft log with puts.
for _, store := range mtc.stores {
store.DisableSplitQueue(true)
}

// Write a collection of values to increase the raft log.
for i := 0; i < storage.RaftLogQueueStaleThreshold+1; i++ {
pArgs = putArgs([]byte(fmt.Sprintf("key-%d", i)), []byte("value"))
value := bytes.Repeat([]byte("a"), 1000) // 1KB
for size := int64(0); size < 2*maxBytes; size += int64(len(value)) {
pArgs = putArgs([]byte(fmt.Sprintf("key-%d", size)), value)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &pArgs); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {
config.TestingSetupZoneConfigHook(stopper)
defer stopper.Stop()

maxBytes := int64(1 << 16)
const maxBytes = 1 << 16
// Set max bytes.
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes})
Expand Down Expand Up @@ -598,7 +598,7 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {
origRng := store.LookupReplica(roachpb.RKeyMin, nil)

// Set max bytes.
maxBytes := int64(1 << 16)
const maxBytes = 1 << 16
descID := uint32(keys.MaxReservedDescID + 1)
config.TestingSetZoneConfig(descID, &config.ZoneConfig{RangeMaxBytes: maxBytes})

Expand Down
5 changes: 5 additions & 0 deletions storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (s *Store) DisableRaftLogQueue(disabled bool) {
s.raftLogQueue.SetDisabled(disabled)
}

// DisableSplitQueue disables or enables the replica split queue.
func (s *Store) DisableSplitQueue(disabled bool) {
s.splitQueue.SetDisabled(disabled)
}

// ForceRaftLogScanAndProcess iterates over all ranges and enqueues any that
// need their raft logs truncated and then process each of them.
func (s *Store) ForceRaftLogScanAndProcess() {
Expand Down
109 changes: 71 additions & 38 deletions storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package storage

import (
"math"
"sort"
"time"

"github.com/cockroachdb/cockroach/config"
Expand All @@ -33,16 +33,13 @@ import (
const (
// raftLogQueueMaxSize is the max size of the queue.
raftLogQueueMaxSize = 100
// raftLogPadding is the number of log entries that should be kept for
// lagging replicas.
raftLogPadding = 10000
// RaftLogQueueTimerDuration is the duration between truncations. This needs
// to be relatively short so that truncations can keep up with raft log entry
// creation.
RaftLogQueueTimerDuration = 50 * time.Millisecond
// RaftLogQueueStaleThreshold is the minimum threshold for stale raft log
// entries. A stale entry is one which all replicas of the range have
// progressed past and thus is no longer needed and can be pruned.
// progressed past and thus is no longer needed and can be truncated.
RaftLogQueueStaleThreshold = 100
)

Expand All @@ -66,13 +63,12 @@ func newRaftLogQueue(db *client.DB, gossip *gossip.Gossip) *raftLogQueue {
return rlq
}

// getTruncatableIndexes returns the total number of stale raft log entries that
// can be truncated and the oldest index that cannot be pruned.
// getTruncatableIndexes returns the number of truncatable indexes and the
// oldest index that cannot be truncated for the replica.
// See computeTruncatableIndex.
func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
rangeID := r.RangeID
// TODO(bram): r.store.RaftStatus(rangeID) differs from r.RaftStatus() in
// tests, causing TestGetTruncatableIndexes to fail. Figure out why and fix.
raftStatus := r.store.RaftStatus(rangeID)
raftStatus := r.RaftStatus()
if raftStatus == nil {
if log.V(1) {
log.Infof("the raft group doesn't exist for range %d", rangeID)
Expand All @@ -86,23 +82,51 @@ func getTruncatableIndexes(r *Replica) (uint64, uint64, error) {
return 0, 0, nil
}

// Calculate the quorum matched index and adjust based on padding.
oldestIndex := getQuorumMatchedIndex(raftStatus, raftLogPadding)

r.mu.Lock()
defer r.mu.Unlock()
raftLogSize := r.mu.raftLogSize
targetSize := r.mu.maxBytes
firstIndex, err := r.FirstIndex()
r.mu.Unlock()
if err != nil {
return 0, 0, errors.Errorf("error retrieving first index for range %d: %s", rangeID, err)
}

if oldestIndex < firstIndex {
return 0, 0, errors.Errorf("raft log's oldest index (%d) is less than the first index (%d) for range %d",
oldestIndex, firstIndex, rangeID)
truncatableIndex := computeTruncatableIndex(raftStatus, raftLogSize, targetSize, firstIndex)
// Return the number of truncatable indexes.
return truncatableIndex - firstIndex, truncatableIndex, nil
}

// computeTruncatableIndex returns the oldest index that cannot be truncated. If
// there is a behind node, we want to keep old raft logs so it can catch up
// without having to send a full snapshot. However, if a node down is down long
// enough, sending a snapshot is more efficient and we should truncate the log
// to the next behind node or the quorum committed index. We currently truncate
// when the raft log size is bigger than the range max bytes value (typically
// 64MB). When there are no nodes behind, or we can't catch any of them up via
// the raft log (due to a previous truncation) this returns the quorum committed
// index.
func computeTruncatableIndex(raftStatus *raft.Status, raftLogSize, targetSize int64, firstIndex uint64) uint64 {
truncatableIndex := raftStatus.Commit
behindIndexes := getBehindIndexes(raftStatus)
for _, behindIndex := range behindIndexes {
// If the behind node's committed index is before the first raft log
// entry we have, the raft log is unable to catch that node up and the log
// should be truncated since a snapshot will have to be sent instead.
// If the raft log is too big, truncate to the next behind node index or the
// quorum committed index. This allows for multiple nodes to be behind and not
// give up on the more recently behind node.
if behindIndex > firstIndex || (behindIndex == firstIndex && raftLogSize <= targetSize) {
truncatableIndex = behindIndex
break
}
}

// Never truncate past the quorum committed index.
if truncatableIndex > raftStatus.Commit {
truncatableIndex = raftStatus.Commit
}

// Return the number of truncatable indexes.
return oldestIndex - firstIndex, oldestIndex, nil
return truncatableIndex
}

// shouldQueue determines whether a range should be queued for truncating. This
Expand Down Expand Up @@ -155,26 +179,35 @@ func (*raftLogQueue) purgatoryChan() <-chan struct{} {
return nil
}

// getQuorumMatchedIndex returns the index which a quorum of the nodes have
// committed. The returned value is adjusted by padding to allow retaining
// additional entries, but this adjustment is limited so that we won't keep
// entries which all nodes have matched.
func getQuorumMatchedIndex(raftStatus *raft.Status, padding uint64) uint64 {
index := raftStatus.Commit
if index >= padding {
index -= padding
} else {
index = 0
}

smallestMatch := uint64(math.MaxUint64)
// getBehindIndexes returns the indexes of nodes that are behind the quorum
// commit index without duplicates and sorted from most behind to most recent.
func getBehindIndexes(raftStatus *raft.Status) []uint64 {
var behind []uint64
behindUniq := make(map[uint64]struct{})
for _, progress := range raftStatus.Progress {
if smallestMatch > progress.Match {
smallestMatch = progress.Match
index := progress.Match
if index < raftStatus.Commit {
if _, ok := behindUniq[index]; ok {
continue
}
behindUniq[index] = struct{}{}
behind = append(behind, index)
}
}
if index < smallestMatch {
index = smallestMatch
}
return index
sort.Sort(uint64Slice(behind))
return behind
}

var _ sort.Interface = uint64Slice(nil)

// uint64Slice implements sort.Interface
type uint64Slice []uint64

// Len implements sort.Interface
func (a uint64Slice) Len() int { return len(a) }

// Swap implements sort.Interface
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less implements sort.Interface
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
83 changes: 56 additions & 27 deletions storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,92 @@ package storage

import (
"fmt"
"reflect"
"testing"

"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/pkg/errors"
)

func TestGetQuorumMatchedIndex(t *testing.T) {
// TestGetBehindIndexes verifies that the correc
func TestGetBehindIndexes(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
commit uint64
progress []uint64
padding uint64
expected uint64
commit uint64
expected []uint64
}{
// Basic cases.
{1, []uint64{1}, 0, 1},
{1, []uint64{1, 2}, 0, 1},
{2, []uint64{1, 2, 3}, 0, 2},
{2, []uint64{1, 2, 3, 4}, 0, 2},
{3, []uint64{1, 2, 3, 4, 5}, 0, 3},
// Sorting.
{3, []uint64{5, 4, 3, 2, 1}, 0, 3},
// Padding.
{3, []uint64{1, 3, 3}, 1, 2},
{3, []uint64{1, 3, 3}, 2, 1},
{3, []uint64{1, 3, 3}, 3, 1},
// Minimum progress value limits padding.
{3, []uint64{2, 3, 3}, 3, 2},
{[]uint64{1}, 1, nil},
{[]uint64{1, 2}, 2, []uint64{1}},
{[]uint64{2, 3, 4}, 4, []uint64{2, 3}},
{[]uint64{1, 2, 3, 4, 5}, 3, []uint64{1, 2}},
// sorting.
{[]uint64{5, 4, 3, 2, 1}, 3, []uint64{1, 2}},
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
status.Commit = c.commit
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
out := getBehindIndexes(status)
if !reflect.DeepEqual(c.expected, out) {
t.Errorf("%d: getBehindIndexes(...) expected %d, but got %d", i, c.expected, out)
}
}
}

// TestComputeTruncatableIndex verifies that the correc
func TestComputeTruncatableIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

const targetSize = 1000

testCases := []struct {
progress []uint64
commit uint64
raftLogSize int64
firstIndex uint64
expected uint64
}{
{[]uint64{1, 2}, 1, 100, 1, 1},
{[]uint64{1, 2, 3, 4}, 3, 100, 1, 1},
{[]uint64{1, 2, 3, 4}, 3, 100, 2, 2},
// If over targetSize, should truncate to next behind replica, or quorum
// committed index.
{[]uint64{1, 2, 3, 4}, 3, 2000, 1, 2},
{[]uint64{1, 2, 3, 4}, 3, 2000, 2, 3},
{[]uint64{1, 2, 3, 4}, 3, 2000, 3, 3},
// Never truncate past raftStatus.Commit.
{[]uint64{4, 5, 6}, 3, 100, 4, 3},
}
for i, c := range testCases {
status := &raft.Status{
HardState: raftpb.HardState{
Commit: c.commit,
},
Progress: make(map[uint64]raft.Progress),
}
status.Commit = c.commit
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
quorumMatchedIndex := getQuorumMatchedIndex(status, c.padding)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
out := computeTruncatableIndex(status, c.raftLogSize, targetSize, c.firstIndex)
if !reflect.DeepEqual(c.expected, out) {
t.Errorf("%d: computeTruncatableIndex(...) expected %d, but got %d", i, c.expected, out)
}
}
}

// TestGetTruncatableIndexes verifies that the correctly returns when there are
// indexes to be truncated.
// TestGetTruncatableIndexes verifies that old raft log entries are correctly
// removed.
func TestGetTruncatableIndexes(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("TODO(bram): #7056")
store, _, stopper := createTestStore(t)
defer stopper.Stop()
if _, err := store.GetReplica(0); err == nil {
Expand Down
Loading

0 comments on commit 3ec0c8b

Please sign in to comment.