This repository has been archived by the owner on Jun 12, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 45
/
replica_set.go
367 lines (316 loc) · 9.46 KB
/
replica_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
package dvara
import (
"errors"
"flag"
"fmt"
"net"
"os"
"strings"
"sync"
"time"
"github.com/facebookgo/stackerr"
"github.com/facebookgo/stats"
)
// hardRestart is the value Restart passes to stop when tearing down the
// existing proxies. Per its usage text, when true clients are dropped on
// restart. It defaults to true.
var hardRestart = flag.Bool("hard_restart", true, "if true will drop clients on restart")
// Logger allows for simple text logging.
//
// Every level has two methods. The plain variant takes arbitrary values. The
// f-suffixed variant takes a format string followed by its arguments, as in
// the Errorf/Infof/Warnf calls made by ReplicaSet.
type Logger interface {
	// Debug level.
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})

	// Info level.
	Info(args ...interface{})
	Infof(format string, args ...interface{})

	// Warn level.
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})

	// Error level.
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}
// errNoAddrsGiven is returned by ReplicaSet.Start when the Addrs seed list is
// empty.
var errNoAddrsGiven = errors.New(
	"dvara: no seed addresses given for ReplicaSet",
)
// ReplicaSet manages the real => proxy address mapping for one MongoDB replica
// set. Start discovers the members from the seed list in Addrs and runs one
// Proxy per healthy member.
//
// The seed servers are required to be a strict subset of the actual members if
// they are reachable. That is, if two of the seed addresses are members of
// different replica sets, it will be considered an error.
type ReplicaSet struct {
	Log                    Logger                  `inject:""`
	ReplicaSetStateCreator *ReplicaSetStateCreator `inject:""`
	ProxyQuery             *ProxyQuery             `inject:""`

	// Stats if provided will be used to record interesting stats.
	Stats stats.Client `inject:""`

	// Comma separated list of mongo addresses. This is the list of "seed"
	// servers, and one of two conditions must be met for each entry here -- it's
	// either alive and part of the same replica set as all others listed, or is
	// not reachable.
	Addrs string

	// PortStart and PortEnd define the inclusive port range within which
	// proxies will be allocated.
	PortStart int
	PortEnd   int

	// Maximum number of connections that will be established to each mongo node.
	MaxConnections uint

	// MinIdleConnections is the number of idle server connections we'll keep
	// around.
	MinIdleConnections uint

	// ServerIdleTimeout is the duration after which a server connection will be
	// considered idle.
	ServerIdleTimeout time.Duration

	// ServerClosePoolSize is the number of goroutines that will handle closing
	// server connections.
	ServerClosePoolSize uint

	// ClientIdleTimeout is how long until we'll consider a client connection
	// idle and disconnect and release its resources.
	ClientIdleTimeout time.Duration

	// MaxPerClientConnections is how many client connections are allowed from a
	// single client.
	MaxPerClientConnections uint

	// GetLastErrorTimeout is how long we'll hold on to an acquired server
	// connection expecting a possible getLastError call.
	GetLastErrorTimeout time.Duration

	// MessageTimeout is used to determine the timeout for a single message to be
	// proxied.
	MessageTimeout time.Duration

	// Name is the name of the replica set to connect to. Nodes that are not part
	// of this replica set will be ignored. If this is empty, the first replica set
	// will be used.
	Name string

	// Internal state, rebuilt by every call to Start:
	//   proxyToReal - proxy address => mongo address
	//   realToProxy - mongo address => proxy address
	//   ignoredReal - replica set members that have no proxy, with their state
	//   proxies     - proxy address => running Proxy
	//   restarter   - makes Restart run once per Start
	//   lastState   - state from the most recent discovery (SameRS/SameIM)
	proxyToReal map[string]string
	realToProxy map[string]string
	ignoredReal map[string]ReplicaState
	proxies     map[string]*Proxy
	restarter   *sync.Once
	lastState   *ReplicaSetState
}
// Start starts proxies to support this ReplicaSet.
//
// It resets all internal mappings and discovers the replica set from the seed
// list in Addrs, restricted to Name when it is set. It then allocates a
// listener from [PortStart, PortEnd] for every healthy member and starts one
// Proxy per member concurrently. Restart also uses Start to rebuild the
// proxies after a configuration change.
//
// On successful discovery, r.Addrs is rewritten to the de-duplicated union of
// the seeds and the healthy members. Later restarts can then still reach the
// set if the original seeds have gone away.
//
// Start returns errNoAddrsGiven when Addrs is empty, an error when discovery
// fails or finds no healthy primary or secondary, and otherwise one of the
// errors (if any) hit while creating or starting the proxies.
func (r *ReplicaSet) Start() error {
	r.proxyToReal = make(map[string]string)
	r.realToProxy = make(map[string]string)
	r.ignoredReal = make(map[string]ReplicaState)
	r.proxies = make(map[string]*Proxy)

	if r.Addrs == "" {
		return errNoAddrsGiven
	}

	rawAddrs := strings.Split(r.Addrs, ",")
	var err error
	r.lastState, err = r.ReplicaSetStateCreator.FromAddrs(rawAddrs, r.Name)
	if err != nil {
		return err
	}

	healthyAddrs := r.lastState.Addrs()

	// Ensure we have at least one healthy address.
	if len(healthyAddrs) == 0 {
		return stackerr.Newf("no healthy primaries or secondaries: %s", r.Addrs)
	}

	// Add discovered nodes to seed address list. Over time if the original seed
	// nodes have gone away and new nodes have joined this ensures that we'll
	// still be able to connect.
	r.Addrs = strings.Join(uniq(append(rawAddrs, healthyAddrs...)), ",")

	// A fresh Once re-arms Restart for the next configuration change.
	r.restarter = new(sync.Once)

	for _, addr := range healthyAddrs {
		listener, err := r.newListener()
		if err != nil {
			return err
		}
		p := &Proxy{
			Log:            r.Log,
			ReplicaSet:     r,
			ClientListener: listener,
			ProxyAddr:      r.proxyAddr(listener),
			MongoAddr:      addr,
		}
		if err := r.add(p); err != nil {
			// p was never registered, so nothing else will ever close this
			// listener; release the port here. The close error is ignored
			// because the add error is the one the caller needs.
			_ = listener.Close()
			return err
		}
	}

	// add the ignored hosts, unless lastRS is nil (single node mode)
	if r.lastState.lastRS != nil {
		for _, member := range r.lastState.lastRS.Members {
			if _, ok := r.realToProxy[member.Name]; !ok {
				r.ignoredReal[member.Name] = member.State
			}
		}
	}

	// Start all proxies concurrently and wait for every one to finish starting.
	// Failures are logged individually and one of them is returned.
	var wg sync.WaitGroup
	wg.Add(len(r.proxies))
	errch := make(chan error, len(r.proxies))
	for _, p := range r.proxies {
		go func(p *Proxy) {
			defer wg.Done()
			if err := p.Start(); err != nil {
				r.Log.Error(err)
				errch <- stackerr.Wrap(err)
			}
		}(p)
	}
	wg.Wait()
	select {
	default:
		return nil
	case err := <-errch:
		return err
	}
}
// Stop shuts down all the proxies of this ReplicaSet using a soft (non-hard)
// stop. It returns whatever error the internal stop reports, or nil.
func (r *ReplicaSet) Stop() error {
	const hard = false
	return r.stop(hard)
}
// stop shuts down every proxy concurrently and waits until all of them have
// returned. hard is handed to every Proxy.stop call; the hard_restart flag
// describes the hard mode as dropping clients. Each failure is logged, and one
// of the failures (wrapped with a stack) is returned. With no failures, stop
// returns nil.
func (r *ReplicaSet) stop(hard bool) error {
	failures := make(chan error, len(r.proxies))
	var pending sync.WaitGroup
	for _, proxy := range r.proxies {
		pending.Add(1)
		go func(proxy *Proxy) {
			defer pending.Done()
			err := proxy.stop(hard)
			if err == nil {
				return
			}
			r.Log.Error(err)
			failures <- stackerr.Wrap(err)
		}(proxy)
	}
	pending.Wait()

	// Every sender has finished, so closing is safe; an empty closed channel
	// yields ok == false.
	close(failures)
	if err, ok := <-failures; ok {
		return err
	}
	return nil
}
// Restart stops all the proxies and starts them again. It is used when an RS
// config change, such as an election, is detected.
//
// The work runs inside r.restarter.Do. Calls that observe the same
// r.restarter therefore trigger a single restart, and Start installs a fresh
// sync.Once for the next change. A failed stop is only logged and the restart
// carries on. A failed start panics because there is no way to repair the
// proxy set from here.
func (r *ReplicaSet) Restart() {
	r.restarter.Do(func() {
		r.Log.Info("restart triggered")

		// Log and move on: a successful Start may still recover the proxies.
		if stopErr := r.stop(*hardRestart); stopErr == nil {
			r.Log.Info("successfully stopped for restart")
		} else {
			r.Log.Errorf("stop failed for restart: %s", stopErr)
		}

		// With no working proxies there is nothing sensible left to do.
		if startErr := r.Start(); startErr != nil {
			panic(fmt.Errorf("start failed for restart: %s", startErr))
		}
		r.Log.Info("successfully restarted")
	})
}
// proxyAddr returns the address clients should use to reach a proxy that
// listens on l. It is the port l is bound to, combined with proxyHostname().
//
// net.JoinHostPort is used rather than plain "host:port" formatting so that an
// IPv6 literal host is bracketed correctly ("[::1]:6000").
//
// It panics if l's address cannot be split into host and port. That means l
// is not a host:port (e.g. TCP) listener, which is a programming error.
func (r *ReplicaSet) proxyAddr(l net.Listener) string {
	_, port, err := net.SplitHostPort(l.Addr().String())
	if err != nil {
		panic(err)
	}
	return net.JoinHostPort(r.proxyHostname(), port)
}
// proxyHostname picks the host name advertised in proxy addresses.
//
// It prefers os.Hostname(), but only when one of the addresses that name
// resolves to is assigned to a local network interface. Otherwise the name
// probably won't work anyway. In that case, or when any lookup fails (the
// failure is logged), it falls back to 127.0.0.1.
func (r *ReplicaSet) proxyHostname() string {
	const loopback = "127.0.0.1"

	name, err := os.Hostname()
	if err != nil {
		r.Log.Error(err)
		return loopback
	}

	resolved, err := net.LookupHost(name)
	if err != nil {
		r.Log.Error(err)
		return loopback
	}

	local, err := net.InterfaceAddrs()
	if err != nil {
		r.Log.Error(err)
		return loopback
	}

	if hostnameIsLocal(local, resolved) {
		return name
	}

	r.Log.Warnf("hostname %s doesn't resolve to the current host", name)
	return loopback
}

// hostnameIsLocal reports whether any of the resolved addresses matches one of
// the interface addresses. A match is either the whole string or its IP part
// when the interface address is written as "ip/prefix".
func hostnameIsLocal(local []net.Addr, resolved []string) bool {
	for _, ifaceAddr := range local {
		text := ifaceAddr.String()
		for _, addr := range resolved {
			if text == addr || strings.HasPrefix(text, addr+"/") {
				return true
			}
		}
	}
	return false
}
// newListener opens a TCP listener on all interfaces, using the first port in
// the inclusive range [r.PortStart, r.PortEnd] that can be bound.
//
// If no port can be bound, the returned error names the range. When at least
// one bind was attempted, it also carries the error from the last attempt, so
// the underlying cause (port in use, permission denied, invalid port, ...) is
// not lost.
func (r *ReplicaSet) newListener() (net.Listener, error) {
	var lastErr error
	for port := r.PortStart; port <= r.PortEnd; port++ {
		listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
		if err == nil {
			return listener, nil
		}
		lastErr = err
	}
	if lastErr == nil {
		// Empty range (PortStart > PortEnd): nothing was tried.
		return nil, fmt.Errorf(
			"could not find a free port in range %d-%d",
			r.PortStart,
			r.PortEnd,
		)
	}
	return nil, fmt.Errorf(
		"could not find a free port in range %d-%d: %v",
		r.PortStart,
		r.PortEnd,
		lastErr,
	)
}
// add registers the proxy/mongo mapping described by p.
//
// Both p.ProxyAddr and p.MongoAddr must be new to this ReplicaSet. A duplicate
// proxy address is reported first, then a duplicate mongo address, and in
// either case nothing is modified. On success the mapping is logged and
// recorded in proxyToReal, realToProxy and proxies.
func (r *ReplicaSet) add(p *Proxy) error {
	proxyAddr, mongoAddr := p.ProxyAddr, p.MongoAddr
	_, proxyTaken := r.proxyToReal[proxyAddr]
	_, mongoTaken := r.realToProxy[mongoAddr]

	switch {
	case proxyTaken:
		return fmt.Errorf("proxy %s already used in ReplicaSet", proxyAddr)
	case mongoTaken:
		return fmt.Errorf("mongo %s already exists in ReplicaSet", mongoAddr)
	}

	r.Log.Infof("added %s", p)
	r.proxies[proxyAddr] = p
	r.realToProxy[mongoAddr] = proxyAddr
	r.proxyToReal[proxyAddr] = mongoAddr
	return nil
}
// Proxy maps the real mongo address h to the address of the proxy serving it.
//
// If h is a replica set member that Start left without a proxy, the error is a
// *ProxyMapperError carrying the member's state. Any other unknown address
// yields a plain error.
func (r *ReplicaSet) Proxy(h string) (string, error) {
	if proxyAddr, found := r.realToProxy[h]; found {
		return proxyAddr, nil
	}
	if state, ignored := r.ignoredReal[h]; ignored {
		return "", &ProxyMapperError{RealHost: h, State: state}
	}
	return "", fmt.Errorf("mongo %s is not in ReplicaSet", h)
}
// ProxyMembers returns the proxy address of every member currently proxied by
// this ReplicaSet. The order is unspecified because it follows map iteration.
// The result is never nil.
func (r *ReplicaSet) ProxyMembers() []string {
	members := make([]string, 0, len(r.proxyToReal))
	// Use a distinct name for the key: the original loop variable shadowed the
	// receiver r, which made the body easy to misread.
	for proxyAddr := range r.proxyToReal {
		members = append(members, proxyAddr)
	}
	return members
}
// SameRS reports whether o matches the replica set state recorded by the last
// successful discovery in Start. The comparison itself is delegated to
// ReplicaSetState.SameRS.
func (r *ReplicaSet) SameRS(o *replSetGetStatusResponse) bool {
	last := r.lastState
	return last.SameRS(o)
}
// SameIM reports whether the isMaster response o matches the state recorded by
// the last successful discovery in Start. The comparison itself is delegated
// to ReplicaSetState.SameIM.
func (r *ReplicaSet) SameIM(o *isMasterResponse) bool {
	last := r.lastState
	return last.SameIM(o)
}
// ProxyMapperError is returned by ReplicaSet.Proxy for a host that is a known
// replica set member but was left without a proxy. State is the member's
// replica state as recorded when the proxies were started.
type ProxyMapperError struct {
	RealHost string
	State    ReplicaState
}

// Error implements the error interface by naming the host and its state.
func (e *ProxyMapperError) Error() string {
	host, state := e.RealHost, e.State
	return fmt.Sprintf("error mapping host %s in state %s", host, state)
}
// uniq returns a new slice holding the strings of set with duplicates removed.
//
// The first occurrence of each string is kept and the survivors stay in input
// order, so the result is deterministic. In Start, for example, the original
// seed addresses stay ahead of newly discovered ones instead of being
// reshuffled by map iteration on every restart. set is not modified, and the
// result is never nil.
func uniq(set []string) []string {
	seen := make(map[string]struct{}, len(set))
	out := make([]string, 0, len(set))
	for _, s := range set {
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}
	return out
}