-
Notifications
You must be signed in to change notification settings - Fork 3
/
sync.go
276 lines (250 loc) · 6.82 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package targetsync
import (
"context"
"fmt"
"time"
"github.com/jacksontj/lane"
"github.com/sirupsen/logrus"
)
// Syncer is the struct that uses the various interfaces to actually do the sync:
// it reads targets from Src, writes them to Dst and uses Locker to decide
// whether this instance should run the leader sync loop.
// TODO: metrics
type Syncer struct {
// Config holds the sync settings; the methods in this file read
// LockOptions (passed to Locker.Lock) and RemoveDelay (used by bgRemove) from it.
Config *SyncConfig
// LocalAddr, when non-empty, makes Run first wait until a source target with
// this IP appears and add that target to the destination (see syncSelf).
LocalAddr string
// Locker provides the lock; Run calls Lock and receives elected/lost events
// on the returned channel.
Locker Locker
// Src is subscribed to for lists of targets.
Src TargetSource
// Dst is where targets are fetched from, added to and removed from.
Dst TargetDestination
// Started is set to true by Run after the optional self-sync has completed
// and before the lock is requested.
Started bool
}
// syncSelf waits until LocalAddr shows up in the source's target list and then
// adds that single target to the destination.
//
// The context handed to Src.Subscribe is derived from ctx and cancelled when
// this function returns.
//
// It returns nil once the local target has been added, the error from
// Subscribe or AddTargets if either fails, ctx.Err() if ctx is done first, and
// an error if the source channel is closed before the local address was seen.
func (s *Syncer) syncSelf(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	logrus.Infof("Local Addr %s -- waiting until added to target", s.LocalAddr)

	srcCh, err := s.Src.Subscribe(ctx)
	if err != nil {
		return err
	}

	// Now we wait until our IP shows up in the source data, once it does
	// we add ourselves to the target
	for {
		logrus.Debugf("Waiting for targets from source")
		var srcTargets []*Target
		select {
		case <-ctx.Done():
			return ctx.Err()
		case targets, ok := <-srcCh:
			if !ok {
				// A closed channel yields nil forever; without this check the
				// loop would spin at full speed and never find LocalAddr.
				if err := ctx.Err(); err != nil {
					return err
				}
				return fmt.Errorf("source channel closed before local addr %s was found", s.LocalAddr)
			}
			srcTargets = targets
		}
		logrus.Debugf("Received targets from source: %+#v", srcTargets)

		for _, target := range srcTargets {
			if target.IP == s.LocalAddr {
				// try adding ourselves
				return s.Dst.AddTargets(ctx, []*Target{target})
			}
		}
	}
}
// Run is the main method for the syncer. This is responsible for calling
// runLeader when the lock is held.
//
// If LocalAddr is set, Run first blocks in syncSelf until the local target has
// been added to the destination. It then requests the lock and reacts to the
// events on the returned channel: `true` starts runLeader in a goroutine,
// `false` cancels the running leader's context.
//
// Run returns ctx.Err() when ctx is done, the error from syncSelf or
// Locker.Lock if either fails, and an error if the lock channel is closed.
// Whatever the exit path, the context of a running leader is cancelled
// before Run returns.
func (s *Syncer) Run(ctx context.Context) error {
	// add ourselves if a LocalAddr was defined
	if s.LocalAddr != "" {
		if err := s.syncSelf(ctx); err != nil {
			return err
		}
	}
	s.Started = true

	logrus.Debugf("Syncer creating lock: %v", s.Config.LockOptions)
	electedCh, err := s.Locker.Lock(ctx, &s.Config.LockOptions)
	if err != nil {
		return err
	}

	// leaderCancel is non-nil only while a leader goroutine has been started
	// and not yet told to stop.
	var leaderCancel context.CancelFunc
	stopLeader := func() {
		if leaderCancel != nil {
			leaderCancel()
			leaderCancel = nil
		}
	}
	// Covers every return below, including the closed lock channel, so a
	// leader goroutine never outlives Run.
	defer stopLeader()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case elected, ok := <-electedCh:
			if !ok {
				return fmt.Errorf("Lock channel closed")
			}
			if !elected {
				logrus.Infof("Lock lost, stopping leader actions")
				stopLeader()
				continue
			}

			// Stop any previous leader first so a repeated "elected" event
			// cannot leave two leaders running or leak the old context.
			stopLeader()
			leaderCtx, cancel := context.WithCancel(ctx)
			leaderCancel = cancel
			logrus.Infof("Lock acquired, starting leader actions")
			go func() {
				// Only report errors that are not the result of us cancelling
				// the leader on purpose.
				if err := s.runLeader(leaderCtx); err != nil && leaderCtx.Err() == nil {
					logrus.Errorf("Leader actions stopped unexpectedly: %v", err)
				}
			}()
		}
	}
}
// bgRemove is a background goroutine responsible for removing targets from the destination
// this exists to allow for a `RemoveDelay` on the removal of targets from the destination
// to avoid issues where a target is "flapping" in the source.
//
// Targets received on removeCh are queued with a due time of now+RemoveDelay
// (second granularity); a target that is already queued is ignored. Targets
// received on addCh are taken back out of the queue. A single timer is kept
// pointed at the earliest due entry; when it fires, every due target is removed
// from the destination one at a time. If a removal fails, the target stays
// queued and is retried after a short pause.
//
// The function returns when ctx is done. A closed removeCh or addCh is simply
// no longer read from.
func (s *Syncer) bgRemove(ctx context.Context, removeCh chan *Target, addCh chan *Target) {
	// retryDelay is how long to wait before re-attempting a failed removal.
	const retryDelay = time.Second

	itemMap := make(map[string]*lane.Item)
	q := lane.NewPQueue(lane.MINPQ)

	// The queue starts empty, so the initial duration is just a placeholder;
	// the timer is re-armed as soon as the first removal is scheduled.
	t := time.NewTimer(time.Hour)
	defer t.Stop()

	// resetTimer re-arms t, draining a stale tick first so that an old expiry
	// is never mistaken for the new one.
	resetTimer := func(d time.Duration) {
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(d)
	}

	for {
		select {
		case <-ctx.Done():
			return

		case toRemove, ok := <-removeCh:
			if !ok {
				// A closed channel is always ready; nil it out so this case
				// blocks forever instead of spinning the loop.
				removeCh = nil
				continue
			}
			key := toRemove.Key()
			// This means the target is already scheduled for removal
			if _, scheduled := itemMap[key]; scheduled {
				continue
			}
			logrus.Debugf("Scheduling target for removal from destination in %v: %v", s.Config.RemoveDelay, toRemove)
			removeUnixTime := time.Now().Add(s.Config.RemoveDelay).Unix()
			// Only touch the timer when the new entry becomes the earliest one
			if headItem, headAt := q.Head(); headItem == nil || removeUnixTime < headAt {
				resetTimer(s.Config.RemoveDelay)
			}
			itemMap[key] = q.Push(toRemove, removeUnixTime)

		case toAdd, ok := <-addCh:
			if !ok {
				addCh = nil
				continue
			}
			key := toAdd.Key()
			if item, queued := itemMap[key]; queued {
				logrus.Debugf("Removing target from removal queue as it was re-added: %v", toAdd)
				q.Remove(item)
				delete(itemMap, key)
			}

		case <-t.C:
			// Check if there is an item at head, and if the time is past then
			// do the removal
			headItem, headUnixTime := q.Head()
			logrus.Debugf("Processing target removal: %v", headItem)
			now := time.Now()
			nowUnix := now.Unix()

			// If we were woken before something is ready the loop body is
			// skipped and we just reschedule below.
			for headItem != nil && headUnixTime <= nowUnix {
				target := headItem.(*Target)
				if err := s.Dst.RemoveTargets(ctx, []*Target{target}); err != nil {
					logrus.Errorf("Target removal unsuccessful %v: %v", target, err)
					break
				}
				logrus.Debugf("Target removal successful: %v", target)
				q.Pop()
				delete(itemMap, target.Key())
				headItem, headUnixTime = q.Head()
			}

			// If there is still an item in the queue, reset the timer
			if headItem != nil {
				d := time.Unix(headUnixTime, 0).Sub(now)
				if d <= 0 {
					// The head is already due, which only happens when its
					// removal just failed. Re-arming with a non-positive
					// duration would fire immediately and retry in a hot loop.
					d = retryDelay
				}
				resetTimer(d)
			}
		}
	}
}
// runLeader does the actual syncing from source to destination. This is called
// after the leader election has been done, there should only be one of these per
// unique destination running globally.
//
// It starts bgRemove, subscribes to the source and then, on every source update
// and every 15 minutes after the first update, fetches the destination's
// targets and reconciles by IP: targets missing from the destination are added
// immediately, targets missing from the source are handed to bgRemove for
// delayed removal.
//
// It returns ctx.Err() when ctx is done, the error from Subscribe, GetTargets
// or AddTargets if any of them fails, and an error if the source channel is
// closed. The context passed to bgRemove and Subscribe is cancelled on return.
func (s *Syncer) runLeader(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	removeCh := make(chan *Target, 100)
	addCh := make(chan *Target, 100)
	defer close(removeCh)
	defer close(addCh)
	go s.bgRemove(ctx, removeCh, addCh)

	// get state from source
	srcCh, err := s.Src.Subscribe(ctx)
	if err != nil {
		return err
	}

	var srcTargets []*Target

	// Check for destination changes every 15 minutes
	// (timer initialized to inf to ensure source gets initialized first)
	d := time.Minute * 15
	t := time.NewTimer(time.Second * (1 << 32))
	defer t.Stop()

	// Wait for an update, if we get one sync it
	for {
		logrus.Debugf("Waiting for targets from source")
		select {
		case <-ctx.Done():
			return ctx.Err()
		case targets, ok := <-srcCh:
			if !ok {
				// A closed channel yields a nil list forever; treating that as
				// "no targets" would schedule every destination target for
				// removal and spin this loop.
				if err := ctx.Err(); err != nil {
					return err
				}
				return fmt.Errorf("source channel closed")
			}
			srcTargets = targets
			logrus.Debugf("Received targets from source: %+#v", srcTargets)
		case <-t.C:
		}

		// Re-arm the periodic check, draining a stale tick if there is one
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(d)

		// get current ones from dst
		dstTargets, err := s.Dst.GetTargets(ctx)
		if err != nil {
			return err
		}
		logrus.Debugf("Fetched targets from destination: %+#v", dstTargets)

		// TODO: compare ports and do something with them
		srcMap := make(map[string]*Target, len(srcTargets))
		for _, target := range srcTargets {
			srcMap[target.IP] = target
		}
		dstMap := make(map[string]*Target, len(dstTargets))
		for _, target := range dstTargets {
			dstMap[target.IP] = target
		}

		// Add hosts first
		var hostsToAdd []*Target
		for ip, target := range srcMap {
			// We want to ensure that any target we think should be alive isn't
			// in the removal queue. The send is guarded by ctx so we cannot
			// block forever on a full buffer once bgRemove has exited.
			select {
			case addCh <- target:
			case <-ctx.Done():
				return ctx.Err()
			}
			if _, ok := dstMap[ip]; !ok {
				hostsToAdd = append(hostsToAdd, target)
			}
		}
		if len(hostsToAdd) > 0 {
			logrus.Debugf("Adding targets to destination: %v", hostsToAdd)
			if err := s.Dst.AddTargets(ctx, hostsToAdd); err != nil {
				return err
			}
		}

		// Remove hosts last
		for ip, target := range dstMap {
			if _, ok := srcMap[ip]; ok {
				continue
			}
			select {
			case removeCh <- target:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}