-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
client.go
336 lines (302 loc) · 10.9 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package gossip
import (
	"context"
	"fmt"
	"net"
	"sort"
	"sync"
	"time"

	"github.com/pkg/errors"
	circuit "github.com/rubyist/circuitbreaker"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/rpc"
	"github.com/cockroachdb/cockroach/pkg/util"
	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
// client is a client-side RPC connection to a gossip peer node.
type client struct {
	log.AmbientContext

	createdAt             time.Time                // When this client was constructed (see newClient)
	peerID                roachpb.NodeID           // Peer node ID; 0 until first gossip response
	resolvedPlaceholder   bool                     // Whether we've resolved the nodeSet's placeholder for this client
	addr                  net.Addr                 // Peer node network address
	forwardAddr           *util.UnresolvedAddr     // Set if disconnected with an alternate addr
	remoteHighWaterStamps map[roachpb.NodeID]int64 // Remote server's high water timestamps
	closer                chan struct{}            // Client shutdown channel; closed by close()
	clientMetrics         Metrics                  // Metrics scoped to this client (freshly allocated in newClient)
	nodeMetrics           Metrics                  // Node-wide gossip metrics shared with the caller of newClient
}
// extractKeys returns a string representation of a gossip delta's keys.
// The keys are sorted so that log output is deterministic: Go randomizes
// map iteration order, which otherwise makes the same delta print its
// keys in a different order on every call.
func extractKeys(delta map[string]*Info) string {
	keys := make([]string, 0, len(delta))
	for key := range delta {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	// fmt.Sprint formats a []string as "[k1 k2 ...]", identical to the
	// previous Sprintf("%s", keys).
	return fmt.Sprint(keys)
}
// newClient creates and returns a client struct. The returned client is
// not yet started; per-client metrics are freshly allocated, while the
// node-wide metrics are shared with the caller.
func newClient(ambient log.AmbientContext, addr net.Addr, nodeMetrics Metrics) *client {
	c := &client{
		AmbientContext:        ambient,
		createdAt:             timeutil.Now(),
		addr:                  addr,
		remoteHighWaterStamps: make(map[roachpb.NodeID]int64),
		closer:                make(chan struct{}),
		clientMetrics:         makeMetrics(),
		nodeMetrics:           nodeMetrics,
	}
	return c
}
// startLocked dials the remote addr and commences gossip once connected.
// Upon exit, the client is sent on the disconnected channel. This method
// starts client processing in a goroutine and returns immediately.
func (c *client) startLocked(
	g *Gossip,
	disconnected chan *client,
	rpcCtx *rpc.Context,
	stopper *stop.Stopper,
	breaker *circuit.Breaker,
) {
	// Add a placeholder for the new outgoing connection because we may not know
	// the ID of the node we're connecting to yet. This will be resolved in
	// (*client).handleResponse once we know the ID.
	g.outgoing.addPlaceholder()

	ctx, cancel := context.WithCancel(c.AnnotateCtx(context.Background()))
	stopper.RunWorker(ctx, func(ctx context.Context) {
		var wg sync.WaitGroup
		defer func() {
			// This closes the outgoing stream, causing any attempt to send or
			// receive to return an error.
			//
			// Note: it is still possible for incoming gossip to be processed after
			// this point.
			cancel()

			// The stream is closed, but there may still be some incoming gossip
			// being processed. Wait until that is complete to avoid racing the
			// client's removal against the discovery of its remote's node ID.
			wg.Wait()
			disconnected <- c
		}()

		// Snapshot the breaker's consecutive-failure count before this dial
		// attempt so that only the first failure in a run is logged (below).
		consecFailures := breaker.ConsecFailures()
		var stream Gossip_GossipClient
		if err := breaker.Call(func() error {
			// Note: avoid using `grpc.WithBlock` here. This code is already
			// asynchronous from the caller's perspective, so the only effect of
			// `WithBlock` here is blocking shutdown - at the time of this writing,
			// that ends ups up making `kv` tests take twice as long.
			conn, err := rpcCtx.GRPCDial(c.addr.String()).Connect(ctx)
			if err != nil {
				return err
			}
			if stream, err = NewGossipClient(conn).Gossip(ctx); err != nil {
				return err
			}
			return c.requestGossip(g, stream)
		}, 0); err != nil {
			// Suppress repeated warnings while a peer stays unreachable; log
			// only on the first failure of a consecutive run.
			if consecFailures == 0 {
				log.Warningf(ctx, "failed to start gossip client to %s: %s", c.addr, err)
			}
			return
		}

		// Start gossiping.
		log.Infof(ctx, "started gossip client to %s", c.addr)
		if err := c.gossip(ctx, g, stream, stopper, &wg); err != nil {
			// A closed-connection error is the expected shutdown path; anything
			// else is worth logging (with the peer's node ID if known yet).
			if !grpcutil.IsClosedConnection(err) {
				g.mu.Lock()
				if c.peerID != 0 {
					log.Infof(ctx, "closing client to node %d (%s): %s", c.peerID, c.addr, err)
				} else {
					log.Infof(ctx, "closing client to %s: %s", c.addr, err)
				}
				g.mu.Unlock()
			}
		}
	})
}
// close stops the client gossip loop and returns immediately.
// Closing c.closer causes the select loop in (*client).gossip to return.
func (c *client) close() {
	// Guard against closing an already-closed channel, which would panic.
	//
	// NOTE(review): the check-then-close is not atomic, so two truly
	// concurrent calls could still double-close — presumably callers
	// serialize calls to close (e.g. under Gossip's mutex); confirm.
	select {
	case <-c.closer:
	default:
		close(c.closer)
	}
}
// requestGossip requests the latest gossip from the remote server by
// supplying a map of this node's knowledge of other nodes' high water
// timestamps.
func (c *client) requestGossip(g *Gossip, stream Gossip_GossipClient) error {
	// Snapshot the request fields under the gossip mutex, but perform the
	// (potentially blocking) stream send without holding it.
	g.mu.Lock()
	args := &Request{
		NodeID:          g.NodeID.Get(),
		Addr:            g.mu.is.NodeAddr,
		HighWaterStamps: g.mu.is.getHighWaterStamps(),
		ClusterID:       g.clusterID.Get(),
	}
	g.mu.Unlock()

	// Account for the outgoing bytes on both metric sets.
	sent := int64(args.Size())
	c.clientMetrics.BytesSent.Inc(sent)
	c.nodeMetrics.BytesSent.Inc(sent)

	return stream.Send(args)
}
// sendGossip sends the latest gossip to the remote server, based on
// the remote server's notion of other nodes' high water timestamps.
func (c *client) sendGossip(g *Gossip, stream Gossip_GossipClient) error {
	g.mu.Lock()
	delta := g.mu.is.delta(c.remoteHighWaterStamps)
	if len(delta) == 0 {
		// Nothing new relative to the remote's high water stamps.
		g.mu.Unlock()
		return nil
	}

	args := Request{
		NodeID:          g.NodeID.Get(),
		Addr:            g.mu.is.NodeAddr,
		Delta:           delta,
		HighWaterStamps: g.mu.is.getHighWaterStamps(),
		ClusterID:       g.clusterID.Get(),
	}

	// Account for the outgoing bytes and infos on both metric sets.
	sentBytes, sentInfos := int64(args.Size()), int64(len(delta))
	c.clientMetrics.BytesSent.Inc(sentBytes)
	c.clientMetrics.InfosSent.Inc(sentInfos)
	c.nodeMetrics.BytesSent.Inc(sentBytes)
	c.nodeMetrics.InfosSent.Inc(sentInfos)

	if log.V(1) {
		ctx := c.AnnotateCtx(stream.Context())
		if c.peerID != 0 {
			log.Infof(ctx, "sending %s to node %d (%s)", extractKeys(args.Delta), c.peerID, c.addr)
		} else {
			log.Infof(ctx, "sending %s to %s", extractKeys(args.Delta), c.addr)
		}
	}

	// Release the mutex before the (potentially blocking) stream send,
	// mirroring the behavior of requestGossip.
	g.mu.Unlock()
	return stream.Send(&args)
}
// handleResponse handles errors, remote forwarding, and combines delta
// gossip infos from the remote server with this node's infostore. Any
// non-nil return value signals that this client should be torn down; the
// error text carries the reason (forwarding request, duplicate or
// loopback connection).
func (c *client) handleResponse(ctx context.Context, g *Gossip, reply *Response) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	// Account for the incoming bytes and infos on both metric sets.
	bytesReceived := int64(reply.Size())
	infosReceived := int64(len(reply.Delta))
	c.clientMetrics.BytesReceived.Inc(bytesReceived)
	c.clientMetrics.InfosReceived.Inc(infosReceived)
	c.nodeMetrics.BytesReceived.Inc(bytesReceived)
	c.nodeMetrics.InfosReceived.Inc(infosReceived)

	// Combine remote node's infostore delta with ours.
	if reply.Delta != nil {
		freshCount, err := g.mu.is.combine(reply.Delta, reply.NodeID)
		if err != nil {
			// A partial combine isn't fatal; warn and keep whatever merged.
			log.Warningf(ctx, "failed to fully combine delta from node %d: %s", reply.NodeID, err)
		}
		if infoCount := len(reply.Delta); infoCount > 0 {
			if log.V(1) {
				log.Infof(ctx, "received %s from node %d (%d fresh)", extractKeys(reply.Delta), reply.NodeID, freshCount)
			}
		}
		g.maybeTightenLocked()
	}
	c.peerID = reply.NodeID
	c.remoteHighWaterStamps = reply.HighWaterStamps

	// If we haven't yet recorded which node ID we're connected to in the outgoing
	// nodeSet, do so now. Note that we only want to do this if the peer has a
	// node ID allocated (i.e. if it's nonzero), because otherwise it could change
	// after we record it.
	if !c.resolvedPlaceholder && c.peerID != 0 {
		c.resolvedPlaceholder = true
		g.outgoing.resolvePlaceholder(c.peerID)
	}

	// Handle remote forwarding: the peer may ask us to connect to a
	// different node instead.
	if reply.AlternateAddr != nil {
		if g.hasIncomingLocked(reply.AlternateNodeID) || g.hasOutgoingLocked(reply.AlternateNodeID) {
			return errors.Errorf("received forward from node %d to %d (%s); already have active connection, skipping",
				reply.NodeID, reply.AlternateNodeID, reply.AlternateAddr)
		}
		// We try to resolve the address, but don't actually use the result.
		// The certificates (if any) may only be valid for the unresolved
		// address.
		if _, err := reply.AlternateAddr.Resolve(); err != nil {
			return errors.Errorf("unable to resolve alternate address %s for node %d: %s", reply.AlternateAddr, reply.AlternateNodeID, err)
		}
		// Record the forward address and return an error to tear this client
		// down; presumably the caller uses forwardAddr to redial — confirm
		// against the caller of startLocked.
		c.forwardAddr = reply.AlternateAddr
		return errors.Errorf("received forward from node %d to %d (%s)", reply.NodeID, reply.AlternateNodeID, reply.AlternateAddr)
	}

	// Check whether we're connected at this point.
	g.signalConnectedLocked()

	// Check whether this outgoing client is duplicating work already
	// being done by an incoming client, either because an outgoing
	// matches an incoming or the client is connecting to itself.
	if nodeID := g.NodeID.Get(); nodeID == c.peerID {
		return errors.Errorf("stopping outgoing client to node %d (%s); loopback connection", c.peerID, c.addr)
	} else if g.hasIncomingLocked(c.peerID) && nodeID > c.peerID {
		// To avoid mutual shutdown, we only shutdown our client if our
		// node ID is higher than the peer's.
		return errors.Errorf("stopping outgoing client to node %d (%s); already have incoming", c.peerID, c.addr)
	}
	return nil
}
// gossip loops, sending deltas of the infostore and receiving deltas
// in turn. If an alternate is proposed on response, the client addr
// is modified and method returns for forwarding by caller.
func (c *client) gossip(
	ctx context.Context,
	g *Gossip,
	stream Gossip_GossipClient,
	stopper *stop.Stopper,
	wg *sync.WaitGroup,
) error {
	// Buffer of 1 coalesces notifications: any number of updates arriving
	// while a send is in flight collapse into a single follow-up send.
	sendGossipChan := make(chan struct{}, 1)

	// Register a callback for gossip updates.
	updateCallback := func(_ string, _ roachpb.Value) {
		// Non-blocking send; if a notification is already pending, drop this one.
		select {
		case sendGossipChan <- struct{}{}:
		default:
		}
	}
	// Defer calling "undoer" callback returned from registration.
	defer g.RegisterCallback(".*", updateCallback)()

	// Buffered so the receive worker's final send never blocks, even after
	// the loop below has already returned for another reason.
	errCh := make(chan error, 1)
	// This wait group is used to allow the caller to wait until gossip
	// processing is terminated.
	wg.Add(1)
	stopper.RunWorker(ctx, func(ctx context.Context) {
		defer wg.Done()
		// Receive loop: runs until the stream fails or a response can't be
		// handled; the first error terminates it.
		errCh <- func() error {
			for {
				reply, err := stream.Recv()
				if err != nil {
					return err
				}
				if err := c.handleResponse(ctx, g, reply); err != nil {
					return err
				}
			}
		}()
	})

	// Send loop: push a delta whenever the infostore changes, until the
	// client is closed, the stopper fires, or the receive loop errors out.
	for {
		select {
		case <-c.closer:
			return nil
		case <-stopper.ShouldStop():
			return nil
		case err := <-errCh:
			return err
		case <-sendGossipChan:
			if err := c.sendGossip(g, stream); err != nil {
				return err
			}
		}
	}
}