diff --git a/raft/README.md b/raft/README.md
index 5d1b72d5b28..bcb98b0de2b 100644
--- a/raft/README.md
+++ b/raft/README.md
@@ -26,7 +26,13 @@ This raft implementation is a full feature implementation of Raft protocol. Feat
 - Log compaction
 - Membership changes
 - Leadership transfer extension
-- Lease-based linearizable read-only queries served by both the leader and followers
+- Efficient linearizable read-only queries served by both the leader and followers
+  - leader checks with quorum and bypasses Raft log before processing read-only queries
+  - followers ask the leader to get a safe read index before processing read-only queries
+- More efficient lease-based linearizable read-only queries served by both the leader and followers
+  - leader bypasses Raft log and processes read-only queries locally
+  - followers ask the leader to get a safe read index before processing read-only queries
+  - this approach relies on the clocks of all the machines in the raft group
 
 This raft implementation also includes a few optional enhancements:
 
diff --git a/raft/node.go b/raft/node.go
index 800fb073744..5fce58457d0 100644
--- a/raft/node.go
+++ b/raft/node.go
@@ -60,11 +60,11 @@ type Ready struct {
 	// HardState will be equal to empty state if there is no update.
 	pb.HardState
 
-	// ReadState can be used for node to serve linearizable read requests locally
+	// ReadStates can be used for node to serve linearizable read requests locally
 	// when its applied index is greater than the index in ReadState.
 	// Note that the readState will be returned when raft receives msgReadIndex.
 	// The returned is only valid for the request that requested to read.
-	ReadState
+	ReadStates []ReadState
 
 	// Entries specifies entries to be saved to stable storage BEFORE
 	// Messages are sent.
@@ -102,7 +102,7 @@ func IsEmptySnap(sp pb.Snapshot) bool {
 func (rd Ready) containsUpdates() bool {
 	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
 		!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
-		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || rd.Index != None
+		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
 }
 
 // Node represents a node in a raft cluster.
@@ -151,11 +151,6 @@ type Node interface {
 	// Read state has a read index. Once the application advances further than the read
 	// index, any linearizable read requests issued before the read request can be
 	// processed safely. The read state will have the same rctx attached.
-	//
-	// Note: the current implementation depends on the leader lease. If the clock drift is unbounded,
-	// leader might keep the lease longer than it should (clock can move backward/pause without any bound).
-	// ReadIndex is not safe in that case.
-	// TODO: add clock drift bound into raft configuration.
 	ReadIndex(ctx context.Context, rctx []byte) error
 
 	// Status returns the current status of the raft state machine.
@@ -370,8 +365,7 @@ func (n *node) run(r *raft) {
 			}
 
 			r.msgs = nil
-			r.readState.Index = None
-			r.readState.RequestCtx = nil
+			r.readStates = nil
 			advancec = n.advancec
 		case <-advancec:
 			if prevHardSt.Commit != 0 {
@@ -516,12 +510,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if r.raftLog.unstable.snapshot != nil {
 		rd.Snapshot = *r.raftLog.unstable.snapshot
 	}
-	if r.readState.Index != None {
-		c := make([]byte, len(r.readState.RequestCtx))
-		copy(c, r.readState.RequestCtx)
-
-		rd.Index = r.readState.Index
-		rd.RequestCtx = c
+	if len(r.readStates) != 0 {
+		rd.ReadStates = r.readStates
 	}
 	return rd
 }
diff --git a/raft/node_test.go b/raft/node_test.go
index 3615e1dc731..ba381cc4a98 100644
--- a/raft/node_test.go
+++ b/raft/node_test.go
@@ -150,24 +150,19 @@ func TestNodeReadIndex(t *testing.T) {
 	appendStep := func(r *raft, m raftpb.Message) {
 		msgs = append(msgs, m)
 	}
-	wreadIndex := uint64(1)
-	wrequestCtx := []byte("somedata")
+	wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
 
 	n := newNode()
 	s := NewMemoryStorage()
 	r := newTestRaft(1, []uint64{1}, 10, 1, s)
-	r.readState.Index = wreadIndex
-	r.readState.RequestCtx = wrequestCtx
+	r.readStates = wrs
+
 	go n.run(r)
 	n.Campaign(context.TODO())
 	for {
 		rd := <-n.Ready()
-		if rd.Index != wreadIndex {
-			t.Errorf("ReadIndex = %d, want %d", rd.Index, wreadIndex)
-		}
-
-		if !bytes.Equal(rd.RequestCtx, wrequestCtx) {
-			t.Errorf("RequestCtx = %v, want %v", rd.RequestCtx, wrequestCtx)
+		if !reflect.DeepEqual(rd.ReadStates, wrs) {
+			t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
 		}
 
 		s.Append(rd.Entries)
@@ -180,7 +175,7 @@ func TestNodeReadIndex(t *testing.T) {
 	}
 
 	r.step = appendStep
-	wrequestCtx = []byte("somedata2")
+	wrequestCtx := []byte("somedata2")
 	n.ReadIndex(context.TODO(), wrequestCtx)
 	n.Stop()
 
diff --git a/raft/raft.go b/raft/raft.go
index f236281e7cd..38c7904d7d0 100644
--- a/raft/raft.go
+++ b/raft/raft.go
@@ -39,6 +39,20 @@ const (
 	StateLeader
 )
 
+type ReadOnlyOption int
+
+const (
+	// ReadOnlySafe guarantees the linearizability of the read only request by
+	// communicating with the quorum. It is the default and suggested option.
+	ReadOnlySafe ReadOnlyOption = iota
+	// ReadOnlyLeaseBased ensures linearizability of the read only request by
+	// relying on the leader lease. It can be affected by clock drift.
+	// If the clock drift is unbounded, leader might keep the lease longer than it
+	// should (clock can move backward/pause without any bound). ReadIndex is not safe
+	// in that case.
+	ReadOnlyLeaseBased
+)
+
 // Possible values for CampaignType
 const (
 	// campaignElection represents the type of normal election
@@ -135,6 +149,18 @@ type Config struct {
 	// steps down when quorum is not active for an electionTimeout.
 	CheckQuorum bool
 
+	// ReadOnlyOption specifies how the read only request is processed.
+	//
+	// ReadOnlySafe guarantees the linearizability of the read only request by
+	// communicating with the quorum. It is the default and suggested option.
+	//
+	// ReadOnlyLeaseBased ensures linearizability of the read only request by
+	// relying on the leader lease. It can be affected by clock drift.
+	// If the clock drift is unbounded, leader might keep the lease longer than it
+	// should (clock can move backward/pause without any bound). ReadIndex is not safe
+	// in that case.
+	ReadOnlyOption ReadOnlyOption
+
 	// Logger is the logger used for raft log. For multinode which can host
 	// multiple raft group, each raft group can have its own logger
 	Logger Logger
@@ -168,23 +194,13 @@ func (c *Config) validate() error {
 	return nil
 }
 
-// ReadState provides state for read only query.
-// It's caller's responsibility to send MsgReadIndex first before getting
-// this state from ready, It's also caller's duty to differentiate if this
-// state is what it requests through RequestCtx, eg. given a unique id as
-// RequestCtx
-type ReadState struct {
-	Index      uint64
-	RequestCtx []byte
-}
-
 type raft struct {
 	id uint64
 
 	Term uint64
 	Vote uint64
 
-	readState ReadState
+	readStates []ReadState
 
 	// the log
 	raftLog *raftLog
@@ -207,6 +223,8 @@ type raft struct {
 	// New configuration is ignored if there exists unapplied configuration.
 	pendingConf bool
 
+	readOnly *readOnly
+
 	// number of ticks since it reached last electionTimeout when it is leader
 	// or candidate.
 	// number of ticks since it reached last electionTimeout or received a
@@ -254,7 +272,6 @@ func newRaft(c *Config) *raft {
 	r := &raft{
 		id:               c.ID,
 		lead:             None,
-		readState:        ReadState{Index: None, RequestCtx: nil},
 		raftLog:          raftlog,
 		maxMsgSize:       c.MaxSizePerMsg,
 		maxInflight:      c.MaxInflightMsgs,
@@ -263,6 +280,7 @@ func newRaft(c *Config) *raft {
 		heartbeatTimeout: c.HeartbeatTick,
 		logger:           c.Logger,
 		checkQuorum:      c.CheckQuorum,
+		readOnly:         newReadOnly(c.ReadOnlyOption),
 	}
 	for _, p := range peers {
 		r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
@@ -380,7 +398,7 @@ func (r *raft) sendAppend(to uint64) {
 }
 
 // sendHeartbeat sends an empty MsgApp
-func (r *raft) sendHeartbeat(to uint64) {
+func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
 	// Attach the commit as min(to.matched, r.committed).
 	// When the leader sends out heartbeat message,
 	// the receiver(follower) might not be matched with the leader
@@ -389,10 +407,12 @@ func (r *raft) sendHeartbeat(to uint64) {
 	// an unmatched index.
 	commit := min(r.prs[to].Match, r.raftLog.committed)
 	m := pb.Message{
-		To:     to,
-		Type:   pb.MsgHeartbeat,
-		Commit: commit,
+		To:      to,
+		Type:    pb.MsgHeartbeat,
+		Commit:  commit,
+		Context: ctx,
 	}
+
 	r.send(m)
 }
 
@@ -409,11 +429,20 @@ func (r *raft) bcastAppend() {
 
 // bcastHeartbeat sends RPC, without entries to all the peers.
 func (r *raft) bcastHeartbeat() {
+	lastCtx := r.readOnly.lastPendingRequestCtx()
+	if len(lastCtx) == 0 {
+		r.bcastHeartbeatWithCtx(nil)
+	} else {
+		r.bcastHeartbeatWithCtx([]byte(lastCtx))
+	}
+}
+
+func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
 	for id := range r.prs {
 		if id == r.id {
 			continue
 		}
-		r.sendHeartbeat(id)
+		r.sendHeartbeat(id, ctx)
 		r.prs[id].resume()
 	}
 }
@@ -453,6 +482,7 @@ func (r *raft) reset(term uint64) {
 		}
 	}
 	r.pendingConf = false
+	r.readOnly = newReadOnly(r.readOnly.option)
 }
 
 func (r *raft) appendEntry(es ...pb.Entry) {
@@ -699,16 +729,29 @@ func stepLeader(r *raft, m pb.Message) {
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 		return
 	case pb.MsgReadIndex:
-		ri := None
-		if r.checkQuorum {
-			ri = r.raftLog.committed
-		}
-		if m.From == None || m.From == r.id { // from local member
-			r.readState.Index = ri
-			r.readState.RequestCtx = m.Entries[0].Data
+		if r.quorum() > 1 {
+			// thinking: use an internally defined context instead of the user given context.
+			// We can express this in terms of the term and index instead of a user-supplied value.
+			// This would allow multiple reads to piggyback on the same message.
+			switch r.readOnly.option {
+			case ReadOnlySafe:
+				r.readOnly.addRequest(r.raftLog.committed, m)
+				r.bcastHeartbeatWithCtx(m.Entries[0].Data)
+			case ReadOnlyLeaseBased:
+				var ri uint64
+				if r.checkQuorum {
+					ri = r.raftLog.committed
+				}
+				if m.From == None || m.From == r.id { // from local member
+					r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+				} else {
+					r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+				}
+			}
 		} else {
-			r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+			r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
 		}
+
 		return
 	}
 
@@ -769,6 +812,25 @@ func stepLeader(r *raft, m pb.Message) {
 		if pr.Match < r.raftLog.lastIndex() {
 			r.sendAppend(m.From)
 		}
+
+		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
+			return
+		}
+
+		ackCount := r.readOnly.recvAck(m)
+		if ackCount < r.quorum() {
+			return
+		}
+
+		rss := r.readOnly.advance(m)
+		for _, rs := range rss {
+			req := rs.req
+			if req.From == None || req.From == r.id { // from local member
+				r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
+			} else {
+				r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
+			}
+		}
 	case pb.MsgSnapStatus:
 		if pr.State != ProgressStateSnapshot {
 			return
@@ -910,9 +972,7 @@ func stepFollower(r *raft, m pb.Message) {
 			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
 			return
 		}
-
-		r.readState.Index = m.Index
-		r.readState.RequestCtx = m.Entries[0].Data
+		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
 	}
 }
 
@@ -933,7 +993,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
 
 func (r *raft) handleHeartbeat(m pb.Message) {
 	r.raftLog.commitTo(m.Commit)
-	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
+	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
 }
 
 func (r *raft) handleSnapshot(m pb.Message) {
diff --git a/raft/raft_test.go b/raft/raft_test.go
index 9e4ff9a93e8..6415267ce8e 100644
--- a/raft/raft_test.go
+++ b/raft/raft_test.go
@@ -1411,11 +1411,65 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
 	}
 }
 
-func TestReadIndexWithCheckQuorum(t *testing.T) {
+func TestReadOnlyOptionSafe(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 
+	nt := newNetwork(a, b, c)
+	setRandomizedElectionTimeout(b, b.electionTimeout+1)
+
+	for i := 0; i < b.electionTimeout; i++ {
+		b.tick()
+	}
+	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
+
+	if a.state != StateLeader {
+		t.Fatalf("state = %s, want %s", a.state, StateLeader)
+	}
+
+	tests := []struct {
+		sm        *raft
+		proposals int
+		wri       uint64
+		wctx      []byte
+	}{
+		{b, 10, 11, []byte("ctx1")},
+		{c, 10, 21, []byte("ctx2")},
+		{b, 10, 31, []byte("ctx3")},
+		{c, 10, 41, []byte("ctx4")},
+	}
+
+	for i, tt := range tests {
+		for j := 0; j < tt.proposals; j++ {
+			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
+		}
+
+		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
+
+		r := tt.sm
+		if len(r.readStates) == 0 {
+			t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
+		}
+		rs := r.readStates[0]
+		if rs.Index != tt.wri {
+			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
+		}
+
+		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
+			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
+		}
+		r.readStates = nil
+	}
+}
+
+func TestReadOnlyOptionLease(t *testing.T) {
+	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	a.readOnly.option = ReadOnlyLeaseBased
+	b.readOnly.option = ReadOnlyLeaseBased
+	c.readOnly.option = ReadOnlyLeaseBased
 	a.checkQuorum = true
 	b.checkQuorum = true
 	c.checkQuorum = true
@@ -1444,7 +1498,7 @@ func TestReadIndexWithCheckQuorum(t *testing.T) {
 		{c, 10, 41, []byte("ctx4")},
 	}
 
-	for _, tt := range tests {
+	for i, tt := range tests {
 		for j := 0; j < tt.proposals; j++ {
 			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
 		}
@@ -1452,20 +1506,25 @@ func TestReadIndexWithCheckQuorum(t *testing.T) {
 		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
 
 		r := tt.sm
-		if r.readState.Index != tt.wri {
-			t.Errorf("readIndex = %d, want %d", r.readState.Index, tt.wri)
+		rs := r.readStates[0]
+		if rs.Index != tt.wri {
+			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
 		}
 
-		if !bytes.Equal(r.readState.RequestCtx, tt.wctx) {
-			t.Errorf("requestCtx = %v, want %v", r.readState.RequestCtx, tt.wctx)
+		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
+			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
 		}
+		r.readStates = nil
 	}
 }
 
-func TestReadIndexWithoutCheckQuorum(t *testing.T) {
+func TestReadOnlyOptionLeaseWithoutCheckQuorum(t *testing.T) {
 	a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
 	c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
+	a.readOnly.option = ReadOnlyLeaseBased
+	b.readOnly.option = ReadOnlyLeaseBased
+	c.readOnly.option = ReadOnlyLeaseBased
 
 	nt := newNetwork(a, b, c)
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
@@ -1473,12 +1532,13 @@ func TestReadIndexWithoutCheckQuorum(t *testing.T) {
 	ctx := []byte("ctx1")
 	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
 
-	if b.readState.Index != None {
-		t.Errorf("readIndex = %d, want %d", b.readState.Index, None)
+	rs := b.readStates[0]
+	if rs.Index != None {
+		t.Errorf("readIndex = %d, want %d", rs.Index, None)
 	}
 
-	if !bytes.Equal(b.readState.RequestCtx, ctx) {
-		t.Errorf("requestCtx = %v, want %v", b.readState.RequestCtx, ctx)
+	if !bytes.Equal(rs.RequestCtx, ctx) {
+		t.Errorf("requestCtx = %v, want %v", rs.RequestCtx, ctx)
 	}
 }
 
diff --git a/raft/read_only.go b/raft/read_only.go
new file mode 100644
index 00000000000..05a21dabd1f
--- /dev/null
+++ b/raft/read_only.go
@@ -0,0 +1,118 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import pb "github.com/coreos/etcd/raft/raftpb"
+
+// ReadState provides state for read only query.
+// It's caller's responsibility to call ReadIndex first before getting
+// this state from ready, It's also caller's duty to differentiate if this
+// state is what it requests through RequestCtx, eg. given a unique id as
+// RequestCtx
+type ReadState struct {
+	Index      uint64
+	RequestCtx []byte
+}
+
+type readIndexStatus struct {
+	req   pb.Message
+	index uint64
+	acks  map[uint64]struct{}
+}
+
+type readOnly struct {
+	option           ReadOnlyOption
+	pendingReadIndex map[string]*readIndexStatus
+	readIndexQueue   []string
+}
+
+func newReadOnly(option ReadOnlyOption) *readOnly {
+	return &readOnly{
+		option:           option,
+		pendingReadIndex: make(map[string]*readIndexStatus),
+	}
+}
+
+// addRequest adds a read only request into readonly struct.
+// `index` is the commit index of the raft state machine when it received
+// the read only request.
+// `m` is the original read only request message from the local or remote node.
+func (ro *readOnly) addRequest(index uint64, m pb.Message) {
+	ctx := string(m.Entries[0].Data)
+	if _, ok := ro.pendingReadIndex[ctx]; ok {
+		return
+	}
+	ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
+	ro.readIndexQueue = append(ro.readIndexQueue, ctx)
+}
+
+// recvAck notifies the readonly struct that the raft state machine received
+// an acknowledgment of the heartbeat that attached with the read only request
+// context.
+func (ro *readOnly) recvAck(m pb.Message) int {
+	rs, ok := ro.pendingReadIndex[string(m.Context)]
+	if !ok {
+		return 0
+	}
+
+	rs.acks[m.From] = struct{}{}
+	// add one to include an ack from local node
+	return len(rs.acks) + 1
+}
+
+// advance advances the read only request queue kept by the readonly struct.
+// It dequeues the requests until it finds the read only request that has
+// the same context as the given `m`.
+func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
+	var (
+		i     int
+		found bool
+	)
+
+	ctx := string(m.Context)
+	rss := []*readIndexStatus{}
+
+	for _, okctx := range ro.readIndexQueue {
+		i++
+		rs, ok := ro.pendingReadIndex[okctx]
+		if !ok {
+			panic("cannot find corresponding read state from pending map")
+		}
+		rss = append(rss, rs)
+		if okctx == ctx {
+			found = true
+			break
+		}
+	}
+
+	if found {
+		ro.readIndexQueue = ro.readIndexQueue[i:]
+		for _, rs := range rss {
+			delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) // same key addRequest stored it under
+		}
+		return rss
+	}
+
+	return nil
+}
+
+// lastPendingRequestCtx returns the context of the last pending read only
+// request in readonly struct.
+func (ro *readOnly) lastPendingRequestCtx() string {
+	if len(ro.readIndexQueue) == 0 {
+		return ""
+	}
+	return ro.readIndexQueue[len(ro.readIndexQueue)-1]
+}