-
Notifications
You must be signed in to change notification settings - Fork 3
/
consul.go
127 lines (109 loc) · 2.7 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package targetsync
import (
	"context"
	"time"

	consulApi "github.com/hashicorp/consul/api"
	"github.com/sirupsen/logrus"
)
// NewConsulSource builds a ConsulSource around a Consul API client created
// from consulApi.DefaultConfig(). The given cfg is stored on the returned
// source unchanged. If the client cannot be created, the error from
// consulApi.NewClient is returned as-is and the source is nil.
func NewConsulSource(cfg *ConsulConfig) (*ConsulSource, error) {
	apiClient, err := consulApi.NewClient(consulApi.DefaultConfig())
	if err != nil {
		return nil, err
	}

	// The health endpoint handle is derived once from the client and kept
	// alongside it.
	src := &ConsulSource{cfg: cfg, client: apiClient}
	src.healthClient = apiClient.Health()
	return src, nil
}
// ConsulSource is an implementation for talking to consul for both `TargetSource` and `Locker`
type ConsulSource struct {
// cfg is the configuration passed to NewConsulSource; Subscribe reads
// ServiceName and Tag from it.
cfg *ConsulConfig
// client is the Consul API client; Lock uses it to create lock objects.
client *consulApi.Client
// healthClient is obtained from client.Health() in NewConsulSource and is
// used by Subscribe to query service health entries.
healthClient *consulApi.Health
}
// Lock implements the Locker interface on top of a Consul lock.
//
// It creates a Consul lock for opts.Key with a session TTL of opts.TTL and
// starts a goroutine that keeps trying to hold that lock until ctx is done.
// The returned channel receives true every time the lock is acquired and
// false every time it is lost. The channel is closed when the goroutine stops,
// which happens when ctx is done or when acquiring the lock fails with an
// error (a receive on the closed channel yields false).
//
// An error is returned only when the lock object itself cannot be created.
func (s *ConsulSource) Lock(ctx context.Context, opts *LockOptions) (<-chan bool, error) {
	lock, err := s.client.LockOpts(&consulApi.LockOptions{
		Key:        opts.Key,
		SessionTTL: opts.TTL.String(),
	})
	if err != nil {
		logrus.Errorf("Error creating consul lock: %v", err)
		return nil, err
	}

	lockedCh := make(chan bool, 1)

	// notify publishes a state change without blocking past cancellation, so
	// the goroutine cannot leak when the consumer has stopped reading. It
	// reports whether the value was delivered.
	notify := func(held bool) bool {
		select {
		case lockedCh <- held:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// release gives the lock back. Per the Consul API client, Unlock is what
	// stops session renewal and clears the client's "held" state, so it is
	// needed both on shutdown and before re-acquiring after a loss. A failure
	// here is only logged: after a lost lock the session may already be gone.
	release := func() {
		if err := lock.Unlock(); err != nil {
			logrus.Errorf("Error releasing lock: %v", err)
		}
	}

	go func() {
		defer close(lockedCh)

		for {
			// ctx.Done() doubles as the stop channel: cancelling ctx aborts
			// a pending acquisition.
			lostCh, err := lock.Lock(ctx.Done())
			if err != nil {
				logrus.Errorf("Error acquiring lock: %v", err)
				return
			}
			if lostCh == nil {
				// Acquisition was aborted via the stop channel.
				return
			}

			// We have the lock, start things up
			logrus.Infof("Lock acquired")
			if !notify(true) {
				logrus.Infof("Context done, stopping lock")
				release()
				return
			}

			select {
			case <-ctx.Done():
				logrus.Infof("Context done, stopping lock")
				release()
				return
			case <-lostCh:
				logrus.Infof("Lock lost")
				// Reset the lock object so the next Lock call can retry
				// instead of failing because it is still marked as held.
				release()
				if !notify(false) {
					return
				}
			}
		}
	}()

	return lockedCh, nil
}
// Subscribe implements the `TargetSource` interface.
//
// It starts a goroutine that repeatedly queries the Consul health endpoint
// for passing instances of s.cfg.ServiceName (filtered by s.cfg.Tag), using
// the previous response's LastIndex as the WaitIndex of the next query.
// Whenever the returned index differs from the one that was sent, the current
// set of targets is published on the returned channel. For each entry the
// service address is used when set, otherwise the node address.
//
// Query errors are logged and retried with a capped exponential backoff. The
// goroutine stops and closes the channel once ctx is done. The returned error
// is always nil.
func (s *ConsulSource) Subscribe(ctx context.Context) (chan []*Target, error) {
	const (
		minBackoff = 500 * time.Millisecond
		maxBackoff = 30 * time.Second
	)

	queryOpts := (&consulApi.QueryOptions{WaitIndex: 0}).WithContext(ctx)

	// TODO: configurable size?
	ch := make(chan []*Target, 100)

	go func() {
		defer close(ch)

		backoff := minBackoff
		for {
			if ctx.Err() != nil {
				return
			}

			services, meta, err := s.healthClient.Service(s.cfg.ServiceName, s.cfg.Tag, true, queryOpts)
			if err != nil {
				// A cancelled context surfaces as a query error; that is a
				// normal shutdown, not something to log or retry.
				if ctx.Err() != nil {
					return
				}
				logrus.Errorf("Error querying consul for service %q: %v", s.cfg.ServiceName, err)

				// Wait before retrying so a failing Consul is not hammered
				// in a tight loop, but stay responsive to cancellation.
				timer := time.NewTimer(backoff)
				select {
				case <-ctx.Done():
					timer.Stop()
					return
				case <-timer.C:
				}
				backoff *= 2
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
				continue
			}
			backoff = minBackoff

			// If there was a change
			if meta.LastIndex != queryOpts.WaitIndex {
				targets := make([]*Target, len(services))
				for i, entry := range services {
					addr := entry.Node.Address
					if entry.Service.Address != "" {
						addr = entry.Service.Address
					}
					targets[i] = &Target{
						IP:   addr,
						Port: entry.Service.Port,
					}
				}

				// Do not block forever on a full channel if the consumer
				// has gone away.
				select {
				case ch <- targets:
				case <-ctx.Done():
					return
				}
			}
			queryOpts.WaitIndex = meta.LastIndex
		}
	}()

	return ch, nil
}