From 43a37d54a4fd6630176fd67c50a391975c2abb09 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 5 Jul 2022 15:44:51 -0400 Subject: [PATCH] kvserver: avoid replicating to followers that are I/O overloaded This commit implements the stop-gap solution to raft traffic contributing to I/O overload that was discussed[^1] in #79215. The newly introduced `admission.kv.pause_replication_io_threshold` cluster setting can be used to define an I/O overload score above which raft leaders will attempt to pause replication to nonessential followers, to given them a chance at tidying up their LSM. A follower is "nonessential" if it is either a non-voter, or if we think (and are fairly certain that) quorum can be reached without it. If there are multiple overloaded followers, we pick as many as we can without losing quorum, and we try to do so in a way that's a) random, i.e. different raft leaders will pick a different set, to give each store a share of the relief, and b) stable, i.e. each raft leader will pick the same set at least for a little while, to avoid interrupting a quorum by rapidly switching the set of active followers (but see here[^2]). The implementation of this is as follows: - on Store, materialize (on each tick) a `storeOverloadMap` from gossip - each Replica, on tick, from this map compute the set of followers to pause, and store it in a per-Replica map. - consult this latter map - when sending `MsgApp` from `handleRaftReady` - when releasing proposal quota. This commit by default disables this new functionality by setting the cluster setting to zero. This has the effect of an empty `storeOverloadMap` and thus, after one round of gossip and subsequent tick (i.e. within seconds), an empty per-Replica paused followers map. Additionally, it's worth pointing out the mixed-version behavior: the old nodes' stores will be observed as gossiping a zero IOThreshold, which is considered not overloaded, i.e. replication streams to old nodes will never be paused. Fixes #79215. [^1]: https://github.com/cockroachdb/cockroach/issues/79215#issuecomment-1167857356 [^2]: https://github.com/cockroachdb/cockroach/issues/83920#issuecomment-1176536456 Release note (ops change): the `admission.kv.pause_replication_io_threshold` cluster setting can be set to a nonzero value to reduce I/O throughput on followers that are driven towards an inverted LSM by replication traffic. The functionality is disabled by default. A suggested value is 0.8, meaning that replication traffic to nonessential followers is paused before these followers will begin throttling their foreground traffic. --- pkg/kv/kvserver/BUILD.bazel | 4 + .../client_replica_raft_overload_test.go | 101 +++++++++ pkg/kv/kvserver/kvserverpb/state.proto | 1 + pkg/kv/kvserver/metrics.go | 18 ++ pkg/kv/kvserver/raft_transport.go | 6 + pkg/kv/kvserver/replica.go | 31 ++- pkg/kv/kvserver/replica_metrics.go | 17 +- pkg/kv/kvserver/replica_proposal_quota.go | 9 + pkg/kv/kvserver/replica_raft.go | 81 ++++++- pkg/kv/kvserver/replica_raft_overload.go | 208 ++++++++++++++++++ pkg/kv/kvserver/replica_raft_overload_test.go | 154 +++++++++++++ pkg/kv/kvserver/replica_raft_quiesce.go | 21 +- pkg/kv/kvserver/replica_test.go | 17 +- pkg/kv/kvserver/store.go | 12 + pkg/kv/kvserver/store_raft.go | 28 ++- .../testdata/replica_raft_overload/basic.txt | 180 +++++++++++++++ .../min_live_match_index | 24 ++ pkg/kv/kvserver/testing_knobs.go | 4 + pkg/server/status/health_check.go | 13 +- pkg/ts/catalog/chart_catalog.go | 6 + .../admission/admissionpb/io_threshold.go | 1 + 21 files changed, 901 insertions(+), 35 deletions(-) create mode 100644 pkg/kv/kvserver/client_replica_raft_overload_test.go create mode 100644 pkg/kv/kvserver/replica_raft_overload.go create mode 100644 pkg/kv/kvserver/replica_raft_overload_test.go create mode 100644 pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt create mode 100644 pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e083bd0728c7..3fc0c4e559f4 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -58,6 +58,7 @@ go_library( "replica_proposal_quota.go", "replica_protected_timestamp.go", "replica_raft.go", + "replica_raft_overload.go", "replica_raft_quiesce.go", "replica_raftstorage.go", "replica_range_lease.go", @@ -235,6 +236,7 @@ go_test( "client_replica_backpressure_test.go", "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", + "client_replica_raft_overload_test.go", "client_replica_test.go", "client_spanconfigs_test.go", "client_split_burst_test.go", @@ -283,6 +285,7 @@ go_test( "replica_probe_test.go", "replica_proposal_buf_test.go", "replica_protected_timestamp_test.go", + "replica_raft_overload_test.go", "replica_raft_test.go", "replica_raft_truncation_test.go", "replica_rangefeed_test.go", @@ -401,6 +404,7 @@ go_test( "//pkg/ts", "//pkg/ts/tspb", "//pkg/util", + "//pkg/util/admission/admissionpb", "//pkg/util/caller", "//pkg/util/circuit", "//pkg/util/contextutil", diff --git a/pkg/kv/kvserver/client_replica_raft_overload_test.go b/pkg/kv/kvserver/client_replica_raft_overload_test.go new file mode 100644 index 000000000000..e7bcab234852 --- /dev/null +++ b/pkg/kv/kvserver/client_replica_raft_overload_test.go @@ -0,0 +1,101 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// + +package kvserver_test + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestReplicaRaftOverload is an end-to-end test verifying that leaseholders +// will "pause" followers that are on overloaded stores, and will unpause when +// the overload ends. +// +// This primarily tests the gossip signal as well as the mechanics of pausing, +// and does not check that "pausing" really results in no traffic being sent +// to the followers. +func TestReplicaRaftOverload(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + var on atomic.Value // bool + on.Store(false) + var args base.TestClusterArgs + args.ReplicationMode = base.ReplicationManual + args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{StoreGossipIntercept: func(descriptor *roachpb.StoreDescriptor) { + if !on.Load().(bool) || descriptor.StoreID != 3 { + return + } + descriptor.Capacity.IOThreshold = admissionpb.IOThreshold{ + L0NumSubLevels: 1000000, + L0NumSubLevelsThreshold: 1, + L0NumFiles: 1000000, + L0NumFilesThreshold: 1, + } + }} + tc := testcluster.StartTestCluster(t, 3, args) + defer tc.Stopper().Stop(ctx) + + { + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING admission.kv.pause_replication_io_threshold = 1.0`) + require.NoError(t, err) + } + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + // Also split off another range that is on s3 so that we can verify that a + // quiesced range doesn't do anything unexpected, like not release the paused + // follower from its map (and thus messing with metrics when overload recovers). + tc.SplitRangeOrFatal(t, k.Next()) + + // Gossip faux IO overload from s3. s1 should pick up on that and pause followers. + on.Store(true) + require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */)) + testutils.SucceedsSoon(t, func() error { + // Touch the one range that is on s3 since it's likely quiesced, and wouldn't unquiesce + // if s3 becomes overloaded. Note that we do no such thing for the right sibling range, + // so it may or may not contribute here (depending on when it quiesces). + // + // See: https://github.com/cockroachdb/cockroach/issues/84252 + require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo")) + s1 := tc.GetFirstStoreFromServer(t, 0) + require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */)) + if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 { + return errors.New("no paused followers") + } + return nil + }) + + // Now remove the gossip intercept and check again. The follower should un-pause immediately. + on.Store(false) + require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */)) + testutils.SucceedsSoon(t, func() error { + s1 := tc.GetFirstStoreFromServer(t, 0) + require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */)) + if n := s1.Metrics().RaftPausedFollowerCount.Value(); n > 0 { + return errors.Errorf("%d paused followers", n) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/kvserverpb/state.proto b/pkg/kv/kvserver/kvserverpb/state.proto index ac6cdb577f70..d14566cbbe3d 100644 --- a/pkg/kv/kvserver/kvserverpb/state.proto +++ b/pkg/kv/kvserver/kvserverpb/state.proto @@ -184,6 +184,7 @@ message RangeInfo { // The circuit breaker error, if any. This is nonzero if and only if the // circuit breaker on the source Replica is tripped. string circuit_breaker_error = 20; + repeated int32 paused_replicas = 21 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"]; } // RangeSideTransportInfo describes a range's closed timestamp info communicated diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 01eb6ebc9f5d..76d1bd2960f3 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -907,6 +907,20 @@ difficult to meaningfully interpret this metric.`, Unit: metric.Unit_COUNT, } + metaRaftFollowerPaused = metric.Metadata{ + Name: "admission.raft.paused_replicas", + Help: `Number of followers (i.e. Replicas) to which replication is currently paused to help them recover from I/O overload. + +Such Replicas will be ignored for the purposes of proposal quota, and will not +receive replication traffic. They are essentially treated as offline for the +purpose of replication. This serves as a crude form of admission control. + +The count is emitted by the leaseholder of each range. +.`, + Measurement: "Followers", + Unit: metric.Unit_COUNT, + } + // Replica queue metrics. metaMVCCGCQueueSuccesses = metric.Metadata{ Name: "queue.gc.process.success", @@ -1619,6 +1633,8 @@ type StoreMetrics struct { RaftLogFollowerBehindCount *metric.Gauge RaftLogTruncated *metric.Counter + RaftPausedFollowerCount *metric.Gauge + RaftEnqueuedPending *metric.Gauge RaftCoalescedHeartbeatsPending *metric.Gauge @@ -2095,6 +2111,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount), RaftLogTruncated: metric.NewCounter(metaRaftLogTruncated), + RaftPausedFollowerCount: metric.NewGauge(metaRaftFollowerPaused), + RaftEnqueuedPending: metric.NewGauge(metaRaftEnqueuedPending), // This Gauge measures the number of heartbeats queued up just before diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index ed8564a03cf1..4e935a98b0f3 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -144,6 +144,12 @@ type RaftMessageHandler interface { ) error } +// TODO(tbg): remove all of these metrics. The "NodeID" in this struct refers to the remote NodeID, i.e. when we send +// a message it refers to the recipient and when we receive a message it refers to the sender. This doesn't map to +// metrics well, where everyone should report on their local decisions. Instead have a *RaftTransportMetrics struct +// that is per-Store and tracks metrics on behalf of that Store. +// +// See: https://github.com/cockroachdb/cockroach/issues/83917 type raftTransportStats struct { nodeID roachpb.NodeID queue int diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 57fddaec43e9..b6c520336a5d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "sort" "sync/atomic" "time" "unsafe" @@ -566,14 +567,6 @@ type Replica struct { // (making the assumption that all followers are live at that point), // and when the range unquiesces (marking all replicating followers as // live). - // - // TODO(tschottdorf): keeping a map on each replica seems to be - // overdoing it. We should map the replicaID to a NodeID and then use - // node liveness (or any sensible measure of the peer being around). - // The danger in doing so is that a single stuck replica on an otherwise - // functioning node could fill up the quota pool. We are already taking - // this kind of risk though: a replica that gets stuck on an otherwise - // live node will not lose leaseholdership. lastUpdateTimes lastUpdateTimesMap // Computed checksum at a snapshot UUID. @@ -653,6 +646,12 @@ type Replica struct { // Historical information about the command that set the closed timestamp. closedTimestampSetter closedTimestampSetterInfo + + // Followers to which replication traffic is currently dropped. + // + // Never mutated in place (always replaced wholesale), so can be leaked + // outside of the surrounding mutex. + pausedFollowers map[roachpb.ReplicaID]struct{} } // The raft log truncations that are pending. Access is protected by its own @@ -698,6 +697,11 @@ type Replica struct { // loadBasedSplitter keeps information about load-based splitting. loadBasedSplitter split.Decider + // TODO(tbg): this is effectively unused, we only use it to call ReportUnreachable + // when a heartbeat gets dropped but it's unclear whether a) that ever fires in + // practice b) if it provides any benefit. + // + // See: https://github.com/cockroachdb/cockroach/issues/84246 unreachablesMu struct { syncutil.Mutex remotes map[roachpb.ReplicaID]struct{} @@ -1288,7 +1292,16 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo { if err := r.breaker.Signal().Err(); err != nil { ri.CircuitBreakerError = err.Error() } - + if m := r.mu.pausedFollowers; len(m) > 0 { + var sl []roachpb.ReplicaID + for id := range m { + sl = append(sl, id) + } + sort.Slice(sl, func(i, j int) bool { + return sl[i] < sl[j] + }) + ri.PausedReplicas = sl + } return ri } diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 847b9f4926d1..64d1e724ebb5 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -42,12 +42,13 @@ type ReplicaMetrics struct { // RangeCounter is true if the current replica is responsible for range-level // metrics (generally the leaseholder, if live, otherwise the first replica in the // range descriptor). - RangeCounter bool - Unavailable bool - Underreplicated bool - Overreplicated bool - RaftLogTooLarge bool - BehindCount int64 + RangeCounter bool + Unavailable bool + Underreplicated bool + Overreplicated bool + RaftLogTooLarge bool + BehindCount int64 + PausedFollowerCount int64 QuotaPoolPercentUsed int64 // [0,100] @@ -74,6 +75,7 @@ func (r *Replica) Metrics( qpCap = int64(q.Capacity()) // NB: max capacity is MaxInt64, see NewIntPool qpUsed = qpCap - qpAvail } + paused := r.mu.pausedFollowers r.mu.RUnlock() r.store.unquiescedReplicas.Lock() @@ -101,6 +103,7 @@ func (r *Replica) Metrics( raftLogSize, raftLogSizeTrusted, qpUsed, qpCap, + paused, ) } @@ -122,6 +125,7 @@ func calcReplicaMetrics( raftLogSize int64, raftLogSizeTrusted bool, qpUsed, qpCapacity int64, // quota pool used and capacity bytes + paused map[roachpb.ReplicaID]struct{}, ) ReplicaMetrics { var m ReplicaMetrics @@ -150,6 +154,7 @@ func calcReplicaMetrics( // behind. if m.Leader { m.BehindCount = calcBehindCount(raftStatus, desc, livenessMap) + m.PausedFollowerCount = int64(len(paused)) } m.LatchMetrics = latchMetrics diff --git a/pkg/kv/kvserver/replica_proposal_quota.go b/pkg/kv/kvserver/replica_proposal_quota.go index a89ee1b805db..3c0103fcf82d 100644 --- a/pkg/kv/kvserver/replica_proposal_quota.go +++ b/pkg/kv/kvserver/replica_proposal_quota.go @@ -144,6 +144,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // cannot correspond to values beyond the applied index there's no reason // to consider progress beyond it as meaningful. minIndex := status.Applied + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) { rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id)) if !ok { @@ -207,6 +208,14 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( if progress.Match < r.mu.proposalQuotaBaseIndex { return } + if _, paused := r.mu.pausedFollowers[roachpb.ReplicaID(id)]; paused { + // We are dropping MsgApp to this store, so we are effectively treating + // it as non-live for the purpose of replication and are letting it fall + // behind intentionally. + // + // See #79215. + return + } if progress.Match > 0 && progress.Match < minIndex { minIndex = progress.Match } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f95fc2d9cfd2..9563c9668359 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" @@ -720,6 +721,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return unquiesceAndWakeLeader, nil }) r.mu.applyingEntries = len(rd.CommittedEntries) > 0 + pausedFollowers := r.mu.pausedFollowers r.mu.Unlock() if errors.Is(err, errRemoved) { // If we've been removed then just return. @@ -911,7 +913,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( msgApps, otherMsgs := splitMsgApps(rd.Messages) r.traceMessageSends(msgApps, "sending msgApp") - r.sendRaftMessagesRaftMuLocked(ctx, msgApps) + r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) // Use a more efficient write-only batch because we don't need to do any // reads from the batch. Any reads are performed on the underlying DB. @@ -1032,7 +1034,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Update raft log entry cache. We clear any older, uncommitted log entries // and cache the latest ones. r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) - r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs) + r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") stats.tApplicationBegin = timeutil.Now() @@ -1154,9 +1156,13 @@ func maybeFatalOnRaftReadyErr(ctx context.Context, expl string, err error) (remo } } -// tick the Raft group, returning true if the raft group exists and is -// unquiesced; false otherwise. -func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (bool, error) { +// tick the Raft group, returning true if the raft group exists and should +// be queued for Ready processing; false otherwise. +func (r *Replica) tick( + ctx context.Context, + livenessMap liveness.IsLiveMap, + ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold, +) (bool, error) { r.unreachablesMu.Lock() remotes := r.unreachablesMu.remotes r.unreachablesMu.remotes = nil @@ -1180,6 +1186,54 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo return false, nil } + if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) { + // When multiple followers are overloaded, we may not be able to exclude all + // of them from replication traffic due to quorum constraints. We would like + // a given Range to deterministically exclude the same store (chosen + // randomly), so that across multiple Ranges we have a chance of removing + // load from all overloaded Stores in the cluster. (It would be a bad idea + // to roll a per-Range dice here on every tick, since that would rapidly + // include and exclude individual followers from replication traffic, which + // would be akin to a high rate of packet loss. Once we've decided to ignore + // a follower, this decision should be somewhat stable for at least a few + // seconds). + // + // Note that we don't enable this mechanism for the liveness range (see + // quotaPoolEnabledForRange), simply to play it safe, as we know that the + // liveness range is unlikely to be a major contributor to any follower's + // I/O and wish to reduce the likelihood of a problem in replication pausing + // contributing to an outage of that critical range. + seed := int64(r.RangeID) + now := r.store.Clock().Now().GoTime() + d := computeExpendableOverloadedFollowersInput{ + replDescs: r.descRLocked().Replicas(), + ioOverloadMap: ioOverloadMap, + getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { + prs := r.mu.internalRaftGroup.Status().Progress + updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool { + return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration()) + }) + return prs + }, + minLiveMatchIndex: r.mu.proposalQuotaBaseIndex, + seed: seed, + } + r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d) + for replicaID := range r.mu.pausedFollowers { + // We're dropping messages to those followers (see handleRaftReady) but + // it's a good idea to tell raft not to even bother sending in the first + // place. Raft will react to this by moving the follower to probing state + // where it will be contacted only sporadically until it responds to an + // MsgApp (which it can only do once we stop dropping messages). Something + // similar would result naturally if we didn't report as unreachable, but + // with more wasted work. + r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) + } + } else if len(r.mu.pausedFollowers) > 0 { + // No store in the cluster is overloaded, or this replica is not raft leader. + r.mu.pausedFollowers = nil + } + now := r.store.Clock().NowAsClockTimestamp() if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) { return false, nil @@ -1191,6 +1245,14 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo // into the local Raft group. The leader won't hit that path, so we update // it whenever it ticks. In effect, this makes sure it always sees itself as // alive. + // + // Note that in a workload where the leader doesn't have inflight requests + // "most of the time" (i.e. occasional writes only on this range), it's quite + // likely that we'll never reach this line, since we'll return in the + // maybeQuiesceRaftMuLockedReplicaMuLocked branch above. + // + // This is likely unintentional, and the leader should likely consider itself + // live even when quiesced. if r.replicaID == r.mu.leaderID { r.mu.lastUpdateTimes.update(r.replicaID, timeutil.Now()) } @@ -1444,10 +1506,12 @@ func (r *Replica) maybeCoalesceHeartbeat( return true } -func (r *Replica) sendRaftMessagesRaftMuLocked(ctx context.Context, messages []raftpb.Message) { +func (r *Replica) sendRaftMessagesRaftMuLocked( + ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{}, +) { var lastAppResp raftpb.Message for _, message := range messages { - drop := false + _, drop := blocked[roachpb.ReplicaID(message.To)] switch message.Type { case raftpb.MsgApp: if util.RaceEnabled { @@ -1511,6 +1575,9 @@ func (r *Replica) sendRaftMessagesRaftMuLocked(ctx context.Context, messages []r } } + // TODO(tbg): record this to metrics. + // + // See: https://github.com/cockroachdb/cockroach/issues/83917 if !drop { r.sendRaftMessageRaftMuLocked(ctx, message) } diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go new file mode 100644 index 000000000000..0495417a2613 --- /dev/null +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -0,0 +1,208 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "math/rand" + "sort" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/errors" + "go.etcd.io/etcd/raft/v3/tracker" +) + +var pauseReplicationIOThreshold = settings.RegisterFloatSetting( + settings.SystemOnly, + "admission.kv.pause_replication_io_threshold", + "pause replication to non-essential followers when their I/O admission control score exceeds the given threshold (zero to disable)", + // TODO(tbg): set a nonzero default. + // See: https://github.com/cockroachdb/cockroach/issues/83920 + 0.0, + func(v float64) error { + if v == 0 { + return nil + } + const min = 0.3 + if v < min { + return errors.Errorf("minimum admissible nonzero value is %f", min) + } + return nil + }, +) + +type computeExpendableOverloadedFollowersInput struct { + replDescs roachpb.ReplicaSet + // TODO(tbg): all entries are overloaded, so consdier removing the IOThreshold here + // because it's confusing. + ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold + // getProgressMap returns Raft's view of the progress map. This is only called + // when needed, and at most once. + getProgressMap func(context.Context) map[uint64]tracker.Progress + // seed is used to randomize selection of which followers to pause in case + // there are multiple followers that qualify, but quorum constraints require + // picking a subset. In practice, we set this to the RangeID to ensure maximum + // stability of the selection on a per-Range basis while encouraging randomness + // across ranges (which in turn should reduce load on all overloaded followers). + seed int64 + // In addition to being in StateReplicate in the progress map, we also only + // consider a follower live it its Match index matches or exceeds + // minLiveMatchIndex. This makes sure that a follower that is behind is not + // mistaken for one that can meaningfully contribute to quorum in the short + // term. Without this, it is - at least in theory - possible that as an + // overloaded formerly expendable store becomes non-overloaded, we will + // quickly mark another overloaded store as expendable under the assumption + // that the original store can now contribute to quorum. However, that store + // is likely behind on the log, and we should consider it as non-live until + // it has caught up. + minLiveMatchIndex uint64 +} + +type nonLiveReason byte + +const ( + nonLiveReasonInactive nonLiveReason = iota + nonLiveReasonPaused + nonLiveReasonBehind +) + +// computeExpendableOverloadedFollowers computes a set of followers that we can +// intentionally exempt from replication traffic (MsgApp) to help them avoid I/O +// overload. +// +// In the common case (no store or at least no follower store close to I/O +// overload), this method does very little work. +// +// If at least one follower is (close to being) overloaded, we determine the +// maximum set of such followers that we can afford not replicating to without +// losing quorum by successively reducing the set of overloaded followers by one +// randomly selected overloaded voter. The randomness makes it more likely that +// when there are multiple overloaded stores in the system that cannot be +// jointly excluded, both stores will in aggregate be relieved from +// approximately 50% of follower raft traffic. +// +// This method uses Raft's view of liveness and in particular will consider +// followers that haven't responded recently (including heartbeats) or are +// waiting for a snapshot as not live. In particular, a follower that is +// initially in the map may transfer out of the map by virtue of being cut off +// from the raft log via a truncation. This is acceptable, since the snapshot +// prevents the replica from receiving log traffic. +func computeExpendableOverloadedFollowers( + ctx context.Context, d computeExpendableOverloadedFollowersInput, +) (map[roachpb.ReplicaID]struct{}, map[roachpb.ReplicaID]nonLiveReason) { + var nonLive map[roachpb.ReplicaID]nonLiveReason + var liveOverloadedVoterCandidates map[roachpb.ReplicaID]struct{} + var liveOverloadedNonVoterCandidates map[roachpb.ReplicaID]struct{} + + var prs map[uint64]tracker.Progress + + for _, replDesc := range d.replDescs.AsProto() { + if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded { + continue + } + // There's at least one overloaded follower, so initialize + // extra state to determine which traffic we can drop without + // losing quorum. + if prs == nil { + prs = d.getProgressMap(ctx) + nonLive = map[roachpb.ReplicaID]nonLiveReason{} + for id, pr := range prs { + if !pr.RecentActive { + nonLive[roachpb.ReplicaID(id)] = nonLiveReasonInactive + } + if pr.IsPaused() { + nonLive[roachpb.ReplicaID(id)] = nonLiveReasonPaused + } + if pr.Match < d.minLiveMatchIndex { + nonLive[roachpb.ReplicaID(id)] = nonLiveReasonBehind + } + } + liveOverloadedVoterCandidates = map[roachpb.ReplicaID]struct{}{} + liveOverloadedNonVoterCandidates = map[roachpb.ReplicaID]struct{}{} + } + + // Mark replica on overloaded store as possibly pausable. + // + // NB: we make no distinction between non-live and live replicas at this + // point. That is, even if a replica is considered "non-live", we will still + // consider "additionally" pausing it. The first instinct was to avoid + // layering anything on top of a non-live follower, however a paused + // follower immediately becomes non-live, so if we want stable metrics on + // which followers are "paused", then we need the "pausing" state to + // overrule the "non-live" state. + if prs[uint64(replDesc.ReplicaID)].IsLearner { + liveOverloadedNonVoterCandidates[replDesc.ReplicaID] = struct{}{} + } else { + liveOverloadedVoterCandidates[replDesc.ReplicaID] = struct{}{} + } + } + + // Start out greedily with all overloaded candidates paused, and remove + // randomly chosen candidates until we think the raft group can obtain quorum. + var rnd *rand.Rand + for len(liveOverloadedVoterCandidates) > 0 { + up := d.replDescs.CanMakeProgress(func(replDesc roachpb.ReplicaDescriptor) bool { + rid := replDesc.ReplicaID + if _, ok := nonLive[rid]; ok { + return false // not live + } + if _, ok := liveOverloadedVoterCandidates[rid]; ok { + return false // want to drop traffic + } + if _, ok := liveOverloadedNonVoterCandidates[rid]; ok { + return false // want to drop traffic + } + return true // live for all we know + }) + if up { + // We've found the largest set of voters to drop traffic to + // without losing quorum. + break + } + var sl []roachpb.ReplicaID + for sid := range liveOverloadedVoterCandidates { + sl = append(sl, sid) + } + // Sort for determinism during tests. + sort.Slice(sl, func(i, j int) bool { + return sl[i] < sl[j] + }) + // Remove a random voter candidate, and loop around to see if we now have + // quorum. + if rnd == nil { + rnd = rand.New(rand.NewSource(d.seed)) + } + delete(liveOverloadedVoterCandidates, sl[rnd.Intn(len(sl))]) + } + + // Return union of non-voter and voter candidates. + for nonVoter := range liveOverloadedNonVoterCandidates { + liveOverloadedVoterCandidates[nonVoter] = struct{}{} + } + return liveOverloadedVoterCandidates, nonLive +} + +type overloadedStoresMap atomic.Value // map[roachpb.StoreID]*admissionpb.IOThreshold + +func (osm *overloadedStoresMap) Load() map[roachpb.StoreID]*admissionpb.IOThreshold { + v, _ := (*atomic.Value)(osm).Load().(map[roachpb.StoreID]*admissionpb.IOThreshold) + return v +} + +func (osm *overloadedStoresMap) Swap( + m map[roachpb.StoreID]*admissionpb.IOThreshold, +) map[roachpb.StoreID]*admissionpb.IOThreshold { + v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold) + return v +} diff --git a/pkg/kv/kvserver/replica_raft_overload_test.go b/pkg/kv/kvserver/replica_raft_overload_test.go new file mode 100644 index 000000000000..25ad84ea7e5e --- /dev/null +++ b/pkg/kv/kvserver/replica_raft_overload_test.go @@ -0,0 +1,154 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// + +package kvserver + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/v3/tracker" +) + +func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T) { + defer leaktest.AfterTest(t)() + + datadriven.Walk(t, testutils.TestDataPath(t, "replica_raft_overload"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + var buf strings.Builder + tr := tracing.NewTracer() + ctx, finishAndGet := tracing.ContextWithRecordingSpan(context.Background(), tr, path) + defer func() { + if t.Failed() { + t.Logf("%s", finishAndGet()) + } + }() + require.Equal(t, "run", d.Cmd) + var seed uint64 + var replDescs roachpb.ReplicaSet + ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{} + snapshotMap := map[roachpb.ReplicaID]struct{}{} + downMap := map[roachpb.ReplicaID]struct{}{} + match := map[roachpb.ReplicaID]uint64{} + minLiveMatchIndex := uint64(0) // accept all live followers by default + for _, arg := range d.CmdArgs { + for i := range arg.Vals { + sl := strings.SplitN(arg.Vals[i], "@", 2) + if len(sl) == 1 { + sl = append(sl, "") + } + idS, matchS := sl[0], sl[1] + arg.Vals[i] = idS + + var id uint64 + arg.Scan(t, i, &id) + switch arg.Key { + case "min-live-match-index": + minLiveMatchIndex = id + case "voters", "learners": + replicaID := roachpb.ReplicaID(id) + if matchS != "" { + var err error + match[replicaID], err = strconv.ParseUint(matchS, 10, 64) + require.NoError(t, err) + } + typ := roachpb.ReplicaTypeVoterFull() + if arg.Key == "learners" { + typ = roachpb.ReplicaTypeNonVoter() + } + replDesc := roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(id), + StoreID: roachpb.StoreID(id), + ReplicaID: replicaID, + Type: typ, + } + replDescs.AddReplica(replDesc) + case "overloaded": + ioOverloadMap[roachpb.StoreID(id)] = &admissionpb.IOThreshold{ + L0NumSubLevels: 1000, + L0NumSubLevelsThreshold: 20, + L0NumFiles: 1, + L0NumFilesThreshold: 1, + } + case "snapshot": + snapshotMap[roachpb.ReplicaID(id)] = struct{}{} + case "down": + downMap[roachpb.ReplicaID(id)] = struct{}{} + case "seed": + d.ScanArgs(t, "seed", &seed) + default: + t.Fatalf("unknown: %s", arg.Key) + } + } + } + + getProgressMap := func(ctx context.Context) map[uint64]tracker.Progress { + log.Eventf(ctx, "getProgressMap was called") + + // First, set up a progress map in which all replicas are tracked and are live. + m := map[uint64]tracker.Progress{} + for _, replDesc := range replDescs.AsProto() { + pr := tracker.Progress{ + State: tracker.StateReplicate, + Match: match[replDesc.ReplicaID], + RecentActive: true, + IsLearner: replDesc.GetType() == roachpb.LEARNER || replDesc.GetType() == roachpb.NON_VOTER, + Inflights: tracker.NewInflights(1), // avoid NPE + } + m[uint64(replDesc.ReplicaID)] = pr + } + // Mark replicas as down or needing snapshot as configured. + for replicaID := range downMap { + id := uint64(replicaID) + pr := m[id] + pr.RecentActive = false + m[id] = pr + } + for replicaID := range snapshotMap { + id := uint64(replicaID) + pr := m[id] + pr.State = tracker.StateSnapshot + m[id] = pr + } + return m + } + + m, _ := computeExpendableOverloadedFollowers(ctx, computeExpendableOverloadedFollowersInput{ + replDescs: replDescs, + ioOverloadMap: ioOverloadMap, + getProgressMap: getProgressMap, + seed: int64(seed), + minLiveMatchIndex: minLiveMatchIndex, + }) + { + var sl []roachpb.ReplicaID + for replID := range m { + sl = append(sl, replID) + } + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + fmt.Fprintln(&buf, sl) + } + return buf.String() + }) + }) +} diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 3b4e3786c4d7..40157633ed45 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -182,7 +182,7 @@ func (r *Replica) canUnquiesceRLocked() bool { func (r *Replica) maybeQuiesceRaftMuLockedReplicaMuLocked( ctx context.Context, now hlc.ClockTimestamp, livenessMap liveness.IsLiveMap, ) bool { - status, lagging, ok := shouldReplicaQuiesce(ctx, r, now, livenessMap) + status, lagging, ok := shouldReplicaQuiesce(ctx, r, now, livenessMap, r.mu.pausedFollowers) if !ok { return false } @@ -267,7 +267,11 @@ func (s laggingReplicaSet) Less(i, j int) bool { return s[i].NodeID < s[j].NodeI // // NOTE: The last 3 conditions are fairly, but not completely, overlapping. func shouldReplicaQuiesce( - ctx context.Context, q quiescer, now hlc.ClockTimestamp, livenessMap liveness.IsLiveMap, + ctx context.Context, + q quiescer, + now hlc.ClockTimestamp, + livenessMap liveness.IsLiveMap, + pausedFollowers map[roachpb.ReplicaID]struct{}, ) (*raft.Status, laggingReplicaSet, bool) { if testingDisableQuiescence { return nil, nil, false @@ -354,6 +358,19 @@ func shouldReplicaQuiesce( return nil, nil, false } + if len(pausedFollowers) > 0 { + // TODO(tbg): we should use a mechanism similar to livenessMap below (including a + // callback that unquiesces when paused followers unpause, since they will by + // definition be lagging). This was a little too much code churn at the time + // at which this comment was written. + // + // See: https://github.com/cockroachdb/cockroach/issues/84252 + if log.V(4) { + log.Infof(ctx, "not quiescing: overloaded followers %v", pausedFollowers) + } + return nil, nil, false + } + var foundSelf bool var lagging laggingReplicaSet for _, rep := range q.descRLocked().Replicas().Descriptors() { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index afc446a03214..9a0207b4d31c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -8070,7 +8070,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ticks := r.mu.ticks r.mu.Unlock() for ; (ticks % electionTicks) != 0; ticks++ { - if _, err := r.tick(ctx, nil); err != nil { + if _, err := r.tick(ctx, nil, nil); err != nil { t.Fatal(err) } } @@ -8121,7 +8121,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { r.mu.Unlock() // Tick raft. - if _, err := r.tick(ctx, nil); err != nil { + if _, err := r.tick(ctx, nil, nil); err != nil { t.Fatal(err) } @@ -9187,7 +9187,8 @@ func TestReplicaMetrics(t *testing.T) { ctx, hlc.Timestamp{}, &cfg.RaftConfig, spanConfig, c.liveness, 0, &c.desc, c.raftStatus, kvserverpb.LeaseStatus{}, c.storeID, c.expected.Quiescent, c.expected.Ticking, - concurrency.LatchMetrics{}, concurrency.LockTableMetrics{}, c.raftLogSize, true, 0, 0) + concurrency.LatchMetrics{}, concurrency.LockTableMetrics{}, c.raftLogSize, + true, 0, 0, nil /* overloadMap */) require.Equal(t, c.expected, metrics) }) } @@ -9910,6 +9911,7 @@ type testQuiescer struct { // Not used to implement quiescer, but used by tests. livenessMap liveness.IsLiveMap + paused map[roachpb.ReplicaID]struct{} } func (q *testQuiescer) descRLocked() *roachpb.RangeDescriptor { @@ -10002,7 +10004,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { }, } q = transform(q) - _, lagging, ok := shouldReplicaQuiesce(context.Background(), q, hlc.ClockTimestamp{}, q.livenessMap) + _, lagging, ok := shouldReplicaQuiesce(context.Background(), q, hlc.ClockTimestamp{}, q.livenessMap, q.paused) require.Equal(t, expected, ok) if ok { // Any non-live replicas should be in the laggingReplicaSet. @@ -10127,6 +10129,13 @@ func TestShouldReplicaQuiesce(t *testing.T) { return q }) } + // Verify no quiescence when a follower is paused. + test(false, func(q *testQuiescer) *testQuiescer { + q.paused = map[roachpb.ReplicaID]struct{}{ + q.desc.Replicas().AsProto()[0].ReplicaID: {}, + } + return q + }) } func TestFollowerQuiesceOnNotify(t *testing.T) { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 9c04b36da314..b90f08658c6e 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -930,6 +930,10 @@ type Store struct { // liveness. It is updated periodically in raftTickLoop() // and reactively in nodeIsLiveCallback() on liveness updates. livenessMap atomic.Value + // ioOverloadedStores is analogous to livenessMap, but stores a + // map[StoreID]*IOThreshold. It is gossip-backed but is not updated + // reactively, i.e. will refresh on each tick loop iteration only. + ioOverloadedStores overloadedStoresMap // cachedCapacity caches information on store capacity to prevent // expensive recomputations in case leases or replicas are rapidly @@ -2548,6 +2552,11 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error { // Unique gossip key per store. gossipStoreKey := gossip.MakeStoreDescKey(storeDesc.StoreID) // Gossip store descriptor. + if fn := s.cfg.TestingKnobs.StoreGossipIntercept; fn != nil { + // Give the interceptor a chance to see and/or mutate the descriptor we're about + // to gossip. + fn(storeDesc) + } return s.cfg.Gossip.AddInfoProto(gossipStoreKey, storeDesc, gossip.StoreTTL) } @@ -3126,6 +3135,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { underreplicatedRangeCount int64 overreplicatedRangeCount int64 behindCount int64 + pausedFollowerCount int64 locks int64 totalLockHoldDurationNanos int64 @@ -3184,6 +3194,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { overreplicatedRangeCount++ } } + pausedFollowerCount += metrics.PausedFollowerCount behindCount += metrics.BehindCount if qps, dur := rep.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { averageQueriesPerSecond += qps @@ -3244,6 +3255,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount) s.metrics.OverReplicatedRangeCount.Update(overreplicatedRangeCount) s.metrics.RaftLogFollowerBehindCount.Update(behindCount) + s.metrics.RaftPausedFollowerCount.Update(pausedFollowerCount) var averageLockHoldDurationNanos int64 var averageLockWaitDurationNanos int64 diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index f06897fadf21..6cc38f1bdc1f 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -645,10 +646,12 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool { return false } livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap) + storeOverloadMap := s.ioOverloadedStores.Load() start := timeutil.Now() ctx := r.raftCtx - exists, err := r.tick(ctx, livenessMap) + + exists, err := r.tick(ctx, livenessMap, storeOverloadMap) if err != nil { log.Errorf(ctx, "%v", err) } @@ -738,6 +741,7 @@ func (s *Store) raftTickLoop(ctx context.Context) { if s.cfg.NodeLiveness != nil { s.updateLivenessMap() } + s.updateStoreOverloadMap() s.unquiescedReplicas.Lock() // Why do we bother to ever queue a Replica on the Raft scheduler for @@ -759,6 +763,28 @@ func (s *Store) raftTickLoop(ctx context.Context) { } } +var shouldLogStoreOverloadMap = log.Every(10 * time.Second) + +func (s *Store) updateStoreOverloadMap() { + storeOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{} + overloadThresh := pauseReplicationIOThreshold.Get(&s.cfg.Settings.SV) + for _, sd := range s.allocator.StorePool.GetStores() { + if score, _ := sd.Capacity.IOThreshold.Score(); overloadThresh != 0 && score > overloadThresh { + ioThreshold := sd.Capacity.IOThreshold // need a copy + storeOverloadMap[sd.StoreID] = &ioThreshold + } + } + old := s.ioOverloadedStores.Swap(storeOverloadMap) + // Consider logging if we're going from seeing overloaded stores to seeing no overloaded stores, or when + // there are still overloaded stores and we haven't logged for a while. + shouldLog := log.V(1) || + (len(old) > 0 && len(storeOverloadMap) == 0) || + (len(storeOverloadMap) > 0 && shouldLogStoreOverloadMap.ShouldLog()) + if shouldLog { + log.Infof(s.AnnotateCtx(context.Background()), "IO overloaded stores [threshold %.2f]: %+v (before: %+v)", overloadThresh, storeOverloadMap, old) + } +} + func (s *Store) updateLivenessMap() { nextMap := s.cfg.NodeLiveness.GetIsLiveMap() for nodeID, entry := range nextMap { diff --git a/pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt b/pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt new file mode 100644 index 000000000000..8a793d9bb5ff --- /dev/null +++ b/pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt @@ -0,0 +1,180 @@ +############################## +# Basic sanity checks +############################## + +run +---- +[] + +run voters=(1) overloaded=(1) +---- +[] + +# Will pause as many learners as we can... +run voters=(1) learners=(2,3,4,5,6,7) overloaded=(2,4,6,7) +---- +[2 4 6 7] + +# ... even when we can't do anything about overloaded voters. +run voters=(1) learners=(2,3,4,5,6,7) overloaded=(1,2,4,6,7) +---- +[2 4 6 7] + +############################## +# RF=3 +############################## + +run voters=(1,2,3) +---- +[] + +run voters=(1,2,3) overloaded=(3) +---- +[3] + +run voters=(1,2,3) overloaded=(2,3) +---- +[3] + +run voters=(1,2,3) overloaded=(1,2,3) +---- +[3] + +run voters=(1,2,3) overloaded=(1,2,3) seed=3 +---- +[1] + +run voters=(1,2,3) learners=(4,5) +---- +[] + +run voters=(1,2,3) learners=(4,5) overloaded=(3,4) +---- +[3 4] + +run voters=(1,2,3) learners=(4,5) overloaded=(3,4,5) +---- +[3 4 5] + +run voters=(1,2,3) learners=(4,5) overloaded=(2,3,4,5) +---- +[3 4 5] + +run voters=(1,2,3) learners=(4,5) overloaded=(1,2,3,4,5) +---- +[3 4 5] + +run voters=(1,2,3) down=(3) overloaded=(1) +---- +[] + +run voters=(1,2,3) snapshot=(3) overloaded=(1) +---- +[] + +run voters=(1,2,3) down=(3) overloaded=(1,2) +---- +[] + +run voters=(1,2,3) snapshot=(3) overloaded=(1,2) +---- +[] + +# When a "down" follower is also overloaded, we consider +# pausing it (even though it's already "down"). If we didn't +# do this we'd pause a healthy follower, it will become non-healthy +# due to the pausing, and then we won't consider it paused, it will +# be caught up, get paused again, etc. +run voters=(1,2,3) down=(3) overloaded=(1,2,3) +---- +[3] + +run voters=(1,2,3) down=(3) overloaded=(3) +---- +[3] + +run voters=(1,2,3) learners=(4) down=(4) overloaded=(1,2) +---- +[2] + +run voters=(1,2,3) learners=(4) snapshot=(4) overloaded=(1,2) +---- +[2] + +run voters=(1,2,3) learners=(4) snapshot=(4) overloaded=(1,2,4) +---- +[2 4] + +# Won't pause 2 since a quorum is down... +run voters=(1,2,3) down=(1,2) overloaded=(2,3) +---- +[] + +# ... but if there's an overloaded learner we'll still pause it. +run voters=(1,2,3) learners=(4) down=(1,2) overloaded=(2,3,4) +---- +[4] + +############################## +# RF=4 +############################## + +run voters=(1,2,3,4) overloaded=(1,2,3,4) +---- +[2] + +run voters=(1,2,3,4) learners=(5) overloaded=(1,2,3,4) +---- +[2] + +run voters=(1,2,3,4) learners=(5) overloaded=(1,2,3,4,5) +---- +[2 5] + +############################## +# RF=5 +############################## + +run voters=(1,2,3,4,5) +---- +[] + +run voters=(1,2,3,4,5) overloaded=(1) +---- +[1] + +run voters=(1,2,3,4,5) overloaded=(1,2) +---- +[1 2] + +run voters=(1,2,3,4,5) overloaded=(1,2,3) +---- +[2 3] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4) +---- +[2 4] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) +---- +[1 4] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) snapshot=(1) +---- +[1 4] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) snapshot=(1,2) +---- +[] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) snapshot=(1,2,3) +---- +[] + +run voters=(1,2,3,4,5) learners=(6) overloaded=(1,6) +---- +[1 6] + +run voters=(1,2,3,4,5) learners=(6,7) overloaded=(1,2,3,4,5,6,7) +---- +[1 4 6 7] diff --git a/pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index b/pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index new file mode 100644 index 000000000000..cc734b2bfd3a --- /dev/null +++ b/pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index @@ -0,0 +1,24 @@ +# Start out in a situation in which s3 is overloaded. The quorum (1,2) is ahead +# of the min live index, so we can afford pausing replication to s3. +run voters=(1@100,2@80,3@60) overloaded=(3) min-live-match-index=60 +---- +[3] + +# We'll also pause s3 if it's out of the quota pool window. +run voters=(1@100,2@80,3@60) overloaded=(3) min-live-match-index=59 +---- +[3] + +# s3 quickly recovers while s2 becomes overloaded. However, s3 hasn't caught up +# yet, so we don't count it towards quorum yet, meaning that we will need to +# continue replicating to s2 despite it being overloaded, until s3 considered +# caught up. +run voters=(1@100,2@90,3@69) overloaded=(2) min-live-match-index=70 +---- +[] + +# s3 has caught up to the min-live-match-index of 70, so we can now pause +# replication to s2. +run voters=(1@100,2@90,3@70) overloaded=(2) min-live-match-index=70 +---- +[2] diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index ff36aff7d8c2..2ab292632729 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -416,6 +416,10 @@ type StoreTestingKnobs struct { // send snapshot semaphore. AfterSendSnapshotThrottle func() + // This method, if set, gets to see (and mutate, if desired) any local + // StoreDescriptor before it is being sent out on the Gossip network. + StoreGossipIntercept func(descriptor *roachpb.StoreDescriptor) + // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`. EnqueueReplicaInterceptor func(queueName string, replica *Replica) } diff --git a/pkg/server/status/health_check.go b/pkg/server/status/health_check.go index 4153b78b94fd..5e5694244a15 100644 --- a/pkg/server/status/health_check.go +++ b/pkg/server/status/health_check.go @@ -43,12 +43,13 @@ var ( // large backlog but show no sign of processing times. var trackedMetrics = map[string]threshold{ // Gauges. - "ranges.unavailable": gaugeZero, - "ranges.underreplicated": gaugeZero, - "requests.backpressure.split": gaugeZero, - "requests.slow.latch": gaugeZero, - "requests.slow.lease": gaugeZero, - "requests.slow.raft": gaugeZero, + "ranges.unavailable": gaugeZero, + "ranges.underreplicated": gaugeZero, + "requests.backpressure.split": gaugeZero, + "requests.slow.latch": gaugeZero, + "requests.slow.lease": gaugeZero, + "requests.slow.raft": gaugeZero, + "admission.raft.paused_replicas": gaugeZero, // TODO(tbg): this fires too eagerly. On a large machine that can handle many // concurrent requests, we'll blow a limit that would be disastrous to a smaller // machine. This will be hard to fix. We could track the max goroutine count diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index de32c6423028..7ef7be978853 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -564,6 +564,12 @@ var charts = []sectionDescription{ "ranges.overreplicated", }, }, + { + Title: "Paused Followers", + Metrics: []string{ + "admission.raft.paused_replicas", + }, + }, { Title: "Operations", Metrics: []string{ diff --git a/pkg/util/admission/admissionpb/io_threshold.go b/pkg/util/admission/admissionpb/io_threshold.go index f4089fa47b8e..2ed07df22064 100644 --- a/pkg/util/admission/admissionpb/io_threshold.go +++ b/pkg/util/admission/admissionpb/io_threshold.go @@ -46,6 +46,7 @@ func (iot IOThreshold) Score() (float64, bool) { func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) { if iot == (IOThreshold{}) { s.Printf("N/A") + return } sc, overload := iot.Score() s.Printf("%.3f", redact.SafeFloat(sc))