Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DO NOT MERGE] storage: queue replicas for raft log truncation after …
Browse files Browse the repository at this point in the history
…write ops

Fixes cockroachdb#6012.
petermattis committed Jun 10, 2016

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 22d3b97 commit 4cba101
Showing 6 changed files with 31 additions and 9 deletions.
5 changes: 5 additions & 0 deletions storage/client_metrics_test.go
Original file line number Diff line number Diff line change
@@ -150,6 +150,11 @@ func TestStoreMetrics(t *testing.T) {
t.Fatal(err)
}

// Disable the raft log truncation which confuses this test.
for _, s := range mtc.stores {
s.DisableRaftLogQueue(true)
}

// Perform a split, which has special metrics handling.
splitArgs := adminSplitArgs(roachpb.KeyMin, roachpb.Key("m"))
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &splitArgs); err != nil {
5 changes: 5 additions & 0 deletions storage/queue.go
Original file line number Diff line number Diff line change
@@ -258,6 +258,11 @@ func (bq *baseQueue) Add(repl *Replica, priority float64) error {
// not be added, as the replica with the lowest priority will be
// dropped.
func (bq *baseQueue) MaybeAdd(repl *Replica, now roachpb.Timestamp) {
if bq.gossip == nil {
// The queue is disabled due to being part of the bootstrap store.
return
}

// Load the system config.
cfg, ok := bq.gossip.GetSystemConfig()
if !ok {
13 changes: 7 additions & 6 deletions storage/raft_log_queue.go
Original file line number Diff line number Diff line change
@@ -35,13 +35,14 @@ const (
// raftLogPadding is the number of log entries that should be kept for
// lagging replicas.
raftLogPadding = 10000
// RaftLogQueueTimerDuration is the duration between checking the
// raft logs.
RaftLogQueueTimerDuration = time.Second
// RaftLogQueueTimerDuration is the duration between truncations. This needs
// to be relatively short so that truncations can keep up with raft log entry
// creation.
RaftLogQueueTimerDuration = 50 * time.Millisecond
// RaftLogQueueStaleThreshold is the minimum threshold for stale raft log
// entries. A stale entry is one which all replicas of the range have
// progressed past and thus is no longer needed and can be pruned.
RaftLogQueueStaleThreshold = 1
RaftLogQueueStaleThreshold = 100
)

// raftLogQueue manages a queue of replicas slated to have their raft logs
@@ -115,7 +116,7 @@ func (*raftLogQueue) shouldQueue(
return false, 0
}

return truncatableIndexes > RaftLogQueueStaleThreshold, float64(truncatableIndexes)
return truncatableIndexes >= RaftLogQueueStaleThreshold, float64(truncatableIndexes)
}

// process truncates the raft log of the range if the replica is the raft
@@ -128,7 +129,7 @@ func (rlq *raftLogQueue) process(now roachpb.Timestamp, r *Replica, _ config.Sys
}

// Can and should the raft logs be truncated?
if truncatableIndexes > RaftLogQueueStaleThreshold {
if truncatableIndexes >= RaftLogQueueStaleThreshold {
if log.V(1) {
log.Infof("truncating the raft log of range %d to %d", r.RangeID, oldestIndex)
}
2 changes: 1 addition & 1 deletion storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ func TestGetTruncatableIndexes(t *testing.T) {
}

// Write a few keys to the range.
for i := 0; i < 10; i++ {
for i := 0; i < RaftLogQueueStaleThreshold+1; i++ {
key := roachpb.Key(fmt.Sprintf("key%02d", i))
args := putArgs(key, []byte(fmt.Sprintf("value%02d", i)))
if _, err := client.SendWrapped(store.testSender(), nil, &args); err != nil {
13 changes: 11 additions & 2 deletions storage/replica.go
Original file line number Diff line number Diff line change
@@ -1710,10 +1710,10 @@ func (r *Replica) applyRaftCommand(idKey storagebase.CmdIDKey, ctx context.Conte
}

// On successful write commands handle write-related triggers including
// splitting.
// splitting and raft log truncation.
if rErr == nil && ba.IsWrite() {
// If the commit succeeded, potentially add range to split queue.
r.maybeAddToSplitQueue()
r.maybeAddToRaftLogQueue(index)
}

// On the replica on which this command originated, resolve skipped intents
@@ -2345,6 +2345,15 @@ func (r *Replica) maybeAddToSplitQueue() {
}
}

// maybeAddToRaftLogQueue checks whether the raft log is a candidate for
// truncation. If yes, the range is added to the raft log queue.
func (r *Replica) maybeAddToRaftLogQueue(appliedIndex uint64) {
	// Only consider queueing on every raftLogCheckFrequency-th applied
	// index rather than on every write, to keep the per-command overhead
	// low while still keeping up with raft log growth.
	const raftLogCheckFrequency = RaftLogQueueStaleThreshold / 5
	if appliedIndex%raftLogCheckFrequency != 0 {
		return
	}
	r.store.raftLogQueue.MaybeAdd(r, r.store.Clock().Now())
}

// PendingCmdsLen returns the number of pending commands.
func (r *Replica) PendingCmdsLen() int {
r.mu.Lock()
2 changes: 2 additions & 0 deletions storage/replica_test.go
Original file line number Diff line number Diff line change
@@ -863,6 +863,8 @@ func TestReplicaTSCacheLowWaterOnLease(t *testing.T) {
tc.Start(t)
defer tc.Stop()
tc.clock.SetMaxOffset(maxClockOffset)
// Disable raft log truncation which confuses this test.
tc.store.DisableRaftLogQueue(true)

// Modify range descriptor to include a second replica; leader lease can
// only be obtained by Replicas which are part of the range descriptor. This

0 comments on commit 4cba101

Please sign in to comment.