Skip to content

Commit

Permalink
Merge pull request hashicorp#102 from ongardie/inflight2
Browse files Browse the repository at this point in the history
Replace inflight tracker with commitment tracker (retargeted)
  • Loading branch information
slackpad committed Mar 21, 2016
2 parents 90c4021 + 2ec123f commit ee1f21c
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 402 deletions.
99 changes: 99 additions & 0 deletions commitment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package raft

import (
"sort"
"sync"
)

// commitment is used to advance the leader's commit index. The leader and
// replication goroutines report newly written entries with match(), and
// commitCh is notified when the commit index has advanced.
type commitment struct {
// protects matchIndexes and commitIndex; every method that reads or writes
// them takes this lock (recalculate expects its caller to already hold it)
sync.Mutex
// notified when commitIndex increases
commitCh chan struct{}
// voter to log index: the server stores up through this log entry.
// Only servers present as keys are counted as voters.
matchIndexes map[string]uint64
// a quorum stores up through this log entry. monotonically increases.
commitIndex uint64
// the first index of this leader's term: this needs to be replicated to a
// majority of the cluster before this leader may mark anything committed
// (per Raft's commitment rule)
startIndex uint64
}

// newCommitment builds the commitment tracker used for one leadership term;
// a fresh one is created every time this server becomes leader.
//
// commitCh is the channel notified when log entries become committed.
// voters lists the voting members of the cluster (the local server included,
// unless it has removed itself); each one starts with a match index of 0.
// startIndex is the first index created in this term: nothing is marked
// committed until a quorum has stored at least that entry.
func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *commitment {
	c := &commitment{
		commitCh:     commitCh,
		matchIndexes: make(map[string]uint64, len(voters)),
		startIndex:   startIndex,
		// commitIndex is left at its zero value.
	}
	for i := range voters {
		c.matchIndexes[voters[i]] = 0
	}
	return c
}

// setVoters installs a new cluster membership configuration; from this call
// on, commitment is decided by the given voters only. 'voters' are the voting
// members of the cluster, including the local server except when it has
// removed itself from the cluster.
//
// Match indexes already known for servers that remain voters are carried
// over; servers that were not tracked before start at 0. The commit index is
// recalculated against the new voter set before the lock is released.
func (c *commitment) setVoters(voters []string) {
	c.Lock()
	defer c.Unlock()

	next := make(map[string]uint64, len(voters))
	for _, id := range voters {
		// A lookup miss yields 0, which is the starting point for a new voter.
		next[id] = c.matchIndexes[id]
	}
	c.matchIndexes = next
	c.recalculate()
}

// getCommitIndex returns the current commit index, read under the lock.
// The leader calls it after commitCh has been notified.
func (c *commitment) getCommitIndex() uint64 {
	c.Lock()
	idx := c.commitIndex
	c.Unlock()
	return idx
}

// match records that the given server's disk agrees with this server's log
// up through matchIndex. It is called once a server finishes writing entries
// to disk: either the leader has written a new entry or a follower has
// replied to an AppendEntries RPC.
//
// Reports from servers that are not voters, and reports that do not move a
// voter's match index forward, are ignored. Any accepted report triggers a
// recalculation of the commit index.
func (c *commitment) match(server string, matchIndex uint64) {
	c.Lock()
	defer c.Unlock()

	current, isVoter := c.matchIndexes[server]
	if !isVoter {
		return
	}
	if matchIndex <= current {
		// Stale or duplicate report: match indexes never move backwards.
		return
	}
	c.matchIndexes[server] = matchIndex
	c.recalculate()
}

// recalculate derives a new commitIndex from matchIndexes and notifies
// commitCh if it advanced. It is an internal helper and must be called with
// the lock held.
//
// With no voters nothing is ever committed. Otherwise the match indexes are
// sorted and the element at position (n-1)/2 is taken as the index stored by
// a quorum (this relies on uint64Slice sorting in ascending order). The
// commit index only moves forward, and only once the quorum has reached
// startIndex.
func (c *commitment) recalculate() {
	n := len(c.matchIndexes)
	if n == 0 {
		return
	}

	indexes := make([]uint64, 0, n)
	for _, stored := range c.matchIndexes {
		indexes = append(indexes, stored)
	}
	sort.Sort(uint64Slice(indexes))

	quorum := indexes[(n-1)/2]
	if quorum <= c.commitIndex || quorum < c.startIndex {
		return
	}
	c.commitIndex = quorum
	asyncNotifyCh(c.commitCh)
}
230 changes: 230 additions & 0 deletions commitment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package raft

import (
"testing"
)

// voters returns the first n server names out of "s1" through "s7".
// It panics when asked for more than seven names.
func voters(n int) []string {
	names := []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}
	if n > len(names) {
		panic("only up to 7 servers implemented")
	}
	return names[:n]
}

// TestCommitment_setVoters checks that setVoters() keeps the match indexes of
// servers that stay in the voter set.
func TestCommitment_setVoters(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, []string{"a", "b", "c"}, 0)

	// a: 10, b: 20, c: 30 -> commitIndex: 20
	tracker.match("a", 10)
	tracker.match("b", 20)
	tracker.match("c", 30)
	if notified := drainNotifyCh(notifyCh); !notified {
		t.Fatalf("expected commit notify")
	}

	// Only "c" carries over: c: 30, d: 0, e: 0
	tracker.setVoters([]string{"c", "d", "e"})
	tracker.match("e", 40)
	if got := tracker.getCommitIndex(); got != 30 {
		t.Fatalf("expected 30 entries committed, found %d", got)
	}
	if notified := drainNotifyCh(notifyCh); !notified {
		t.Fatalf("expected commit notify")
	}
}

// TestCommitment_match_max checks that match() ignores a report carrying a
// smaller index than one already recorded for the same server.
func TestCommitment_match_max(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, voters(5), 4)

	steps := []struct {
		server string
		index  uint64
	}{
		{"s1", 8},
		{"s2", 8},
		{"s2", 1}, // earlier index for s2: must not lower its match index
		{"s3", 8},
	}
	for _, step := range steps {
		tracker.match(step.server, step.index)
	}

	if got := tracker.getCommitIndex(); got != 8 {
		t.Fatalf("calling match with an earlier index should be ignored")
	}
}

// TestCommitment_match_nonVoting checks that match() reports from servers
// outside the voter set neither commit anything nor trigger a notification.
func TestCommitment_match_nonVoting(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, voters(5), 4)

	// Three of the five voters store index 8, which commits it.
	for _, voter := range []string{"s1", "s2", "s3"} {
		tracker.match(voter, 8)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}

	// None of these servers is a voter.
	for _, outsider := range []string{"s90", "s91", "s92"} {
		tracker.match(outsider, 10)
	}
	if got := tracker.getCommitIndex(); got != 8 {
		t.Fatalf("non-voting servers shouldn't be able to commit")
	}
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}
}

// TestCommitment_recalculate walks the recalculate() algorithm through a
// growing quorum and two membership changes.
func TestCommitment_recalculate(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, voters(5), 0)

	// Two of five voters are not a quorum.
	tracker.match("s1", 30)
	tracker.match("s2", 20)
	if got := tracker.getCommitIndex(); got != 0 {
		t.Fatalf("shouldn't commit after two of five servers")
	}
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}

	// s1: 30, s2: 20, s3: 10, s4: 0, s5: 0
	tracker.match("s3", 10)
	if got := tracker.getCommitIndex(); got != 10 {
		t.Fatalf("expected 10 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}

	// s1: 30, s2: 20, s3: 10, s4: 15, s5: 0
	tracker.match("s4", 15)
	if got := tracker.getCommitIndex(); got != 15 {
		t.Fatalf("expected 15 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}

	// Shrink to three voters: s1: 30, s2: 20, s3: 10
	tracker.setVoters(voters(3))
	if got := tracker.getCommitIndex(); got != 20 {
		t.Fatalf("expected 20 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}

	// Grow to four voters: s1: 30, s2: 20, s3: 10, s4: 0
	tracker.setVoters(voters(4))
	tracker.match("s2", 25)
	if got := tracker.getCommitIndex(); got != 20 {
		t.Fatalf("expected 20 entries committed, found %d", got)
	}
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}

	// s1: 30, s2: 25, s3: 10, s4: 23
	tracker.match("s4", 23)
	if got := tracker.getCommitIndex(); got != 23 {
		t.Fatalf("expected 23 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}
}

// TestCommitment_recalculate_startIndex checks that recalculate() commits
// nothing until a quorum has stored the entry at startIndex.
func TestCommitment_recalculate_startIndex(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, voters(5), 4)
	quorum := []string{"s1", "s2", "s3"}

	// A quorum at index 3 is still below startIndex (4).
	for _, server := range quorum {
		tracker.match(server, 3)
	}
	if got := tracker.getCommitIndex(); got != 0 {
		t.Fatalf("can't commit until startIndex is replicated to a quorum")
	}
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}

	// The same quorum reaching startIndex makes it committable.
	for _, server := range quorum {
		tracker.match(server, 4)
	}
	if got := tracker.getCommitIndex(); got != 4 {
		t.Fatalf("should be able to commit startIndex once replicated to a quorum")
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}
}

// TestCommitment_noVoterSanity pins the behavior with an empty voter set:
// with no voting members in the cluster, the most sane behavior is probably
// to not mark anything committed.
func TestCommitment_noVoterSanity(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, []string{}, 4)

	// No voters, both before and after an explicit empty setVoters.
	tracker.match("s1", 10)
	tracker.setVoters([]string{})
	tracker.match("s1", 10)
	if got := tracker.getCommitIndex(); got != 0 {
		t.Fatalf("no voting servers: shouldn't be able to commit")
	}
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}

	// Add a voter so something can be committed...
	tracker.setVoters(voters(1))
	tracker.match("s1", 10)
	if got := tracker.getCommitIndex(); got != 10 {
		t.Fatalf("expected 10 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}

	// ...then remove it again: the commit index must stay where it was.
	tracker.setVoters([]string{})
	tracker.match("s1", 20)
	if got := tracker.getCommitIndex(); got != 10 {
		t.Fatalf("expected 10 entries committed, found %d", got)
	}
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}
}

// TestCommitment_singleVoter checks that a lone voter commits immediately,
// and that re-installing the same voter set does not notify again.
func TestCommitment_singleVoter(t *testing.T) {
	notifyCh := make(chan struct{}, 1)
	tracker := newCommitment(notifyCh, voters(1), 4)

	tracker.match("s1", 10)
	if got := tracker.getCommitIndex(); got != 10 {
		t.Fatalf("expected 10 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}

	// Same membership again: the commit index does not move.
	tracker.setVoters(voters(1))
	if drainNotifyCh(notifyCh) {
		t.Fatalf("unexpected commit notify")
	}

	tracker.match("s1", 12)
	if got := tracker.getCommitIndex(); got != 12 {
		t.Fatalf("expected 12 entries committed, found %d", got)
	}
	if !drainNotifyCh(notifyCh) {
		t.Fatalf("expected commit notify")
	}
}
1 change: 0 additions & 1 deletion future.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func (d *deferError) respond(err error) {
// logFuture pairs a Log value with the bookkeeping fields below. It embeds
// deferError, so deferError's fields and methods are promoted onto it.
// NOTE(review): this struct is shown as diff context; the page reports one
// deletion in this hunk, presumably the policy field — confirm against the
// raw diff before relying on that field being present.
type logFuture struct {
deferError
log Log
policy quorumPolicy
// NOTE(review): presumably the result handed back to the caller — confirm.
response interface{}
// NOTE(review): presumably the time the log was dispatched — confirm.
dispatch time.Time
}
Expand Down
Loading

0 comments on commit ee1f21c

Please sign in to comment.