-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdinghy.go
285 lines (273 loc) · 8.19 KB
/
dinghy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package dinghy
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
)
// Fallback callbacks and settings that New applies when the caller does not
// provide its own.
var (
	// DefaultOnLeader is a no-op callback run when a node becomes the leader.
	DefaultOnLeader = func() error {
		return nil
	}
	// DefaultOnFollower is a no-op callback run when a node becomes a follower.
	DefaultOnFollower = func() error {
		return nil
	}
	// DefaultRoutePrefix is the prefix placed in front of every dinghy route.
	DefaultRoutePrefix = "/dinghy"
)

// Sentinel errors produced by the election and replication code. Compare
// against them with == or errors.Is.
var (
	// ErrTooFewVotes is reported by a RequestVote round in which the candidate
	// received fewer than a majority of the votes.
	ErrTooFewVotes = errors.New("too few votes")
	// ErrNewElectionTerm reports that a higher term was observed while
	// requesting votes or sending entries; the caller steps down to that term.
	ErrNewElectionTerm = errors.New("newer election term")
	// ErrLeader is returned when an operation cannot be completed on the
	// leader node.
	ErrLeader = errors.New("node is the leader")
	// ErrNotLeader is returned when an operation cannot be completed on a
	// follower or candidate node.
	ErrNotLeader = errors.New("node is not the leader")
)
// Dinghy manages the raft FSM and executes OnLeader and OnFollower events.
//
// Build one with New and drive it with Start; Stop ends the event loop.
type Dinghy struct {
	// client is the HTTP client created by New, with a timeout of hMS
	// milliseconds. Presumably used for requests to peer nodes from code
	// outside this file — TODO confirm.
	client *http.Client
	// logger receives the event loop's diagnostic output.
	logger Logger
	// mu is held while the OnFollower and OnLeader callbacks run.
	mu *sync.Mutex
	// routePrefix will be prefixed to all handler routes. New sets it to
	// DefaultRoutePrefix ("/dinghy"); like that default, it is expected to
	// begin with a leading slash.
	routePrefix string
	// stopChan carries the stop signal from Stop to Start and to whichever
	// state handler is currently running.
	stopChan chan struct{}
	// Addr is a host:port for the current node.
	Addr string
	// Nodes is a list of all nodes for consensus.
	Nodes []string
	// OnLeader is an optional function to execute when becoming a leader.
	OnLeader func() error
	// OnFollower is an optional function to execute when becoming a follower.
	OnFollower func() error
	// State for holding the raft state.
	State *State
}
// ApplyFunc is the callback signature New accepts for the on-leader and
// on-follower events. New replaces a nil ApplyFunc with DefaultOnLeader or
// DefaultOnFollower. A non-nil error returned by the callback ends the
// event loop and is returned from Start.
type ApplyFunc func() error
// New initializes a new dinghy. Start is required to be run to
// begin leader election.
//
// addr is this node's host:port and nodes lists every node taking part in
// consensus. onLeader and onFollower may be nil, in which case
// DefaultOnLeader and DefaultOnFollower are used. eMS and hMS are passed to
// NewState (presumably election and heartbeat timeouts in milliseconds —
// see NewState). hMS is also the timeout, in milliseconds, of the HTTP
// client stored on the Dinghy. The only error returned comes from hashing
// addr into a node ID.
func New(addr string, nodes []string, onLeader, onFollower ApplyFunc, l Logger, eMS, hMS int) (*Dinghy, error) {
	l.Println("addr:", addr, "nodes:", nodes)
	if onLeader == nil {
		onLeader = DefaultOnLeader
	}
	if onFollower == nil {
		onFollower = DefaultOnFollower
	}
	c := &http.Client{
		Timeout: time.Duration(hMS) * time.Millisecond,
	}
	id, err := hashToInt(addr, len(nodes)*1000)
	if err != nil {
		return nil, err
	}
	// Shift the hash result up by one. Presumably this keeps real node IDs
	// away from zero — TODO confirm against NoVote / UnknownLeaderID.
	id++
	// d is not visible to any other goroutine until New returns, so there
	// is no need to hold the mutex while building it.
	d := &Dinghy{
		client:      c,
		logger:      l,
		mu:          &sync.Mutex{},
		routePrefix: DefaultRoutePrefix,
		stopChan:    make(chan struct{}),
		Addr:        addr,
		Nodes:       nodes,
		OnLeader:    onLeader,
		OnFollower:  onFollower,
		State:       NewState(id, eMS, hMS),
	}
	l.Printf("%+v", d)
	return d, nil
}
// Stop will stop any running event loop.
//
// It closes stopChan rather than sending on it. Every receiver observes a
// close at once: the state handler blocked in its select and the
// non-blocking check at the top of Start's loop. So the signal cannot be
// consumed by a handler and then missed by Start, which would leave the
// loop running. Sending two values on the unbuffered channel had exactly
// that race, and it also blocked forever when Start was not running.
//
// Stop never blocks. Repeated sequential calls are no-ops. Concurrent calls
// are not guarded and could panic on a double close. After Stop, a later
// Start returns nil immediately.
func (d *Dinghy) Stop() error {
	d.logger.Println("stopping event loop")
	select {
	case <-d.stopChan:
		// Already closed by an earlier Stop. This file never sends on
		// stopChan, so a successful receive here means it is closed.
	default:
		close(d.stopChan)
	}
	return nil
}
// Start begins the leader election process.
//
// It runs the state machine on the calling goroutine. Each pass:
//   - logs the current State;
//   - returns nil if a stop signal is already pending on stopChan;
//   - otherwise dispatches to the follower, candidate or leader handler.
//
// An error from the follower or leader handler, or an unrecognised state,
// ends the loop and is returned.
func (d *Dinghy) Start() error {
	d.logger.Println("starting event loop")
	for {
		d.logger.Println(d.State)

		// Poll for a stop request between state transitions without blocking.
		select {
		case <-d.stopChan:
			d.logger.Println("stopping event loop")
			return nil
		default:
		}

		var stepErr error
		switch d.State.State() {
		case StateFollower:
			stepErr = d.follower()
		case StateCandidate:
			d.candidate()
		case StateLeader:
			stepErr = d.leader()
		default:
			return fmt.Errorf("unknown state %d", d.State.State())
		}
		if stepErr != nil {
			return stepErr
		}
	}
}
// follower runs the follower state. It first invokes OnFollower while
// holding d.mu, then waits for traffic from the leader. A heartbeat reset or
// an AppendEntries event restarts the wait. If the randomized heartbeat
// timer fires first, the node clears its vote and known leader, increments
// its term and switches to StateCandidate, which starts an election.
//
// It returns OnFollower's error if that callback fails. Otherwise it returns
// nil on a stop signal, on a change to a non-follower state, or after
// becoming a candidate.
func (d *Dinghy) follower() error {
	d.logger.Println("entering follower state, leader id", d.State.LeaderID())

	d.mu.Lock()
	cbErr := d.OnFollower()
	if cbErr != nil {
		d.logger.Errorln("executing OnFollower", cbErr)
	}
	d.mu.Unlock()
	if cbErr != nil {
		return cbErr
	}

	for {
		select {
		case <-d.stopChan:
			return nil
		case next := <-d.State.StateChanged():
			// Re-entering the follower state is not a transition.
			if next != StateFollower {
				d.logger.Println("follower state changed to", d.State.StateString(next))
				return nil
			}
		case <-d.State.HeartbeatReset():
			d.logger.Println("heartbeat reset")
		case entries := <-d.State.AppendEntriesEvent():
			d.logger.Println(d.State.StateString(d.State.State()), "got AppendEntries from leader", entries)
		case tick := <-d.State.HeartbeatTickRandom():
			// https://raft.github.io/raft.pdf
			// A follower that hears nothing for an election timeout assumes
			// there is no viable leader. It increments its term and becomes
			// a candidate to start an election.
			d.logger.Println("follower heartbeat timeout, transitioning to candidate", tick)
			d.State.VotedFor(NoVote)
			d.State.LeaderID(UnknownLeaderID)
			d.State.Term(d.State.Term() + 1)
			d.State.State(StateCandidate)
			return nil
		}
	}
}
// candidate runs one election round for StateCandidate.
//
// It requests votes in a background goroutine:
//   - a win makes this node the leader;
//   - ErrNewElectionTerm steps down to the reported term;
//   - ErrTooFewVotes falls back to StateFollower.
//
// Meanwhile it waits and returns on the first of these:
//   - a stop signal;
//   - an AppendEntries from a leader whose term is at least our own (step
//     down to that term);
//   - a change to a non-candidate state;
//   - an election timeout.
//
// See section 5.2 of https://raft.github.io/raft.pdf.
//
// NOTE(review): the vote goroutine is not cancelled when candidate returns,
// so a result from an earlier round can still change state later.
func (d *Dinghy) candidate() {
	d.logger.Println("entering candidate state")

	runElection := func() {
		d.logger.Println("requesting vote")
		term, err := d.RequestVoteRequest()
		if err == nil {
			// it wins the election
			d.State.LeaderID(d.State.ID())
			d.State.State(StateLeader)
			return
		}
		d.logger.Errorln("executing RequestVoteRequest", err)
		if err == ErrNewElectionTerm {
			d.State.StepDown(term)
		} else if err == ErrTooFewVotes {
			d.State.State(StateFollower)
		}
	}
	go runElection()

	for {
		select {
		case <-d.stopChan:
			return
		case entries := <-d.State.AppendEntriesEvent():
			// Another server claims to be leader. Accept it if its term is
			// at least our current term; otherwise reject the RPC and keep
			// campaigning.
			d.logger.Println("candidate got an AppendEntries from a leader", entries)
			if entries.Term < d.State.Term() {
				continue
			}
			d.State.StepDown(entries.Term)
			return
		case next := <-d.State.StateChanged():
			if next == StateCandidate {
				continue
			}
			d.logger.Println("candidate state changed to", d.State.StateString(next))
			return
		case tick := <-d.State.ElectionTick():
			d.logger.Println("election timeout, restarting election", tick)
			return
		}
	}
}
// leader is for when in StateLeader. The loop will continually send
// a heartbeat of AppendEntries to all peers at a rate of HeartbeatTimeoutMS.
//
// On entry it fires one AppendEntriesRequest immediately, ignoring its
// result, and runs OnLeader in a goroutine while holding d.mu. It returns:
//   - OnLeader's error, after demoting the node to StateFollower;
//   - nil on a stop signal or a change to a non-leader state;
//   - nil after stepping down because a peer reported a newer term.
//
// Other AppendEntriesRequest errors are logged and the node stays leader.
func (d *Dinghy) leader() error {
	d.logger.Println("entering leader state")
	go d.AppendEntriesRequest()

	// errChan is buffered so the OnLeader goroutine can always deliver its
	// error and release d.mu, even after leader has returned. With an
	// unbuffered channel that send would block forever while holding d.mu,
	// and the next follower() call would deadlock on d.mu.Lock().
	errChan := make(chan error, 1)
	go func() {
		// Run the OnLeader event in a goroutine in case
		// it has a long delay. Any errors returned will exit the
		// leader state.
		d.mu.Lock()
		defer d.mu.Unlock()
		if err := d.OnLeader(); err != nil {
			d.logger.Errorln("executing OnLeader", err)
			errChan <- err
		}
	}()

	for {
		select {
		case err := <-errChan:
			d.State.State(StateFollower)
			go func() {
				// Removing the state change event here
				// before returning error.
				<-d.State.StateChanged()
			}()
			return err
		case <-d.stopChan:
			return nil
		case <-d.State.AppendEntriesEvent():
			// ignore any append entries to self.
			continue
		case newState := <-d.State.StateChanged():
			if newState == StateLeader {
				continue
			}
			d.logger.Println("leader state changed to", d.State.StateString(newState))
			return nil
		case h := <-d.State.HeartbeatTick():
			d.logger.Println("sending to peers AppendEntriesRequest", d.Nodes, h)
			currentTerm, err := d.AppendEntriesRequest()
			if err != nil {
				d.logger.Errorln("executing AppendEntriesRequest", err)
				if err == ErrNewElectionTerm {
					d.State.StepDown(currentTerm)
					return nil
				}
			}
		}
	}
}