-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfailover.go
286 lines (252 loc) · 5.99 KB
/
failover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
package go_failover
import (
"errors"
"log"
"sort"
"time"
)
const (
Init PeerState = "init"
Active PeerState = "active"
Standby PeerState = "standby"
Unknown PeerState = "unknown"
DefaultPriority = 100
)
var (
ErrPeerIsInit = errors.New("peer is in init state")
)
type PeerState string
type Priority uint8
type Transport interface {
Ready(groupId string) (inboundAdvertise <-chan InboundAdvertisement)
Advertise(peer Peer, advertisement Advertisement) (Advertisement, error)
}
type Peer struct {
Id string `json:"id"`
Priority Priority `json:"priority"`
State PeerState `json:"state"`
}
type PeerList []Peer
func (l *PeerList) Add(peer Peer) {
for idx, p := range *l {
if p.Id == peer.Id {
//If it already exists, update info
(*l)[idx] = peer
return
}
}
*l = append(*l, peer)
l.sort()
}
func (l *PeerList) sort() {
sort.SliceStable(*l, func(i, j int) bool {
return (*l)[i].Priority > (*l)[j].Priority
})
}
type Advertisement struct {
GroupId string `json:"group_id"`
Local Peer `json:"local"`
Active Peer `json:"active"`
Standby PeerList `json:"peers"`
IntervalMs uint `json:"interval_ms"`
}
type InboundAdvertisement struct {
Advertisement
response chan Advertisement
}
type Failover struct {
Groups []Group
}
func New(groupId, localId string, localPriority uint8, intervalMs uint,
transport Transport,
becomeActiveCallback func(),
becomeStandbyCallback func(),
peerIds ...string) *Group {
standbyPeers := PeerList{}
for _, pId := range peerIds {
standbyPeers.Add(Peer{
Id: pId,
Priority: DefaultPriority,
State: Unknown,
})
}
group := &Group{
id: groupId,
local: Peer{
Id: localId,
Priority: Priority(localPriority),
State: Init,
},
active: Peer{},
standby: standbyPeers,
intervalMs: intervalMs,
becomeActive: becomeActiveCallback,
becomeStandby: becomeStandbyCallback,
transport: transport,
}
go group.loop()
return group
}
type Group struct {
id string
local Peer
active Peer
standby PeerList
intervalMs uint
becomeActive func()
becomeStandby func()
transport Transport
}
func (g *Group) loop() {
var interval = time.Duration(g.intervalMs) * time.Millisecond
intervalTicker := time.NewTicker(interval)
activeDownCount := 0
g.StateInit()
inboundAds := g.transport.Ready(g.id)
for {
select {
case ad := <-inboundAds:
//we have received an announcement from an incoming peer
log.Printf("Advertisement received: %#v", ad.Local)
if g.local.State == Init {
//We failed to fully initialize locally and a peer has announced to us
if ad.Local.Priority > g.local.Priority {
//they should be active
g.active = ad.Local
g.stateStandby()
} else {
//If our priority is higher or a tie
//we should be active
g.standby.Add(ad.Local)
g.stateActive()
}
} else if g.local.State == Active {
if ad.Local.State == Active {
log.Printf("Contested Active: %+v", ad)
//todo: newest wins
}
g.standby.Add(ad.Local)
} else if g.local.State == Standby {
if ad.Local.State != Active {
//This is an add from a new peer
//send them our current state and move on
ad.response <- g.advertisement()
break
}
if g.active.Id != ad.Active.Id && ad.Active.Id != "" {
log.Printf("new active found: %s\n", ad.Active.Id)
g.active = ad.Active
}
g.active = ad.Active
g.standby = ad.Standby
activeDownCount = 0
intervalTicker.Reset(interval)
}
ad.response <- g.advertisement()
case <-intervalTicker.C:
if g.local.State == Active {
// send advertisements to everyone!
for _, peer := range g.standby {
//might want to make this async
log.Printf("Sending Advertisement:%s", peer.Id)
_, err := g.transport.Advertise(peer, g.advertisement())
if err != nil {
log.Printf("Error Advertising: %s", err.Error())
}
}
} else {
activeDownCount++
if activeDownCount > 3 {
if g.local.State == Init {
g.stateActive()
} else {
//start iterating through
if len(g.standby) > 0 {
next := g.standby[0]
g.standby = g.standby[1:]
if next.Id == g.local.Id {
//we're next, assume active
g.stateActive()
}
} else {
//No known peers, assume active
g.stateActive()
}
}
}
}
}
}
}
func (g *Group) StateInit() {
log.Printf("[%s][%s] init begin", g.local.Id, g.id)
defer log.Printf("[%s][%s] init complete", g.local.Id, g.id)
g.local.State = Init
if len(g.standby) == 0 {
log.Printf("[%s][%s] no peers, assume active", g.local.Id, g.id)
g.stateActive()
return
}
claimActive := false
for _, peer := range g.standby {
adv, err := findActive(peer, g)
if err != nil {
log.Println(err)
continue
}
if adv.Active.Id == g.local.Id {
//peer suggests we should be active.
//If no active peer is found, claim it
claimActive = true
}
//found active
g.active = adv.Local
g.standby = adv.Standby
g.stateStandby()
return
}
if claimActive {
g.stateActive()
}
}
func findActive(peer Peer, g *Group) (Advertisement, error) {
adv, err := g.transport.Advertise(peer, g.advertisement())
if err != nil {
return Advertisement{}, err
}
if adv.Local.State == Active {
//found master
return adv, nil
} else if adv.Local.State == Standby {
if adv.Active.Id == g.local.Id {
//peer is saying we should be active
return adv, nil
}
return findActive(adv.Active, g)
} else {
//peer is in Init
return adv, ErrPeerIsInit
}
}
func (g *Group) stateStandby() {
g.local.State = Standby
if g.becomeStandby != nil {
g.becomeStandby()
}
}
func (g *Group) stateActive() {
g.local.State = Active
g.active = g.local
if g.becomeActive != nil {
g.becomeActive()
}
}
func (g *Group) advertisement() Advertisement {
return Advertisement{
GroupId: g.id,
Local: g.local,
Active: g.active,
Standby: g.standby,
IntervalMs: g.intervalMs,
}
}