Skip to content

Commit

Permalink
Merge pull request cockroachdb#2544 from xiang90/raft-inflight
Browse files Browse the repository at this point in the history
raft: add flow control for progress
  • Loading branch information
xiang90 committed Mar 21, 2015
2 parents 42ecf74 + 262bc21 commit cb8c4e1
Show file tree
Hide file tree
Showing 5 changed files with 434 additions and 15 deletions.
80 changes: 79 additions & 1 deletion progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,22 @@ type Progress struct {
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
PendingSnapshot uint64

// inflights is a sliding window for the inflight messages.
// When inflights is full, no more message should be sent.
// When sends out a message, the index of the last entry should
// be add to inflights. The index MUST be added into inflights
// in order.
// When receives a reply, the previous inflights should be freed
// by calling inflights.freeTo.
ins *inflights
}

func (pr *Progress) resetState(state ProgressStateType) {
pr.Paused = false
pr.PendingSnapshot = 0
pr.State = state
pr.ins.reset()
}

func (pr *Progress) becomeProbe() {
Expand Down Expand Up @@ -135,7 +145,16 @@ func (pr *Progress) resume() { pr.Paused = false }

// isPaused returns whether progress stops sending message.
func (pr *Progress) isPaused() bool {
return pr.State == ProgressStateProbe && pr.Paused || pr.State == ProgressStateSnapshot
switch pr.State {
case ProgressStateProbe:
return pr.Paused
case ProgressStateReplicate:
return pr.ins.full()
case ProgressStateSnapshot:
return true
default:
panic("unexpected state")
}
}

func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
Expand All @@ -149,3 +168,62 @@ func (pr *Progress) maybeSnapshotAbort() bool {
func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.isPaused(), pr.PendingSnapshot)
}

type inflights struct {
// the starting index in the buffer
start int
// number of inflights in the buffer
count int

// the size of the buffer
size int
buffer []uint64
}

func newInflights(size int) *inflights {
return &inflights{
size: size,
buffer: make([]uint64, size),
}
}

// add adds an inflight into inflights
func (in *inflights) add(inflight uint64) {
if in.full() {
panic("cannot add into a full inflights")
}
next := in.start + in.count
if next >= in.size {
next -= in.size
}
in.buffer[next] = inflight
in.count++
}

// freeTo frees the inflights smaller or equal to the given `to` flight.
func (in *inflights) freeTo(to uint64) {
for i := in.start; i < in.start+in.count; i++ {
idx := i
if i >= in.size {
idx -= in.size
}
if to < in.buffer[idx] {
in.count -= i - in.start
in.start = idx
break
}
}
}

func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }

// full returns true if the inflights is full.
func (in *inflights) full() bool {
return in.count == in.size
}

// resets frees all inflights.
func (in *inflights) reset() {
in.count = 0
in.start = 0
}
175 changes: 175 additions & 0 deletions progress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
"reflect"
"testing"
)

func TestInflightsAdd(t *testing.T) {
// no rotating case
in := &inflights{
size: 10,
buffer: make([]uint64, 10),
}

for i := 0; i < 5; i++ {
in.add(uint64(i))
}

wantIn := &inflights{
start: 0,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
}

if !reflect.DeepEqual(in, wantIn) {
t.Fatalf("in = %+v, want %+v", in, wantIn)
}

for i := 5; i < 10; i++ {
in.add(uint64(i))
}

wantIn2 := &inflights{
start: 0,
count: 10,
size: 10,
// ↓---------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}

if !reflect.DeepEqual(in, wantIn2) {
t.Fatalf("in = %+v, want %+v", in, wantIn2)
}

// rotating case
in2 := &inflights{
start: 5,
size: 10,
buffer: make([]uint64, 10),
}

for i := 0; i < 5; i++ {
in2.add(uint64(i))
}

wantIn21 := &inflights{
start: 5,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
}

if !reflect.DeepEqual(in2, wantIn21) {
t.Fatalf("in = %+v, want %+v", in2, wantIn21)
}

for i := 5; i < 10; i++ {
in2.add(uint64(i))
}

wantIn22 := &inflights{
start: 5,
count: 10,
size: 10,
// -------------- ↓------------
buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
}

if !reflect.DeepEqual(in2, wantIn22) {
t.Fatalf("in = %+v, want %+v", in2, wantIn22)
}
}

func TestInflightFreeTo(t *testing.T) {
// no rotating case
in := newInflights(10)
for i := 0; i < 10; i++ {
in.add(uint64(i))
}

in.freeTo(4)

wantIn := &inflights{
start: 5,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}

if !reflect.DeepEqual(in, wantIn) {
t.Fatalf("in = %+v, want %+v", in, wantIn)
}

in.freeTo(8)

wantIn2 := &inflights{
start: 9,
count: 1,
size: 10,
// ↓
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}

if !reflect.DeepEqual(in, wantIn2) {
t.Fatalf("in = %+v, want %+v", in, wantIn2)
}

// rotating case
for i := 10; i < 15; i++ {
in.add(uint64(i))
}

in.freeTo(12)

wantIn3 := &inflights{
start: 3,
count: 2,
size: 10,
// ↓-----
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
}

if !reflect.DeepEqual(in, wantIn3) {
t.Fatalf("in = %+v, want %+v", in, wantIn3)
}
}

func TestInflightFreeFirstOne(t *testing.T) {
in := newInflights(10)
for i := 0; i < 10; i++ {
in.add(uint64(i))
}

in.freeFirstOne()

wantIn := &inflights{
start: 1,
count: 9,
size: 10,
// ↓------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}

if !reflect.DeepEqual(in, wantIn) {
t.Fatalf("in = %+v, want %+v", in, wantIn)
}
}
22 changes: 16 additions & 6 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ type raft struct {
// the log
raftLog *raftLog

maxMsgSize uint64
prs map[uint64]*Progress
maxInflight int
maxMsgSize uint64
prs map[uint64]*Progress

state StateType

Expand Down Expand Up @@ -109,13 +110,14 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
// TODO(xiang): add a config arguement into newRaft after we add
// the max inflight message field.
maxMsgSize: 4 * 1024 * 1024,
maxInflight: 256,
prs: make(map[uint64]*Progress),
electionTimeout: election,
heartbeatTimeout: heartbeat,
}
r.rand = rand.New(rand.NewSource(int64(id)))
for _, p := range peers {
r.prs[p] = &Progress{Next: 1}
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
}
if !isHardStateEqual(hs, emptyState) {
r.loadState(hs)
Expand Down Expand Up @@ -195,7 +197,9 @@ func (r *raft) sendAppend(to uint64) {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
pr.optimisticUpdate(m.Entries[n-1].Index)
last := m.Entries[n-1].Index
pr.optimisticUpdate(last)
pr.ins.add(last)
case ProgressStateProbe:
pr.pause()
default:
Expand Down Expand Up @@ -265,7 +269,7 @@ func (r *raft) reset(term uint64) {
r.elapsed = 0
r.votes = make(map[uint64]bool)
for i := range r.prs {
r.prs[i] = &Progress{Next: r.raftLog.lastIndex() + 1}
r.prs[i] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
if i == r.id {
r.prs[i].Match = r.raftLog.lastIndex()
}
Expand Down Expand Up @@ -456,6 +460,8 @@ func stepLeader(r *raft, m pb.Message) {
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
}

if r.maybeCommit() {
Expand All @@ -468,6 +474,10 @@ func stepLeader(r *raft, m pb.Message) {
}
}
case pb.MsgHeartbeatResp:
// free one slot for the full inflights window to allow progress.
if pr.State == ProgressStateReplicate && pr.ins.full() {
pr.ins.freeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
Expand Down Expand Up @@ -661,7 +671,7 @@ func (r *raft) removeNode(id uint64) {
func (r *raft) resetPendingConf() { r.pendingConf = false }

func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &Progress{Next: next, Match: match}
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
}

func (r *raft) delProgress(id uint64) {
Expand Down
Loading

0 comments on commit cb8c4e1

Please sign in to comment.