Skip to content

Commit

Permalink
server,kvserver: wait for liveness record refresh on other nodes at t…
Browse files Browse the repository at this point in the history
…he end of drain

Prior to this patch, it was possible for a node to shut down
gracefully "too quickly", before the other nodes got a chance to see
that the node has gone away.

In particular it was possible:

- while the node was pushing leases away, it was possible for the
  other nodes with replicas on shared ranges to push them back (store
  rebalance / allocator). This is because the other nodes did not yet
  have a copy of the updated node descriptor marked "draining".

- after the node had moved its leases away and stopped, it was
  possible for range caches on other nodes to continue to try to use
  replicas on the drained node.

To alleviate both issues, this commit makes the drain process wait
until the expiry deadline on the draining nodes' liveness, plus 5
seconds, before starting to transfer leases away. This way, there is
confidence during the lease transfer that the other nodes know the
draining node is, in fact, draining, and will not be considered as a
transfer target.

Additionally, an additional wait of 5 seconds is added at the
very end after all leases have transferred, so that if another
node still finds itself wanting to address a replica
on the now-drained node, it gets a chance to get a
NodeLeaseHolderError and a redirect to the new leaseholder.

Release note: None
  • Loading branch information
knz committed Jan 4, 2021
1 parent db75b89 commit 8ad85fd
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 44 deletions.
1 change: 1 addition & 0 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ If problems persist, please see %s.`
time.Sleep(200 * time.Millisecond)
}

log.Infof(drainCtx, "stopping all tasks")
stopper.Stop(drainCtx)
}()

Expand Down
32 changes: 32 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,23 @@ func (nl *NodeLiveness) SetDraining(
return errors.New("failed to drain self")
}

// TimeUntilLivenessExpiry returns a duration equal to the difference
// between the current time and the current expiration deadline on
// this node's liveness record.
func (nl *NodeLiveness) TimeUntilLivenessExpiry(ctx context.Context) (time.Duration, error) {
myID := nl.gossip.NodeID.Get()
liveness, ok := nl.GetLiveness(myID)
if !ok {
// Our liveness record does not exist yet? This is surprising,
// but it does mean we have nothing to do here.
return 0, nil
}

// Wait until the record has expired.
expiryTime := timeutil.Unix(0, liveness.Expiration.WallTime)
return expiryTime.Sub(nl.clock.PhysicalTime()), nil
}

// SetMembershipStatus changes the liveness record to reflect the target
// membership status. It does so idempotently, and may retry internally until it
// observes its target state durably persisted. It returns whether it was able
Expand Down Expand Up @@ -1473,3 +1490,18 @@ func (nl *NodeLiveness) TestingSetDecommissioningInternal(
) (changeCommitted bool, err error) {
return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
}

// GetAvailableNodeCount returns a count of the number of live,
// non-decommission{ed,ing} nodes as known to liveness.
func (nl *NodeLiveness) GetAvailableNodeCount() int {
now := nl.clock.Now().GoTime()
nl.mu.RLock()
defer nl.mu.RUnlock()
var count int
for _, l := range nl.mu.nodes {
if l.Membership.Active() && l.IsLive(now) {
count++
}
}
return count
}
217 changes: 173 additions & 44 deletions pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package server

import (
"context"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
Expand Down Expand Up @@ -122,59 +121,149 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
// directly. Use the Drain() RPC instead with a suitably crafted
// DrainRequest.
//
// On failure, the system may be in a partially drained
// state; the client should either continue calling Drain() or shut
// down the server.
//
// The reporter function, if non-nil, is called for each
// packet of load shed away from the server during the drain.
// On failure, the system may be in a partially drained state; the
// client should either continue calling Drain() until the remaining
// count comes down to zero, or shut down the server.
//
// TODO(knz): This method is currently exported for use by the
// shutdown code in cli/start.go; however, this is a mis-design. The
// start code should use the Drain() RPC like quit does.
func (s *Server) Drain(
ctx context.Context,
) (remaining uint64, info redact.RedactableString, err error) {
reports := make(map[redact.SafeString]int)
var mu syncutil.Mutex
reporter := func(howMany int, what redact.SafeString) {
if howMany > 0 {
mu.Lock()
reports[what] += howMany
mu.Unlock()
}
}
progress := newDrainProgress()

// Regardless of the return path, report on the progress in logs before
// the function exits.
defer func() {
// Detail the counts based on the collected reports.
var descBuf strings.Builder
comma := redact.SafeString("")
for what, howMany := range reports {
remaining += uint64(howMany)
redact.Fprintf(&descBuf, "%s%s: %d", comma, what, howMany)
comma = ", "
}
info = redact.RedactableString(descBuf.String())
log.Ops.Infof(ctx, "drain remaining: %d", remaining)
remaining, info = progress.getProgress()
log.Ops.Infof(ctx, "drain progress: %d", remaining)
if info != "" {
log.Ops.Infof(ctx, "drain details: %s", info)
}
}()

if err := s.doDrain(ctx, reporter); err != nil {
return 0, "", err
// First drain all clients and SQL leases.
log.Infof(ctx, "draining clients")
if err := s.drainClients(ctx, progress.report); err != nil {
return remaining, info, err
}

return
}
// Mark the node liveness record as draining. This starts telling
// range caches on other nodes that this node is going away.
log.Infof(ctx, "draining liveness")
if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, progress.report); err != nil {
return remaining, info, err
}

func (s *Server) doDrain(ctx context.Context, reporter func(int, redact.SafeString)) error {
// First drain all clients and SQL leases.
if err := s.drainClients(ctx, reporter); err != nil {
return err
// GetAvailableNodeCount includes nodes which are member of the
// cluster, not decommissioning, and for which their liveness
// record has not expired. This includes the current (draining) node,
// because marking the liveness record as draining does not prevent
// it from heartbeating.
availableNodes := s.nodeLiveness.GetAvailableNodeCount()

if progress.hasProgress() {
// Wait until some confidence exists that the other nodes have
// acknowledged the draining state: this waits until the expiry
// of of the liveness record, which is at least as late as when
// other nodes are forced to refresh their range leases.
//
// The problem that this wait solves is the following. When other
// nodes already have replicas for the same ranges as this node,
// these other nodes may attempt to transfer leases to this node
// for routine rebalancing as long as they see this node as live
// (via its liveness). This wait ensures they are forced to reload
// the liveness record and see the node is draining, and stop using
// it as target for rebalancing.
//
// We wait here only the first time Drain() is called, when the
// liveness record has been toggled from non-draining to
// draining.
//
// The reason why hasProgress() is synonymous with "Drain() is
// being called for the first time" here is because only during
// the first iteration is there work performed in drainClients()
// and nodeLiveness.SetDraining() above. At the second and later
// iterations, these first two steps do no work.

toWait, err := s.nodeLiveness.TimeUntilLivenessExpiry(ctx)
if err != nil {
return remaining, info, err
}

if toWait > 0 {
log.Infof(ctx, "waiting %s for the liveness record to expire", toWait)
time.Sleep(toWait)
} else {
log.VInfof(ctx, 1, "no liveness record on this node, no expiry to wait on")
}

if availableNodes > 2 {
// If we believe there are other nodes, we also wait 5 seconds
// past the expiration to give ample time for these nodes to
// re-load their copy of this node's liveness record, prior to
// us transferring leases below.
//
// This wait is not necessary for correctness; it is merely an
// optimization: it reduces the probability that another node
// hasn't seen the expiration yet and tries to transfer a
// lease back to this draining node during the lease drain
// below.
//
// We also only use the optimization if we have some
// confidence that there are other ready nodes in the cluster;
// for a single-node cluster, this wait is clearly a waste of
// time and would be a source of annoyance to the user.
const extraLivenessDrainStatePropagationDelay = 5 * time.Second

log.Infof(ctx, "waiting %s to let draining state propagate throughout cluster", extraLivenessDrainStatePropagationDelay)
time.Sleep(extraLivenessDrainStatePropagationDelay)
}
}

// Transfer the range leases away.
// This may take a while; that's OK.
//
// Note that we are careful to only start shedding range leases away
// after we have some confidence that the other nodes have picked up
// the draining bit (by waiting for the expiration delay,
// above). This is because otherwise, this node would be a candidate
// for transferring the leases back immediately, concurrently
// negating the progress made by this SetDraining() call.
log.Infof(ctx, "transferring leases")
if err := s.node.SetDraining(true /* drain */, progress.report); err != nil {
return remaining, info, err
}
// Finally, mark the node as draining in liveness and drain the
// range leases.
return s.drainNode(ctx, reporter)

if !progress.hasProgress() && availableNodes > 2 {
// If there is no more work to do, the process will then proceed to
// shut down.
//
// Just before doing so however, if we believe there are other
// nodes, then wait a little bit more.
//
// The problem that this second wait helps solve is when other
// nodes do not have replicas for the ranges whose leases were
// just transferred away, but may have oudated information about
// them in their range desc/lease caches. These other nodes may be
// serving application traffic, and we want to give them a chance
// to encounter a NotLeaseHolderError and refresh their cache
// after the last lease has been transferred.
//
// Like above, this is an optimization: if this was not
// occurring, the other nodes would simply time out on a request
// and start inquiring other replicas to discover the new
// leaseholder. We also avoid the optimization if the ready node
// count is just 1, to prevent UX annoyances.
const extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay = 5 * time.Second
log.Infof(ctx,
"waiting %s so that final requests to this node from rest of cluster can be redirected",
extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay)
time.Sleep(extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay)
}

return remaining, info, nil
}

// isDraining returns true if either clients are being drained
Expand Down Expand Up @@ -204,15 +293,55 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf
// given sessions a chance to finish ongoing work.
s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)

// Done. This executes the defers set above to drain SQL leases.
return nil
}

// drainNode initiates the draining mode for the node, which
// starts draining range leases.
func (s *Server) drainNode(ctx context.Context, reporter func(int, redact.SafeString)) error {
if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil {
return err
// drainProgress stores the result of calls to the reporter function
// passed to the various draining function.
//
// This is made safe for concurrent use.
type drainProgress struct {
// We use a mutex to ensure that the reporter function can be called
// concurrently and safely from multiple goroutines.
syncutil.Mutex
reports map[redact.SafeString]int
}

func newDrainProgress() *drainProgress {
return &drainProgress{
reports: make(map[redact.SafeString]int),
}
}

// report some drain work to the drainProgress tracker.
// This is safe for use by multiple goroutines concurrently.
func (p *drainProgress) report(howMany int, what redact.SafeString) {
if howMany > 0 {
p.Lock()
defer p.Unlock()
p.reports[what] += howMany
}
}

// hasProgress returns true iff some progress was reported via
// the report() method already.
func (p *drainProgress) hasProgress() bool {
p.Lock()
defer p.Unlock()
return len(p.reports) > 0
}

// getProgress retrieves a description and a count of the work
// performed so far.
// The caller guarantees that no concurrent calls to the report()
// method are occurring.
func (p *drainProgress) getProgress() (remaining uint64, details redact.RedactableString) {
var descBuf redact.StringBuilder
comma := redact.SafeString("")
for what, howMany := range p.reports {
remaining += uint64(howMany)
descBuf.Printf("%s%s: %d", comma, what, howMany)
comma = ", "
}
return s.node.SetDraining(true /* drain */, reporter)
return remaining, descBuf.RedactableString()
}

0 comments on commit 8ad85fd

Please sign in to comment.