Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: avoid redundant liveness heartbeats under a thundering herd #51888

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestGossipNodeLivenessOnLeaseChange(t *testing.T) {
// Turn off liveness heartbeats on all nodes to ensure that updates to node
// liveness are not triggering gossiping.
for i := range mtc.nodeLivenesses {
mtc.nodeLivenesses[i].PauseHeartbeat(true)
mtc.nodeLivenesses[i].PauseHeartbeatLoopForTest()
}

nodeLivenessKey := gossip.MakeNodeLivenessKey(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {

// Turn off liveness heartbeats on the second store, then advance the clock
// past the liveness expiration time. This expires all leases on all stores.
mtc.nodeLivenesses[1].PauseHeartbeat(true)
mtc.nodeLivenesses[1].PauseHeartbeatLoopForTest()
mtc.advanceClock(ctx)

// Manually heartbeat the liveness on the first store to ensure it's
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ func TestRefreshPendingCommands(t *testing.T) {
// Disable node liveness heartbeats which can reacquire leases when we're
// trying to expire them. We pause liveness heartbeats here after node 0
// was restarted (which creates a new NodeLiveness).
pauseNodeLivenessHeartbeats(mtc, true)
pauseNodeLivenessHeartbeatLoops(mtc)

// Start draining stores 0 and 1 to prevent them from grabbing any new
// leases.
Expand Down Expand Up @@ -3900,7 +3900,7 @@ func TestRangeQuiescence(t *testing.T) {
defer mtc.Stop()
mtc.Start(t, 3)

pauseNodeLivenessHeartbeats(mtc, true)
pauseNodeLivenessHeartbeatLoops(mtc)

// Replica range 1 to all 3 nodes.
const rangeID = roachpb.RangeID(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func forceLeaseTransferOnSubsumedRange(
}
return nil
})
restartHeartbeats := oldLeaseholderStore.NodeLiveness().DisableAllHeartbeatsForTest()
restartHeartbeats := oldLeaseholderStore.NodeLiveness().PauseAllHeartbeatsForTest()
defer restartHeartbeats()
log.Infof(ctx, "paused RHS rightLeaseholder's liveness heartbeats")
time.Sleep(oldLeaseholderStore.NodeLiveness().GetLivenessThreshold())
Expand Down
119 changes: 91 additions & 28 deletions pkg/kv/kvserver/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ var (
Measurement: "Nodes",
Unit: metric.Unit_COUNT,
}
metaHeartbeatsInFlight = metric.Metadata{
Name: "liveness.heartbeatsinflight",
Help: "Number of in-flight liveness heartbeats from this node",
Measurement: "Requests",
Unit: metric.Unit_COUNT,
}
metaHeartbeatSuccesses = metric.Metadata{
Name: "liveness.heartbeatsuccesses",
Help: "Number of successful node liveness heartbeats from this node",
Expand Down Expand Up @@ -109,6 +115,7 @@ var (
// LivenessMetrics holds metrics for use with node liveness activity.
type LivenessMetrics struct {
LiveNodes *metric.Gauge
HeartbeatsInFlight *metric.Gauge
HeartbeatSuccesses *metric.Counter
HeartbeatFailures *metric.Counter
EpochIncrements *metric.Counter
Expand Down Expand Up @@ -215,6 +222,7 @@ func NewNodeLiveness(
}
nl.metrics = LivenessMetrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
HeartbeatsInFlight: metric.NewGauge(metaHeartbeatsInFlight),
HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses),
HeartbeatFailures: metric.NewCounter(metaHeartbeatFailures),
EpochIncrements: metric.NewCounter(metaEpochIncrements),
Expand Down Expand Up @@ -612,27 +620,27 @@ of network connectivity problems. For help troubleshooting, visit:

`

// PauseHeartbeat stops or restarts the periodic heartbeat depending on the
// pause parameter. When pause is true, waits until it acquires the heartbeatToken
// (unless heartbeat was already paused); this ensures that no heartbeats happen
// after this is called. This function is only safe for use in tests.
func (nl *NodeLiveness) PauseHeartbeat(pause bool) {
if pause {
if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 0, 1); swapped {
<-nl.heartbeatToken
}
} else {
// PauseHeartbeatLoopForTest stops the periodic heartbeat. The function
// waits until it acquires the heartbeatToken (unless heartbeat was
// already paused); this ensures that no heartbeats happen after this is
// called. Returns a closure to call to re-enable the heartbeat loop.
// This function is only safe for use in tests.
func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() {
if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 0, 1); swapped {
<-nl.heartbeatToken
}
return func() {
if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 1, 0); swapped {
nl.heartbeatToken <- struct{}{}
}
}
}

// DisableAllHeartbeatsForTest disables all node liveness heartbeats, including
// those triggered from outside the normal StartHeartbeat loop. Returns a
// closure to call to re-enable heartbeats. Only safe for use in tests.
func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
nl.PauseHeartbeat(true)
// PauseSynchronousHeartbeatsForTest disables all node liveness
// heartbeats triggered from outside the normal StartHeartbeat loop.
// Returns a closure to call to re-enable synchronous heartbeats. Only
// safe for use in tests.
func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
nl.selfSem <- struct{}{}
nl.otherSem <- struct{}{}
return func() {
Expand All @@ -641,6 +649,19 @@ func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
}
}

// PauseAllHeartbeatsForTest disables all node liveness heartbeats,
// including those triggered from outside the normal StartHeartbeat
// loop. Returns a closure to call to re-enable heartbeats. Only safe
// for use in tests.
func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() {
enableLoop := nl.PauseHeartbeatLoopForTest()
enableSync := nl.PauseSynchronousHeartbeatsForTest()
return func() {
enableLoop()
enableSync()
}
}

var errNodeAlreadyLive = errors.New("node already live")

// Heartbeat is called to update a node's expiration timestamp. This
Expand Down Expand Up @@ -678,6 +699,24 @@ func (nl *NodeLiveness) heartbeatInternal(
}
}(timeutil.Now())

// Collect a clock reading from before we begin queuing on the heartbeat
// semaphore. This method (attempts to, see [*]) guarantees that, if
// successful, the liveness record's expiration will be at least the
// liveness threshold above the time that the method was called.
// Collecting this clock reading before queuing allows us to enforce
// this while avoiding redundant liveness heartbeats during thundering
// herds without needing to explicitly coalesce heartbeats.
//
// [*]: see TODO below about how errNodeAlreadyLive handling does not
// enforce this guarantee.
beforeQueueTS := nl.clock.Now()
minExpiration := hlc.LegacyTimestamp(
beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))

// Before queueing, record the heartbeat as in-flight.
nl.metrics.HeartbeatsInFlight.Inc(1)
defer nl.metrics.HeartbeatsInFlight.Dec(1)

// Allow only one heartbeat at a time.
nodeID := nl.gossip.NodeID.Get()
sem := nl.sem(nodeID)
Expand All @@ -690,6 +729,20 @@ func (nl *NodeLiveness) heartbeatInternal(
<-sem
}()

// If we are not intending to increment the node's liveness epoch, detect
// whether this heartbeat is needed anymore. It is possible that we queued
// for long enough on the sempahore such that other heartbeat attempts ahead
// of us already incremented the expiration past what we wanted. Note that
// if we allowed the heartbeat to proceed in this case, we know that it
// would hit a ConditionFailedError and return a errNodeAlreadyLive down
// below.
if !incrementEpoch {
curLiveness, err := nl.Self()
if err == nil && minExpiration.Less(curLiveness.Expiration) {
return nil
}
}

// Let's compute what our new liveness record should be.
var newLiveness kvserverpb.Liveness
if oldLiveness == (kvserverpb.Liveness{}) {
Expand All @@ -706,18 +759,17 @@ func (nl *NodeLiveness) heartbeatInternal(
}
}

// We need to add the maximum clock offset to the expiration because it's
// used when determining liveness for a node.
{
newLiveness.Expiration = hlc.LegacyTimestamp(
nl.clock.Now().Add((nl.livenessThreshold).Nanoseconds(), 0))
// This guards against the system clock moving backwards. As long
// as the cockroach process is running, checks inside hlc.Clock
// will ensure that the clock never moves backwards, but these
// checks don't work across process restarts.
if newLiveness.Expiration.Less(oldLiveness.Expiration) {
return errors.Errorf("proposed liveness update expires earlier than previous record")
}
// Grab a new clock reading to compute the new expiration time,
// since we may have queued on the semaphore for a while.
afterQueueTS := nl.clock.Now()
newLiveness.Expiration = hlc.LegacyTimestamp(
afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
// This guards against the system clock moving backwards. As long
// as the cockroach process is running, checks inside hlc.Clock
// will ensure that the clock never moves backwards, but these
// checks don't work across process restarts.
if newLiveness.Expiration.Less(oldLiveness.Expiration) {
return errors.Errorf("proposed liveness update expires earlier than previous record")
}

update := livenessUpdate{
Expand All @@ -740,6 +792,17 @@ func (nl *NodeLiveness) heartbeatInternal(
// expired while in flight, so maybe we don't have to care about
// that and only need to distinguish between same and different
// epochs in our return value.
//
// TODO(nvanbenschoten): Unlike the early return above, this doesn't
// guarantee that the resulting expiration is past minExpiration,
// only that it's different than our oldLiveness. Is that ok? It
// hasn't caused issues so far, but we might want to detect this
// case and retry, at least in the case of the liveness heartbeat
// loop. The downside of this is that a heartbeat that's intending
// to bump the expiration of a record out 9s into the future may
// return a success even if the expiration is only 5 seconds in the
// future. The next heartbeat will then start with only 0.5 seconds
// before expiration.
if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch {
return errNodeAlreadyLive
}
Expand Down Expand Up @@ -770,7 +833,7 @@ func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) {
if err != nil {
return kvserverpb.Liveness{}, err
}
return rec.Liveness, err
return rec.Liveness, nil
}

// SelfEx is like Self, but returns the raw, encoded value that the database has
Expand Down
Loading