From 1dc18dfd800ae15dab44bac0616434d9a6df4917 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 24 Jul 2020 15:47:59 -0400
Subject: [PATCH] kvserver: avoid redundant liveness heartbeats under a thundering herd

When a node's liveness expires, either because its liveness record's epoch is
incremented or it is just slow to heartbeat its record, all of its epoch-based
leases immediately become invalid. As a result, we often see a thundering herd
of requests attempt to synchronously heartbeat the node's liveness record, on
the order of the number of ranges that lost their lease.

We already limit the concurrency of these heartbeats to 1, so there is not a
huge concern that this will lead to overwhelming the liveness range, but it
does cause other issues. For one, it means that we end up heartbeating the
liveness record very rapidly, which causes large growth in MVCC history. It
also means that heartbeats at the end of the queue have to wait for all other
heartbeats in front of them to complete. Even if these heartbeats only take
5ms each, if there are 100 of them waiting, then the last one in line will
wait for 500ms and its range will be unavailable during this time. This also
has the potential to starve the liveness heartbeat loop, which isn't a problem
in and of itself as long as other synchronous heartbeats are succeeding, but
leads to concerning log warnings. Finally, this was an instance where we were
adding additional load to a cluster once it was close to being overloaded.
That's generally a bad property for a system that wants to stay stable, and
this change helps avoid it.

The solution here is to detect redundant heartbeats and make them no-ops where
possible. This has a similar effect to explicitly coalescing heartbeats, but
it's easier to reason about and requires us to maintain less state. The commit
is conservative about this, providing a fairly strong guarantee that a
heartbeat attempt, if successful, will ensure that the liveness record's
expiration will be at least the liveness threshold above the time that the
method was called. We may be able to relax this and say that the heartbeat
attempt will just ensure that the expiration is now above that of the
oldLiveness provided, but this weakened guarantee seems harder to reason about
as a consumer of this interface.

Release note (performance improvement): ranges recover moderately faster when
their leaseholder is briefly down before becoming live again.
---
 pkg/kv/kvserver/client_lease_test.go     |   2 +-
 pkg/kv/kvserver/client_merge_test.go     |   2 +-
 pkg/kv/kvserver/client_raft_test.go      |   4 +-
 pkg/kv/kvserver/closed_timestamp_test.go |   2 +-
 pkg/kv/kvserver/node_liveness.go         | 119 +++++++++++++++++------
 pkg/kv/kvserver/node_liveness_test.go    | 103 +++++++++++++++++---
 pkg/server/admin_test.go                 |   2 +-
 pkg/ts/catalog/chart_catalog.go          |   4 +
 8 files changed, 193 insertions(+), 45 deletions(-)

diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index e0294eb4a011..7c2b69ca6bf6 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -284,7 +284,7 @@ func TestGossipNodeLivenessOnLeaseChange(t *testing.T) {
 	// Turn off liveness heartbeats on all nodes to ensure that updates to node
 	// liveness are not triggering gossiping.
 	for i := range mtc.nodeLivenesses {
-		mtc.nodeLivenesses[i].PauseHeartbeat(true)
+		mtc.nodeLivenesses[i].PauseHeartbeatLoopForTest()
 	}
 
 	nodeLivenessKey := gossip.MakeNodeLivenessKey(1)
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index b42db1a30605..7648aa065ecd 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -1450,7 +1450,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
 
 	// Turn off liveness heartbeats on the second store, then advance the clock
 	// past the liveness expiration time. This expires all leases on all stores.
-	mtc.nodeLivenesses[1].PauseHeartbeat(true)
+	mtc.nodeLivenesses[1].PauseHeartbeatLoopForTest()
 	mtc.advanceClock(ctx)
 
 	// Manually heartbeat the liveness on the first store to ensure it's
diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 4d197dfbcc5b..8bee792b7a6d 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -1339,7 +1339,7 @@ func TestRefreshPendingCommands(t *testing.T) {
 	// Disable node liveness heartbeats which can reacquire leases when we're
 	// trying to expire them. We pause liveness heartbeats here after node 0
 	// was restarted (which creates a new NodeLiveness).
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	// Start draining stores 0 and 1 to prevent them from grabbing any new
 	// leases.
@@ -3900,7 +3900,7 @@ func TestRangeQuiescence(t *testing.T) {
 	defer mtc.Stop()
 	mtc.Start(t, 3)
 
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	// Replica range 1 to all 3 nodes.
 	const rangeID = roachpb.RangeID(1)
diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go
index 5eb45b062e19..55c873842122 100644
--- a/pkg/kv/kvserver/closed_timestamp_test.go
+++ b/pkg/kv/kvserver/closed_timestamp_test.go
@@ -676,7 +676,7 @@ func forceLeaseTransferOnSubsumedRange(
 		}
 		return nil
 	})
-	restartHeartbeats := oldLeaseholderStore.NodeLiveness().DisableAllHeartbeatsForTest()
+	restartHeartbeats := oldLeaseholderStore.NodeLiveness().PauseAllHeartbeatsForTest()
 	defer restartHeartbeats()
 	log.Infof(ctx, "paused RHS rightLeaseholder's liveness heartbeats")
 	time.Sleep(oldLeaseholderStore.NodeLiveness().GetLivenessThreshold())
diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go
index 01194b954240..b4312a08a7ec 100644
--- a/pkg/kv/kvserver/node_liveness.go
+++ b/pkg/kv/kvserver/node_liveness.go
@@ -80,6 +80,12 @@ var (
 		Measurement: "Nodes",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaHeartbeatsInFlight = metric.Metadata{
+		Name:        "liveness.heartbeatsinflight",
+		Help:        "Number of in-flight liveness heartbeats from this node",
+		Measurement: "Requests",
+		Unit:        metric.Unit_COUNT,
+	}
 	metaHeartbeatSuccesses = metric.Metadata{
 		Name:        "liveness.heartbeatsuccesses",
 		Help:        "Number of successful node liveness heartbeats from this node",
@@ -109,6 +115,7 @@ var (
 
 // LivenessMetrics holds metrics for use with node liveness activity.
 type LivenessMetrics struct {
 	LiveNodes          *metric.Gauge
+	HeartbeatsInFlight *metric.Gauge
 	HeartbeatSuccesses *metric.Counter
 	HeartbeatFailures  *metric.Counter
 	EpochIncrements    *metric.Counter
@@ -215,6 +222,7 @@ func NewNodeLiveness(
 	}
 	nl.metrics = LivenessMetrics{
 		LiveNodes:          metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
+		HeartbeatsInFlight: metric.NewGauge(metaHeartbeatsInFlight),
 		HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses),
 		HeartbeatFailures:  metric.NewCounter(metaHeartbeatFailures),
 		EpochIncrements:    metric.NewCounter(metaEpochIncrements),
@@ -612,27 +620,27 @@ of network connectivity problems. For help troubleshooting, visit:
 `
 
-// PauseHeartbeat stops or restarts the periodic heartbeat depending on the
-// pause parameter. When pause is true, waits until it acquires the heartbeatToken
-// (unless heartbeat was already paused); this ensures that no heartbeats happen
-// after this is called. This function is only safe for use in tests.
-func (nl *NodeLiveness) PauseHeartbeat(pause bool) {
-	if pause {
-		if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 0, 1); swapped {
-			<-nl.heartbeatToken
-		}
-	} else {
+// PauseHeartbeatLoopForTest stops the periodic heartbeat. The function
+// waits until it acquires the heartbeatToken (unless heartbeat was
+// already paused); this ensures that no heartbeats happen after this is
+// called. Returns a closure to call to re-enable the heartbeat loop.
+// This function is only safe for use in tests.
+func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() {
+	if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 0, 1); swapped {
+		<-nl.heartbeatToken
+	}
+	return func() {
 		if swapped := atomic.CompareAndSwapUint32(&nl.heartbeatPaused, 1, 0); swapped {
 			nl.heartbeatToken <- struct{}{}
 		}
 	}
 }
 
-// DisableAllHeartbeatsForTest disables all node liveness heartbeats, including
-// those triggered from outside the normal StartHeartbeat loop. Returns a
-// closure to call to re-enable heartbeats. Only safe for use in tests.
-func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
-	nl.PauseHeartbeat(true)
+// PauseSynchronousHeartbeatsForTest disables all node liveness
+// heartbeats triggered from outside the normal StartHeartbeat loop.
+// Returns a closure to call to re-enable synchronous heartbeats. Only
+// safe for use in tests.
+func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
 	nl.selfSem <- struct{}{}
 	nl.otherSem <- struct{}{}
 	return func() {
@@ -641,6 +649,19 @@ func (nl *NodeLiveness) DisableAllHeartbeatsForTest() func() {
 	}
 }
 
+// PauseAllHeartbeatsForTest disables all node liveness heartbeats,
+// including those triggered from outside the normal StartHeartbeat
+// loop. Returns a closure to call to re-enable heartbeats. Only safe
+// for use in tests.
+func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() {
+	enableLoop := nl.PauseHeartbeatLoopForTest()
+	enableSync := nl.PauseSynchronousHeartbeatsForTest()
+	return func() {
+		enableLoop()
+		enableSync()
+	}
+}
+
 var errNodeAlreadyLive = errors.New("node already live")
 
 // Heartbeat is called to update a node's expiration timestamp. This
@@ -678,6 +699,24 @@ func (nl *NodeLiveness) heartbeatInternal(
 		}
 	}(timeutil.Now())
 
+	// Collect a clock reading from before we begin queuing on the heartbeat
+	// semaphore. This method (attempts to, see [*]) guarantees that, if
+	// successful, the liveness record's expiration will be at least the
+	// liveness threshold above the time that the method was called.
+	// Collecting this clock reading before queuing allows us to enforce
+	// this while avoiding redundant liveness heartbeats during thundering
+	// herds without needing to explicitly coalesce heartbeats.
+	//
+	// [*]: see TODO below about how errNodeAlreadyLive handling does not
+	// enforce this guarantee.
+	beforeQueueTS := nl.clock.Now()
+	minExpiration := hlc.LegacyTimestamp(
+		beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
+
+	// Before queueing, record the heartbeat as in-flight.
+	nl.metrics.HeartbeatsInFlight.Inc(1)
+	defer nl.metrics.HeartbeatsInFlight.Dec(1)
+
 	// Allow only one heartbeat at a time.
 	nodeID := nl.gossip.NodeID.Get()
 	sem := nl.sem(nodeID)
@@ -690,6 +729,20 @@ func (nl *NodeLiveness) heartbeatInternal(
 		<-sem
 	}()
 
+	// If we are not intending to increment the node's liveness epoch, detect
+	// whether this heartbeat is needed anymore. It is possible that we queued
+	// for long enough on the semaphore such that other heartbeat attempts ahead
+	// of us already incremented the expiration past what we wanted. Note that
+	// if we allowed the heartbeat to proceed in this case, we know that it
+	// would hit a ConditionFailedError and return an errNodeAlreadyLive down
+	// below.
+	if !incrementEpoch {
+		curLiveness, err := nl.Self()
+		if err == nil && minExpiration.Less(curLiveness.Expiration) {
+			return nil
+		}
+	}
+
 	// Let's compute what our new liveness record should be.
 	var newLiveness kvserverpb.Liveness
 	if oldLiveness == (kvserverpb.Liveness{}) {
@@ -706,18 +759,17 @@ func (nl *NodeLiveness) heartbeatInternal(
 		}
 	}
 
-	// We need to add the maximum clock offset to the expiration because it's
-	// used when determining liveness for a node.
-	{
-		newLiveness.Expiration = hlc.LegacyTimestamp(
-			nl.clock.Now().Add((nl.livenessThreshold).Nanoseconds(), 0))
-		// This guards against the system clock moving backwards. As long
-		// as the cockroach process is running, checks inside hlc.Clock
-		// will ensure that the clock never moves backwards, but these
-		// checks don't work across process restarts.
-		if newLiveness.Expiration.Less(oldLiveness.Expiration) {
-			return errors.Errorf("proposed liveness update expires earlier than previous record")
-		}
-	}
+	// Grab a new clock reading to compute the new expiration time,
+	// since we may have queued on the semaphore for a while.
+	afterQueueTS := nl.clock.Now()
+	newLiveness.Expiration = hlc.LegacyTimestamp(
+		afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
+	// This guards against the system clock moving backwards. As long
+	// as the cockroach process is running, checks inside hlc.Clock
+	// will ensure that the clock never moves backwards, but these
+	// checks don't work across process restarts.
+	if newLiveness.Expiration.Less(oldLiveness.Expiration) {
+		return errors.Errorf("proposed liveness update expires earlier than previous record")
+	}
 
 	update := livenessUpdate{
@@ -740,6 +792,17 @@ func (nl *NodeLiveness) heartbeatInternal(
 			// expired while in flight, so maybe we don't have to care about
 			// that and only need to distinguish between same and different
 			// epochs in our return value.
+			//
+			// TODO(nvanbenschoten): Unlike the early return above, this doesn't
+			// guarantee that the resulting expiration is past minExpiration,
+			// only that it's different than our oldLiveness. Is that ok? It
+			// hasn't caused issues so far, but we might want to detect this
+			// case and retry, at least in the case of the liveness heartbeat
+			// loop. The downside of this is that a heartbeat that's intending
+			// to bump the expiration of a record out 9s into the future may
+			// return a success even if the expiration is only 5 seconds in the
+			// future. The next heartbeat will then start with only 0.5 seconds
+			// before expiration.
 			if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch {
 				return errNodeAlreadyLive
 			}
@@ -770,7 +833,7 @@ func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) {
 	if err != nil {
 		return kvserverpb.Liveness{}, err
 	}
-	return rec.Liveness, err
+	return rec.Liveness, nil
 }
 
 // SelfEx is like Self, but returns the raw, encoded value that the database has
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index eb59cc6ac9cb..7484adf12cfa 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/cockroachdb/logtags"
 	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 )
 
 func verifyLiveness(t *testing.T, mtc *multiTestContext) {
@@ -64,9 +65,15 @@ func verifyLiveness(t *testing.T, mtc *multiTestContext) {
 	})
 }
 
-func pauseNodeLivenessHeartbeats(mtc *multiTestContext, pause bool) {
+func pauseNodeLivenessHeartbeatLoops(mtc *multiTestContext) func() {
+	var enableFns []func()
 	for _, nl := range mtc.nodeLivenesses {
-		nl.PauseHeartbeat(pause)
+		enableFns = append(enableFns, nl.PauseHeartbeatLoopForTest())
+	}
+	return func() {
+		for _, fn := range enableFns {
+			fn()
+		}
 	}
 }
 
@@ -79,7 +86,7 @@ func TestNodeLiveness(t *testing.T) {
 
 	// Verify liveness of all nodes for all nodes.
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	// Advance clock past the liveness threshold to verify IsLive becomes false.
 	mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
@@ -163,7 +170,81 @@ func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) {
 		}
 		return nil
 	})
+}
 
+// TestRedundantNodeLivenessHeartbeatsAvoided tests that in a thundering herd
+// scenario where many goroutines rush to synchronously heartbeat a node's
+// liveness record, redundant heartbeats are detected and avoided.
+func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	mtc := &multiTestContext{}
+	defer mtc.Stop()
+	mtc.Start(t, 1)
+	nl := mtc.nodeLivenesses[0]
+	nlActive, _ := mtc.storeConfig.NodeLivenessDurations()
+
+	// Verify liveness of all nodes for all nodes.
+	verifyLiveness(t, mtc)
+	nl.PauseHeartbeatLoopForTest()
+	enableSync := nl.PauseSynchronousHeartbeatsForTest()
+
+	liveness, err := nl.Self()
+	require.NoError(t, err)
+	hbBefore := nl.Metrics().HeartbeatSuccesses.Count()
+	require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())
+
+	// Issue a set of synchronous node liveness heartbeats. Mimic the kind of
+	// thundering herd we see due to lease acquisitions when a node's liveness
+	// epoch is incremented.
+	var g errgroup.Group
+	const herdSize = 30
+	for i := 0; i < herdSize; i++ {
+		g.Go(func() error {
+			before := mtc.clock().Now()
+			if err := nl.Heartbeat(ctx, liveness); err != nil {
+				return err
+			}
+			livenessAfter, err := nl.Self()
+			if err != nil {
+				return err
+			}
+			exp := livenessAfter.Expiration
+			minExp := hlc.LegacyTimestamp(before.Add(nlActive.Nanoseconds(), 0))
+			if exp.Less(minExp) {
+				return errors.Errorf("expected min expiration %v, found %v", minExp, exp)
+			}
+			return nil
+		})
+	}
+
+	// Wait for all heartbeats to be in-flight, at which point they will have
+	// already computed their minimum expiration time.
+	testutils.SucceedsSoon(t, func() error {
+		inFlight := nl.Metrics().HeartbeatsInFlight.Value()
+		if inFlight < herdSize {
+			return errors.Errorf("not all heartbeats in-flight, want %d, got %d", herdSize, inFlight)
+		} else if inFlight > herdSize {
+			t.Fatalf("unexpected in-flight heartbeat count: %d", inFlight)
+		}
+		return nil
+	})
+
+	// Allow the heartbeats to proceed. Only a single one should end up touching
+	// the liveness record. The rest should be considered redundant.
+	enableSync()
+	require.NoError(t, g.Wait())
+	require.Equal(t, hbBefore+1, nl.Metrics().HeartbeatSuccesses.Count())
+	require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())
+
+	// Send one more heartbeat. Should update liveness record.
+	liveness, err = nl.Self()
+	require.NoError(t, err)
+	require.NoError(t, nl.Heartbeat(ctx, liveness))
+	require.Equal(t, hbBefore+2, nl.Metrics().HeartbeatSuccesses.Count())
+	require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())
 }
 
 // TestNodeIsLiveCallback verifies that the liveness callback for a
@@ -177,7 +258,7 @@ func TestNodeIsLiveCallback(t *testing.T) {
 
 	// Verify liveness of all nodes for all nodes.
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	var cbMu syncutil.Mutex
 	cbs := map[roachpb.NodeID]struct{}{}
@@ -225,7 +306,7 @@ func TestNodeHeartbeatCallback(t *testing.T) {
 
 	// Verify liveness of all nodes for all nodes.
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	// Verify that last update time has been set for all nodes.
 	verifyUptimes := func() error {
@@ -281,7 +362,7 @@ func TestNodeLivenessEpochIncrement(t *testing.T) {
 	mtc.Start(t, 2)
 
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	// First try to increment the epoch of a known-live node.
 	deadNodeID := mtc.gossips[1].NodeID.Get()
@@ -412,7 +493,7 @@ func TestNodeLivenessSelf(t *testing.T) {
 	mtc.Start(t, 1)
 	g := mtc.gossips[0]
 
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	// Verify liveness is properly initialized. This needs to be wrapped in a
 	// SucceedsSoon because node liveness gets initialized via an async gossip
@@ -472,7 +553,7 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) {
 	mtc.Start(t, 3)
 
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 	lMap := mtc.nodeLivenesses[0].GetIsLiveMap()
 	expectedLMap := kvserver.IsLiveMap{
 		1: {IsLive: true, Epoch: 1},
@@ -517,7 +598,7 @@ func TestNodeLivenessGetLivenesses(t *testing.T) {
 	mtc.Start(t, 3)
 
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	livenesses := mtc.nodeLivenesses[0].GetLivenesses()
 	actualLMapNodes := make(map[roachpb.NodeID]struct{})
@@ -574,7 +655,7 @@ func TestNodeLivenessConcurrentHeartbeats(t *testing.T) {
 	mtc.Start(t, 1)
 
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	const concurrency = 10
 
@@ -608,7 +689,7 @@ func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) {
 	mtc.Start(t, 2)
 
 	verifyLiveness(t, mtc)
-	pauseNodeLivenessHeartbeats(mtc, true)
+	pauseNodeLivenessHeartbeatLoops(mtc)
 
 	const concurrency = 10
 
diff --git a/pkg/server/admin_test.go b/pkg/server/admin_test.go
index 58cbbc61a12a..1bff544a6079 100644
--- a/pkg/server/admin_test.go
+++ b/pkg/server/admin_test.go
@@ -1347,7 +1347,7 @@ func TestHealthAPI(t *testing.T) {
 	// Expire this node's liveness record by pausing heartbeats and advancing the
 	// server's clock.
 	ts := s.(*TestServer)
-	defer ts.nodeLiveness.DisableAllHeartbeatsForTest()()
+	defer ts.nodeLiveness.PauseAllHeartbeatsForTest()()
 	self, err := ts.nodeLiveness.Self()
 	if err != nil {
 		t.Fatal(err)
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 7b23f987ec0f..4104c95f2ddd 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1093,6 +1093,10 @@ var charts = []sectionDescription{
 			Title:   "Epoch Increment Count",
 			Metrics: []string{"liveness.epochincrements"},
 		},
+		{
+			Title:   "Heartbeats In-Flight",
+			Metrics: []string{"liveness.heartbeatsinflight"},
+		},
 		{
 			Title:   "Heartbeat Latency",
 			Metrics: []string{"liveness.heartbeatlatency"},
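
The pattern the commit message describes can be illustrated outside of CockroachDB. The following is a minimal, self-contained Go sketch, not CockroachDB code; the names record, heartbeat, and sem are invented for this illustration and all of the real error handling, epoch logic, and metrics plumbing is omitted. Each caller takes a clock reading before queuing on the heartbeat semaphore, and once it is through the semaphore it skips the write entirely if a heartbeat that ran ahead of it already pushed the expiration past that pre-queue requirement.

package main

import (
	"fmt"
	"sync"
	"time"
)

// record is a stand-in for a liveness record with an expiration timestamp.
type record struct {
	expiration time.Time
	writes     int
}

// heartbeat extends r's expiration to at least threshold beyond the moment
// the call was made. The clock reading is taken before queuing on sem so
// that, once through the semaphore, a caller can tell whether a heartbeat
// that ran ahead of it already satisfied its requirement.
func heartbeat(r *record, sem chan struct{}, threshold time.Duration) {
	minExpiration := time.Now().Add(threshold) // pre-queue requirement

	sem <- struct{}{} // allow only one heartbeat at a time
	defer func() { <-sem }()

	if r.expiration.After(minExpiration) {
		return // redundant: an earlier heartbeat already covered us
	}
	// Recompute from a fresh clock reading; we may have waited on sem a while.
	r.expiration = time.Now().Add(threshold)
	r.writes++
}

func main() {
	r := &record{}
	sem := make(chan struct{}, 1)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			heartbeat(r, sem, 9*time.Second)
		}()
	}
	wg.Wait()
	fmt.Printf("record written %d times; %d heartbeats were redundant\n", r.writes, 100-r.writes)
}

In the patch itself this corresponds to the beforeQueueTS/minExpiration reading taken in heartbeatInternal before acquiring the semaphore and the early return when minExpiration.Less(curLiveness.Expiration) holds.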