Skip to content

Commit

Permalink
Merge pull request #6275 from xiang90/raft_l
Browse files Browse the repository at this point in the history
raft: support safe readonly request
  • Loading branch information
xiang90 authored Sep 13, 2016
2 parents 8c492c7 + 710b14c commit cfe717e
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 68 deletions.
8 changes: 7 additions & 1 deletion raft/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ This raft implementation is a full feature implementation of Raft protocol. Feat
- Log compaction
- Membership changes
- Leadership transfer extension
- Lease-based linearizable read-only queries served by both the leader and followers
- Efficient linearizable read-only queries served by both the leader and followers
- leader checks with quorum and bypasses Raft log before processing read-only queries
- followers asks leader to get a safe read index before processing read-only queries
- More efficient lease-based linearizable read-only queries served by both the leader and followers
- leader bypasses Raft log and processing read-only queries locally
- followers asks leader to get a safe read index before processing read-only queries
- this approach relies on the clock of the all the machines in raft group

This raft implementation also includes a few optional enhancements:

Expand Down
22 changes: 6 additions & 16 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ type Ready struct {
// HardState will be equal to empty state if there is no update.
pb.HardState

// ReadState can be used for node to serve linearizable read requests locally
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadState
ReadStates []ReadState

// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Expand Down Expand Up @@ -102,7 +102,7 @@ func IsEmptySnap(sp pb.Snapshot) bool {
func (rd Ready) containsUpdates() bool {
return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || rd.Index != None
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}

// Node represents a node in a raft cluster.
Expand Down Expand Up @@ -151,11 +151,6 @@ type Node interface {
// Read state has a read index. Once the application advances further than the read
// index, any linearizable read requests issued before the read request can be
// processed safely. The read state will have the same rctx attached.
//
// Note: the current implementation depends on the leader lease. If the clock drift is unbounded,
// leader might keep the lease longer than it should (clock can move backward/pause without any bound).
// ReadIndex is not safe in that case.
// TODO: add clock drift bound into raft configuration.
ReadIndex(ctx context.Context, rctx []byte) error

// Status returns the current status of the raft state machine.
Expand Down Expand Up @@ -370,8 +365,7 @@ func (n *node) run(r *raft) {
}

r.msgs = nil
r.readState.Index = None
r.readState.RequestCtx = nil
r.readStates = nil
advancec = n.advancec
case <-advancec:
if prevHardSt.Commit != 0 {
Expand Down Expand Up @@ -516,12 +510,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
}
if r.readState.Index != None {
c := make([]byte, len(r.readState.RequestCtx))
copy(c, r.readState.RequestCtx)

rd.Index = r.readState.Index
rd.RequestCtx = c
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
return rd
}
17 changes: 6 additions & 11 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,19 @@ func TestNodeReadIndex(t *testing.T) {
appendStep := func(r *raft, m raftpb.Message) {
msgs = append(msgs, m)
}
wreadIndex := uint64(1)
wrequestCtx := []byte("somedata")
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
r.readState.Index = wreadIndex
r.readState.RequestCtx = wrequestCtx
r.readStates = wrs

go n.run(r)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
if rd.Index != wreadIndex {
t.Errorf("ReadIndex = %d, want %d", rd.Index, wreadIndex)
}

if !bytes.Equal(rd.RequestCtx, wrequestCtx) {
t.Errorf("RequestCtx = %v, want %v", rd.RequestCtx, wrequestCtx)
if !reflect.DeepEqual(rd.ReadStates, wrs) {
t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
}

s.Append(rd.Entries)
Expand All @@ -180,7 +175,7 @@ func TestNodeReadIndex(t *testing.T) {
}

r.step = appendStep
wrequestCtx = []byte("somedata2")
wrequestCtx := []byte("somedata2")
n.ReadIndex(context.TODO(), wrequestCtx)
n.Stop()

Expand Down
118 changes: 89 additions & 29 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ const (
StateLeader
)

type ReadOnlyOption int

const (
// ReadOnlySafe guarantees the linearizability of the read only request by
// communicating with the quorum. It is the default and suggested option.
ReadOnlySafe ReadOnlyOption = iota
// ReadOnlyLeaseBased ensures linearizability of the read only request by
// relying on the leader lease. It can be affected by clock drift.
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
ReadOnlyLeaseBased
)

// Possible values for CampaignType
const (
// campaignElection represents the type of normal election
Expand Down Expand Up @@ -135,6 +149,18 @@ type Config struct {
// steps down when quorum is not active for an electionTimeout.
CheckQuorum bool

// ReadOnlyOption specifies how the read only request is processed.
//
// ReadOnlySafe guarantees the linearizability of the read only request by
// communicating with the quorum. It is the default and suggested option.
//
// ReadOnlyLeaseBased ensures linearizability of the read only request by
// relying on the leader lease. It can be affected by clock drift.
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
ReadOnlyOption ReadOnlyOption

// Logger is the logger used for raft log. For multinode which can host
// multiple raft group, each raft group can have its own logger
Logger Logger
Expand Down Expand Up @@ -168,23 +194,13 @@ func (c *Config) validate() error {
return nil
}

// ReadState provides state for read only query.
// It's caller's responsibility to send MsgReadIndex first before getting
// this state from ready, It's also caller's duty to differentiate if this
// state is what it requests through RequestCtx, eg. given a unique id as
// RequestCtx
type ReadState struct {
Index uint64
RequestCtx []byte
}

type raft struct {
id uint64

Term uint64
Vote uint64

readState ReadState
readStates []ReadState

// the log
raftLog *raftLog
Expand All @@ -207,6 +223,8 @@ type raft struct {
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool

readOnly *readOnly

// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
Expand Down Expand Up @@ -254,7 +272,6 @@ func newRaft(c *Config) *raft {
r := &raft{
id: c.ID,
lead: None,
readState: ReadState{Index: None, RequestCtx: nil},
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
Expand All @@ -263,6 +280,7 @@ func newRaft(c *Config) *raft {
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
checkQuorum: c.CheckQuorum,
readOnly: newReadOnly(c.ReadOnlyOption),
}
for _, p := range peers {
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
Expand Down Expand Up @@ -380,7 +398,7 @@ func (r *raft) sendAppend(to uint64) {
}

// sendHeartbeat sends an empty MsgApp
func (r *raft) sendHeartbeat(to uint64) {
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
Expand All @@ -389,10 +407,12 @@ func (r *raft) sendHeartbeat(to uint64) {
// an unmatched index.
commit := min(r.prs[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}

r.send(m)
}

Expand All @@ -409,11 +429,20 @@ func (r *raft) bcastAppend() {

// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
lastCtx := r.readOnly.lastPendingRequestCtx()
if len(lastCtx) == 0 {
r.bcastHeartbeatWithCtx(nil)
} else {
r.bcastHeartbeatWithCtx([]byte(lastCtx))
}
}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
for id := range r.prs {
if id == r.id {
continue
}
r.sendHeartbeat(id)
r.sendHeartbeat(id, ctx)
r.prs[id].resume()
}
}
Expand Down Expand Up @@ -453,6 +482,7 @@ func (r *raft) reset(term uint64) {
}
}
r.pendingConf = false
r.readOnly = newReadOnly(r.readOnly.option)
}

func (r *raft) appendEntry(es ...pb.Entry) {
Expand Down Expand Up @@ -699,16 +729,29 @@ func stepLeader(r *raft, m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
return
case pb.MsgReadIndex:
ri := None
if r.checkQuorum {
ri = r.raftLog.committed
}
if m.From == None || m.From == r.id { // from local member
r.readState.Index = ri
r.readState.RequestCtx = m.Entries[0].Data
if r.quorum() > 1 {
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r.readOnly.option {
case ReadOnlySafe:
r.readOnly.addRequest(r.raftLog.committed, m)
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
var ri uint64
if r.checkQuorum {
ri = r.raftLog.committed
}
if m.From == None || m.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
}
}
} else {
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}

return
}

Expand Down Expand Up @@ -769,6 +812,25 @@ func stepLeader(r *raft, m pb.Message) {
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return
}

ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return
}

rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
return
Expand Down Expand Up @@ -910,9 +972,7 @@ func stepFollower(r *raft, m pb.Message) {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return
}

r.readState.Index = m.Index
r.readState.RequestCtx = m.Entries[0].Data
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
}

Expand All @@ -933,7 +993,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

func (r *raft) handleSnapshot(m pb.Message) {
Expand Down
Loading

0 comments on commit cfe717e

Please sign in to comment.