// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
	"context"
	"os"
	"reflect"
	"time"

	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/redact"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var (
	// DeprecatedDrainParameter is the special value that must be
	// passed in DrainRequest.DeprecatedProbeIndicator to signal that
	// the drain request is not a probe.
	// This variable is also used in the v20.1 "quit" client
	// to provide a valid input to the request sent to
	// v19.1 nodes.
	//
	// TODO(knz): Remove this in v20.2, or whenever the "quit" command
	// no longer needs to work with 19.x servers, whichever comes
	// later.
	DeprecatedDrainParameter = []int32{0, 1}

	queryWait = settings.RegisterPublicDurationSetting(
		"server.shutdown.query_wait",
		"the server will wait for at least this amount of time for active queries to finish",
		10*time.Second,
	)

	drainWait = settings.RegisterPublicDurationSetting(
		"server.shutdown.drain_wait",
		"the amount of time a server waits in an unready state before proceeding with the rest "+
			"of the shutdown process",
		0*time.Second,
	)
)
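
// As a usage note (not part of the original file): both settings above
// are public cluster settings, so an operator can adjust them at run
// time with SQL, for example:
//
//	SET CLUSTER SETTING server.shutdown.drain_wait = '8s';
//	SET CLUSTER SETTING server.shutdown.query_wait = '30s';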

// Drain puts the node into the specified drain mode(s) and optionally
// instructs the process to terminate.
// This method is part of the serverpb.AdminClient interface.
func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_DrainServer) error {
	ctx := stream.Context()
	ctx = s.server.AnnotateCtx(ctx)

	doDrain := req.DoDrain
	if len(req.DeprecatedProbeIndicator) > 0 {
		// Pre-20.1 behavior.
		// TODO(knz): Remove this condition in 20.2.
		doDrain = true
		if !reflect.DeepEqual(req.DeprecatedProbeIndicator, DeprecatedDrainParameter) {
			return status.Errorf(codes.InvalidArgument, "invalid drain request parameter")
		}
	}

	log.Infof(ctx, "drain request received with doDrain = %v, shutdown = %v", doDrain, req.Shutdown)

	res := serverpb.DrainResponse{}
	if doDrain {
		remaining, info, err := s.server.Drain(ctx)
		if err != nil {
			log.Errorf(ctx, "drain failed: %v", err)
			return err
		}
		res.DrainRemainingIndicator = remaining
		res.DrainRemainingDescription = info.StripMarkers()
	}
	if s.server.isDraining() {
		res.DeprecatedDrainStatus = DeprecatedDrainParameter
		res.IsDraining = true
	}

	if err := stream.Send(&res); err != nil {
		return err
	}

	if !req.Shutdown {
		if doDrain {
			// The condition "if doDrain" is because we don't need an info
			// message for just a probe.
			log.Infof(ctx, "drain request completed without server shutdown")
		}
		return nil
	}

	go func() {
		// TODO(tbg): why don't we stop the stopper first? Stopping the stopper
		// first seems more reasonable since grpc.Stop closes the listener right
		// away (and who knows whether gRPC-goroutines are tied up in some
		// stopper task somewhere).
		s.server.grpc.Stop()
		s.server.stopper.Stop(ctx)
	}()

	select {
	case <-s.server.stopper.IsStopped():
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Second):
		// This is a hack to work around the problem in
		// https://github.com/cockroachdb/cockroach/issues/37425#issuecomment-494336131
		//
		// There appear to be deadlock scenarios in which we don't manage to
		// fully stop the grpc server (which implies closing the listener, i.e.
		// seeming dead to the outside world) or don't manage to shut down the
		// stopper (the evidence in #37425 is inconclusive as to which one it is).
		//
		// Other problems in this area are known, such as
		// https://github.com/cockroachdb/cockroach/pull/31692
		//
		// The signal-based shutdown path uses a similar time-based escape hatch.
		// Until we spend the time (potentially a lot of it) to understand and
		// fix this issue, this will serve us well.
		os.Exit(1)
		return errors.New("unreachable")
	}
}
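
// Illustrative sketch (not part of the original file): a 20.1-era
// client can drive this streaming RPC as shown below, re-issuing the
// request until the server reports no remaining work, as recommended
// by the Server.Drain documentation further down. The *grpc.ClientConn
// named conn is assumed to already exist.
//
//	client := serverpb.NewAdminClient(conn)
//	for {
//		stream, err := client.Drain(ctx, &serverpb.DrainRequest{DoDrain: true})
//		if err != nil {
//			return err
//		}
//		resp, err := stream.Recv()
//		if err != nil {
//			return err
//		}
//		if resp.DrainRemainingIndicator == 0 {
//			// The server reported no remaining work; draining is done.
//			break
//		}
//	}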

// extraDrainWait is the extra amount of time Server.Drain waits after
// the liveness record has expired, to give other nodes ample chance to
// observe this node's draining state.
const extraDrainWait = 5 * time.Second

// Drain idempotently activates the draining mode.
// Note: new code should not be taught to use this method
// directly. Use the Drain() RPC instead with a suitably crafted
// DrainRequest.
//
// On failure, the system may be in a partially drained state; the
// client should either continue calling Drain() until the remaining
// count comes down to zero, or shut down the server.
//
// TODO(knz): This method is currently exported for use by the
// shutdown code in cli/start.go; however, this is a mis-design. The
// start code should use the Drain() RPC like quit does.
func (s *Server) Drain(
	ctx context.Context,
) (remaining uint64, info redact.RedactableString, err error) {
	progress := newDrainProgress()
	defer func() {
		remaining, info = progress.getProgress()
		log.Infof(ctx, "drain progress: %d", remaining)
		if info != "" {
			log.Infof(ctx, "drain details: %s", info)
		}
	}()

	// We use a closure to ensure the defer above is run on all return
	// paths, including errors.
	err = func() error {
		// Since Drain() is usually called in a loop, we need to
		// (re-)wake up the stores at the beginning of every
		// iteration. This ensures that the liveness record can be
		// accessed every time. (Otherwise, after the first iteration
		// the stores are draining and refuse to refresh their leases
		// on the liveness range, and the lookup of the liveness record
		// below can deadlock.)
		log.Infof(ctx, "waking up stores")
		if err := s.node.SetDraining(false /* drain */, progress.report); err != nil {
			return err
		}

		// First drain all clients and SQL leases.
		log.Infof(ctx, "draining clients")
		if err := s.drainClients(ctx, progress.report); err != nil {
			return err
		}

		// Mark the node liveness record as draining. This starts telling
		// range caches on other nodes that this node is going away.
		log.Infof(ctx, "draining liveness")
		if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, progress.report); err != nil {
			return err
		}

		if progress.hasProgress() {
			// Wait until some confidence exists that the other nodes have
			// acknowledged the draining state: this waits until the expiry
			// of the liveness record, which is at least as late as when
			// other nodes are forced to refresh their range leases.
			//
			// We wait here only the first time Drain() is called, when the
			// liveness record has been toggled from non-draining to
			// draining.
			//
			// The reason why hasProgress() is synonymous with "Drain() is
			// being called for the first time" here is that drainClients()
			// and nodeLiveness.SetDraining() above only perform work during
			// the first iteration. At the second and later iterations,
			// these first two steps do no work.
			toWait, err := s.nodeLiveness.TimeUntilLivenessExpiry(ctx)
			if err != nil {
				return err
			}
			if toWait > 0 {
				log.Infof(ctx, "waiting %s for the liveness record to expire", toWait)
				time.Sleep(toWait)
			}

			if s.nodeLiveness.GetAvailableNodeCount() > 1 {
				// If we believe there are other nodes, we also wait 5 seconds
				// past the expiration to give ample time for these nodes to
				// re-load their copy of this node's descriptor, prior to us
				// transferring leases below.
				//
				// This wait is not necessary for correctness; it is merely an
				// optimization: it reduces the probability that another node
				// hasn't seen the expiration yet and tries to transfer a
				// lease back to this draining node during the lease drain
				// below.
				//
				// We also only use the optimization if we have some
				// confidence that there are other ready nodes in the cluster;
				// for a single-node cluster, this wait is clearly a waste of
				// time and would be a source of annoyance to the user.
				log.Infof(ctx, "waiting %s to let draining state propagate throughout cluster", extraDrainWait)
				time.Sleep(extraDrainWait)
			}
		}

		// Transfer the range leases away.
		// This may take a while; that's OK.
		log.Infof(ctx, "transferring leases")
		if err := s.node.SetDraining(true /* drain */, progress.report); err != nil {
			return err
		}

		if !progress.hasProgress() && s.nodeLiveness.GetAvailableNodeCount() > 1 {
			// If there is no more work to do, the process will then proceed
			// to shut down.
			//
			// Just before doing so, however, if we believe there are other
			// nodes, then wait a little bit more so that any stray range
			// leases on other nodes get a chance to see a NotLeaseHolderError.
			//
			// Like above, this is an optimization: if this were not done, the
			// other nodes would simply time out on a request and start
			// inquiring other replicas to discover the new leaseholder. We
			// also avoid the optimization if the ready node count is just 1,
			// to prevent UX annoyances.
			log.Infof(ctx,
				"waiting %s so that final requests to this node from rest of cluster can be redirected",
				extraDrainWait)
			time.Sleep(extraDrainWait)
		}
		return nil
	}()
	return
}
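
// A minimal sketch (not from the original file) of the drain loop
// described in the doc comment above, as an in-process caller such as
// the cli/start.go shutdown path might run it:
//
//	for {
//		remaining, _, err := s.Drain(ctx)
//		if err != nil {
//			log.Errorf(ctx, "graceful drain failed: %v", err)
//			break
//		}
//		if remaining == 0 {
//			// No remaining work was reported; the drain is complete.
//			break
//		}
//	}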

// isDraining returns true if either clients are being drained
// or one of the stores on the node is not accepting replicas.
func (s *Server) isDraining() bool {
	return s.sqlServer.pgServer.IsDraining() || s.node.IsDraining()
}

// drainClients starts draining the SQL layer.
func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.SafeString)) error {
	// Mark the server as draining in a way that probes to
	// /health?ready=1 will notice.
	s.grpc.setMode(modeDraining)

	// Wait for drainWait. This will fail load balancer checks and
	// delay draining so that client traffic can move off this node.
	time.Sleep(drainWait.Get(&s.st.SV))

	// Disable incoming SQL clients up to the queryWait timeout.
	drainMaxWait := queryWait.Get(&s.st.SV)
	if err := s.sqlServer.pgServer.Drain(drainMaxWait, reporter); err != nil {
		return err
	}
	// Stop ongoing SQL execution up to the queryWait timeout.
	s.sqlServer.distSQLServer.Drain(ctx, drainMaxWait, reporter)

	// Drain the SQL leases. This must be done after the pgServer has
	// given sessions a chance to finish ongoing work.
	s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)
	return nil
}

// drainProgress tracks the amount of work performed during a drain,
// as reported by the various drain phases via the report() callback.
type drainProgress struct {
	syncutil.Mutex
	reports map[redact.SafeString]int
}

func newDrainProgress() *drainProgress {
	return &drainProgress{
		reports: make(map[redact.SafeString]int),
	}
}

// report registers some drain work to the drainProgress tracker.
func (p *drainProgress) report(howMany int, what redact.SafeString) {
	if howMany > 0 {
		p.Lock()
		p.reports[what] += howMany
		p.Unlock()
	}
}

// hasProgress returns true iff some progress was reported via
// the report() method already.
func (p *drainProgress) hasProgress() bool {
	p.Lock()
	defer p.Unlock()
	return len(p.reports) > 0
}

// getProgress retrieves a description and a count of the work
// performed so far.
// The caller guarantees that no concurrent calls to the report()
// method are occurring.
func (p *drainProgress) getProgress() (remaining uint64, details redact.RedactableString) {
	var descBuf redact.StringBuilder
	comma := redact.SafeString("")
	for what, howMany := range p.reports {
		remaining += uint64(howMany)
		descBuf.Printf("%s%s: %d", comma, what, howMany)
		comma = ", "
	}
	return remaining, descBuf.RedactableString()
}
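
// Illustrative usage (not part of the original file), tying the pieces
// above together; the category names below are made up:
//
//	progress := newDrainProgress()
//	progress.report(5, "SQL clients")   // e.g. from pgServer.Drain
//	progress.report(3, "SQL clients")   // counts accumulate per category
//	progress.report(2, "distSQL flows") // e.g. from distSQLServer.Drain
//	remaining, details := progress.getProgress()
//	// remaining == 10; details lists each category with its total,
//	// e.g. "SQL clients: 8, distSQL flows: 2" (map order not guaranteed).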