server,kvserver: wait for liveness record refresh on other nodes at the end of drain #55460

Draft · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions pkg/cli/start.go
@@ -796,6 +796,7 @@ If problems persist, please see %s.`
time.Sleep(200 * time.Millisecond)
}

log.Infof(drainCtx, "stopping all tasks")
stopper.Stop(drainCtx)
}()

32 changes: 32 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -344,6 +344,23 @@ func (nl *NodeLiveness) SetDraining(
return errors.New("failed to drain self")
}

// TimeUntilLivenessExpiry returns the duration remaining until the
// current expiration deadline on this node's liveness record.
func (nl *NodeLiveness) TimeUntilLivenessExpiry(ctx context.Context) (time.Duration, error) {
myID := nl.gossip.NodeID.Get()
liveness, ok := nl.GetLiveness(myID)
if !ok {
// Our liveness record does not exist yet? This is surprising,
// but it does mean we have nothing to do here.
return 0, nil
}

// Compute how much time remains until the record expires.
expiryTime := timeutil.Unix(0, liveness.Expiration.WallTime)
return expiryTime.Sub(nl.clock.PhysicalTime()), nil
}

// SetMembershipStatus changes the liveness record to reflect the target
// membership status. It does so idempotently, and may retry internally until it
// observes its target state durably persisted. It returns whether it was able
@@ -1473,3 +1490,18 @@ func (nl *NodeLiveness) TestingSetDecommissioningInternal(
) (changeCommitted bool, err error) {
return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
}

// GetLiveNodeCount returns the number of live nodes known to
// liveness, including nodes that are decommissioning or decommissioned.
func (nl *NodeLiveness) GetLiveNodeCount() int {
now := nl.clock.Now().GoTime()
nl.mu.RLock()
defer nl.mu.RUnlock()
var count int
for _, l := range nl.mu.nodes {
if l.IsLive(now) {
count++
}
}
return count
}
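
For context, here is a minimal, standalone sketch of how the two new accessors are intended to be combined by a caller during drain. The livenessInfo interface and the waitForDrainingStateToPropagate helper below are illustrative assumptions, not part of this change; the real call sites are in pkg/server/drain.go further down.

package drainsketch

import (
	"context"
	"log"
	"time"
)

// livenessInfo captures just the two accessors added in this commit.
// The interface is hypothetical and exists only for this sketch; the
// real methods live on *liveness.NodeLiveness.
type livenessInfo interface {
	TimeUntilLivenessExpiry(ctx context.Context) (time.Duration, error)
	GetLiveNodeCount() int
}

// waitForDrainingStateToPropagate sleeps until this node's liveness
// record has expired, so other nodes are forced to re-read it and
// observe the draining bit before any leases are transferred away.
func waitForDrainingStateToPropagate(ctx context.Context, nl livenessInfo) error {
	toWait, err := nl.TimeUntilLivenessExpiry(ctx)
	if err != nil {
		return err
	}
	if toWait > 0 {
		log.Printf("waiting %s for the liveness record to expire", toWait)
		time.Sleep(toWait)
	}
	if nl.GetLiveNodeCount() > 1 {
		// With other live nodes around, add a small grace period so they
		// have time to re-load this node's liveness record.
		time.Sleep(2 * time.Second)
	}
	return nil
}

The waits added to drain.go below follow this shape, with the extra delay gated on the live-node count so a single-node cluster does not sleep pointlessly.
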
218 changes: 174 additions & 44 deletions pkg/server/drain.go
@@ -12,7 +12,6 @@ package server

import (
"context"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -122,59 +121,150 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
// directly. Use the Drain() RPC instead with a suitably crafted
// DrainRequest.
//
// On failure, the system may be in a partially drained state; the
// client should either continue calling Drain() until the remaining
// count comes down to zero, or shut down the server.
//
// TODO(knz): This method is currently exported for use by the
// shutdown code in cli/start.go; however, this is a mis-design. The
// start code should use the Drain() RPC like quit does.
func (s *Server) Drain(
ctx context.Context,
) (remaining uint64, info redact.RedactableString, err error) {
progress := newDrainProgress()

// Regardless of the return path, report on the progress in logs before
// the function exits.
defer func() {
remaining, info = progress.getProgress()
log.Ops.Infof(ctx, "drain progress: %d", remaining)
if info != "" {
log.Ops.Infof(ctx, "drain details: %s", info)
}
}()

// First drain all clients and SQL leases.
log.Infof(ctx, "draining clients")
if err := s.drainClients(ctx, progress.report); err != nil {
return remaining, info, err
}

// Mark the node liveness record as draining. This starts telling
// range caches on other nodes that this node is going away.
log.Infof(ctx, "draining liveness")
if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, progress.report); err != nil {
return remaining, info, err
}

// GetLiveNodeCount includes nodes that are members of the
// cluster and whose liveness record has not expired. This
// includes the current (draining) node, because marking the
// liveness record as draining does not prevent it from
// heartbeating.
liveNodes := s.nodeLiveness.GetLiveNodeCount()

if progress.hasProgress() {
// Wait until some confidence exists that the other nodes have
// acknowledged the draining state: this waits until the expiry
// of the liveness record, which is at least as late as when
// other nodes are forced to refresh their range leases.
//
// The problem that this wait solves is the following. When other
// nodes already have replicas for the same ranges as this node,
// these other nodes may attempt to transfer leases to this node
// for routine rebalancing as long as they see this node as live
// (via its liveness). This wait ensures they are forced to reload
// the liveness record and see the node is draining, and stop using
// it as target for rebalancing.
//
// We wait here only the first time Drain() is called, when the
// liveness record has been toggled from non-draining to
// draining.
//
// The reason why hasProgress() is synonymous with "Drain() is
// being called for the first time" here is because only during
// the first iteration is there work performed in drainClients()
// and nodeLiveness.SetDraining() above. At the second and later
// iterations, these first two steps do no work.

toWait, err := s.nodeLiveness.TimeUntilLivenessExpiry(ctx)
if err != nil {
return remaining, info, err
}

if toWait > 0 {
log.Infof(ctx, "waiting %s for the liveness record to expire", toWait)
time.Sleep(toWait)
} else {
log.VInfof(ctx, 1, "no liveness record on this node, no expiry to wait on")
}

if liveNodes > 1 {
// If we believe there are other nodes, we also wait a couple of seconds
// past the expiration to give ample time for these nodes to
// re-load their copy of this node's liveness record, prior to
// us transferring leases below.
//
// This wait is not necessary for correctness; it is merely an
// optimization: it reduces the probability that another node
// hasn't seen the expiration yet and tries to transfer a
// lease back to this draining node during the lease drain
// below.
//
// We also only use the optimization if we have some
// confidence that there are other ready nodes in the cluster;
// for a single-node cluster, this wait is clearly a waste of
// time and would be a source of annoyance to the user.
const extraLivenessDrainStatePropagationDelay = 2 * time.Second

log.Infof(ctx, "waiting %s to let draining state propagate throughout cluster", extraLivenessDrainStatePropagationDelay)
time.Sleep(extraLivenessDrainStatePropagationDelay)
}
}

// Transfer the range leases away.
// This may take a while; that's OK.
//
// Note that we are careful to only start shedding range leases away
// after we have some confidence that the other nodes have picked up
// the draining bit (by waiting for the expiration delay,
// above). This is because otherwise, this node would be a candidate
// for transferring the leases back immediately, concurrently
// negating the progress made by this SetDraining() call.
log.Infof(ctx, "transferring leases")
if err := s.node.SetDraining(true /* drain */, progress.report); err != nil {
return remaining, info, err
}

if !progress.hasProgress() && liveNodes > 1 {
s.TestingLeaseTransferDoneAfterDrain.Set(true)
// If there is no more work to do, the process will then proceed to
// shut down.
//
// Just before doing so however, if we believe there are other
// nodes, then wait a little bit more.
//
// The problem that this second wait helps solve is when other
// nodes do not have replicas for the ranges whose leases were
// just transferred away, but may have outdated information about
// them in their range desc/lease caches. These other nodes may be
// serving application traffic, and we want to give them a chance
// to encounter a NotLeaseHolderError and refresh their cache
// after the last lease has been transferred.
//
// Like above, this is an optimization: if this was not
// occurring, the other nodes would simply time out on a request
// and start inquiring other replicas to discover the new
// leaseholder. We also avoid the optimization if the ready node
// count is just 1, to prevent UX annoyances.
const extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay = 5 * time.Second
log.Infof(ctx,
"waiting %s so that final requests to this node from rest of cluster can be redirected",
extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay)
time.Sleep(extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay)
}

return remaining, info, nil
}

// isDraining returns true if either clients are being drained
Expand Down Expand Up @@ -204,15 +294,55 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf
// given sessions a chance to finish ongoing work.
s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)

// Done. This executes the defers set above to drain SQL leases.
return nil
}

// drainProgress stores the result of calls to the reporter function
// passed to the various draining functions.
//
// This is made safe for concurrent use.
type drainProgress struct {
// We use a mutex to ensure that the reporter function can be called
// concurrently and safely from multiple goroutines.
syncutil.Mutex
reports map[redact.SafeString]int
}

func newDrainProgress() *drainProgress {
return &drainProgress{
reports: make(map[redact.SafeString]int),
}
}

// report some drain work to the drainProgress tracker.
// This is safe for use by multiple goroutines concurrently.
func (p *drainProgress) report(howMany int, what redact.SafeString) {
if howMany > 0 {
p.Lock()
defer p.Unlock()
p.reports[what] += howMany
}
}

// hasProgress returns true iff some progress was reported via
// the report() method already.
func (p *drainProgress) hasProgress() bool {
p.Lock()
defer p.Unlock()
return len(p.reports) > 0
}

// getProgress retrieves a description and a count of the work
// performed so far.
// The caller guarantees that no concurrent calls to the report()
// method are occurring.
func (p *drainProgress) getProgress() (remaining uint64, details redact.RedactableString) {
var descBuf redact.StringBuilder
comma := redact.SafeString("")
for what, howMany := range p.reports {
remaining += uint64(howMany)
descBuf.Printf("%s%s: %d", comma, what, howMany)
comma = ", "
}
return remaining, descBuf.RedactableString()
}
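
To make the new progress-tracking plumbing concrete, here is a small usage sketch of drainProgress. The shedSQLSessions and shedLeases closures are stand-ins invented for this example; in the real code, the report method is handed to drainClients(), nodeLiveness.SetDraining(), and node.SetDraining() as shown above. The sketch assumes it compiles in the same package as drainProgress.

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

// exampleDrainProgress shows how the tracker aggregates reports from
// the individual drain steps.
func exampleDrainProgress() {
	progress := newDrainProgress()

	// Hypothetical drain steps; each one reports how much work it shed.
	shedSQLSessions := func(report func(int, redact.SafeString)) { report(3, "SQL clients") }
	shedLeases := func(report func(int, redact.SafeString)) { report(12, "range leases") }

	shedSQLSessions(progress.report)
	shedLeases(progress.report)

	if progress.hasProgress() {
		remaining, details := progress.getProgress()
		// With the reports above this prints a total of 15, e.g.
		// "drain progress: 15 (SQL clients: 3, range leases: 12)"
		// (map iteration order may vary).
		fmt.Printf("drain progress: %d (%s)\n", remaining, details)
	}
}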