Skip to content

Commit

Permalink
kvserver: avoid replicating to followers that are I/O overloaded
Browse files Browse the repository at this point in the history
This commit implements the stop-gap solution to raft traffic
contributing to I/O overload that was discussed[^1] in #79215.

The newly introduced `admission.kv.pause_replication_io_threshold`
cluster setting can be used to define an I/O overload score above which
raft leaders will attempt to pause replication to nonessential
followers, to give them a chance at tidying up their LSM.

A follower is "nonessential" if it is either a non-voter, or if we think
(and are fairly certain that) quorum can be reached without it. If there
are multiple overloaded followers, we pick as many as we can without
losing quorum, and we try to do so in a way that's a) random, i.e.
different raft leaders will pick a different set, to give each store a
share of the relief, and b) stable, i.e. each raft leader will pick the
same set at least for a little while, to avoid interrupting a quorum by
rapidly switching the set of active followers (but see here[^2]).

The implementation of this is as follows:
- on Store, materialize (on each tick) a `storeOverloadMap` from gossip
- each Replica, on tick, from this map compute the set of followers
  to pause, and store it in a per-Replica map.
- consult this latter map
    - when sending `MsgApp` from `handleRaftReady`
    - when releasing proposal quota.

This commit by default disables this new functionality by setting the
cluster setting to zero. This has the effect of an empty
`storeOverloadMap` and thus, after one round of gossip and subsequent
tick (i.e. within seconds), an empty per-Replica paused followers map.

Additionally, it's worth pointing out the mixed-version behavior: the
old nodes' stores will be observed as gossiping a zero IOThreshold,
which is considered not overloaded, i.e. replication streams to old
nodes will never be paused.

Fixes #79215.

[^1]: #79215 (comment)
[^2]: #83920 (comment)

Release note (ops change): the
`admission.kv.pause_replication_io_threshold` cluster setting can be set
to a nonzero value to reduce I/O throughput on followers that are driven
towards an inverted LSM by replication traffic. The functionality is
disabled by default. A suggested value is 0.8, meaning that replication
traffic to nonessential followers is paused before these followers will
begin throttling their foreground traffic.
  • Loading branch information
tbg committed Jul 15, 2022
1 parent e8818d5 commit 43a37d5
Show file tree
Hide file tree
Showing 21 changed files with 901 additions and 35 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_library(
"replica_proposal_quota.go",
"replica_protected_timestamp.go",
"replica_raft.go",
"replica_raft_overload.go",
"replica_raft_quiesce.go",
"replica_raftstorage.go",
"replica_range_lease.go",
Expand Down Expand Up @@ -235,6 +236,7 @@ go_test(
"client_replica_backpressure_test.go",
"client_replica_circuit_breaker_test.go",
"client_replica_gc_test.go",
"client_replica_raft_overload_test.go",
"client_replica_test.go",
"client_spanconfigs_test.go",
"client_split_burst_test.go",
Expand Down Expand Up @@ -283,6 +285,7 @@ go_test(
"replica_probe_test.go",
"replica_proposal_buf_test.go",
"replica_protected_timestamp_test.go",
"replica_raft_overload_test.go",
"replica_raft_test.go",
"replica_raft_truncation_test.go",
"replica_rangefeed_test.go",
Expand Down Expand Up @@ -401,6 +404,7 @@ go_test(
"//pkg/ts",
"//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/caller",
"//pkg/util/circuit",
"//pkg/util/contextutil",
Expand Down
101 changes: 101 additions & 0 deletions pkg/kv/kvserver/client_replica_raft_overload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package kvserver_test

import (
"context"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestReplicaRaftOverload checks, end to end, that leaseholders "pause"
// followers living on overloaded stores and release them again once the
// overload signal goes away.
//
// The test covers the gossip signal and the pause bookkeeping (observed via
// the RaftPausedFollowerCount metric on s1). It does not verify that a paused
// follower actually stops receiving traffic.
func TestReplicaRaftOverload(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	// overloaded toggles whether s3 advertises a fake I/O overload via gossip.
	var overloaded atomic.Value // bool
	overloaded.Store(false)

	// interceptGossip rewrites s3's store descriptor to look heavily
	// overloaded while the toggle is set; every other descriptor is left
	// untouched.
	interceptGossip := func(desc *roachpb.StoreDescriptor) {
		if overloaded.Load().(bool) && desc.StoreID == 3 {
			desc.Capacity.IOThreshold = admissionpb.IOThreshold{
				L0NumSubLevels:          1000000,
				L0NumSubLevelsThreshold: 1,
				L0NumFiles:              1000000,
				L0NumFilesThreshold:     1,
			}
		}
	}

	var clusterArgs base.TestClusterArgs
	clusterArgs.ReplicationMode = base.ReplicationManual
	clusterArgs.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
		StoreGossipIntercept: interceptGossip,
	}
	tc := testcluster.StartTestCluster(t, 3, clusterArgs)
	defer tc.Stopper().Stop(ctx)

	// Enable the pausing mechanism; it is governed by this cluster setting.
	_, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING admission.kv.pause_replication_io_threshold = 1.0`)
	require.NoError(t, err)

	scratchKey := tc.ScratchRange(t)
	tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1, 2)...)
	// Split off a second range that also lives on s3. This lets us verify that
	// a quiesced range doesn't misbehave, e.g. by never releasing the paused
	// follower from its map (which would skew the metric after recovery).
	tc.SplitRangeOrFatal(t, scratchKey.Next())

	// gossipFromS3 forces s3 to re-gossip its (possibly intercepted) store
	// descriptor, bypassing the cached copy.
	gossipFromS3 := func() {
		s3 := tc.GetFirstStoreFromServer(t, 2 /* n3 */)
		require.NoError(t, s3.GossipStore(ctx, false /* useCached */))
	}
	// pausedOnS1 recomputes s1's metrics and returns its paused-follower gauge.
	pausedOnS1 := func() int64 {
		s1 := tc.GetFirstStoreFromServer(t, 0)
		require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
		return s1.Metrics().RaftPausedFollowerCount.Value()
	}

	// Phase 1: s3 gossips faux I/O overload; s1 should notice and pause
	// followers.
	overloaded.Store(true)
	gossipFromS3()
	testutils.SucceedsSoon(t, func() error {
		// Write to the range that has a replica on s3. It is likely quiesced
		// and would not unquiesce merely because s3 became overloaded. The
		// right-hand sibling range gets no such nudge, so whether it
		// contributes depends on when it quiesces.
		//
		// See: https://github.com/cockroachdb/cockroach/issues/84252
		require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo"))
		if pausedOnS1() == 0 {
			return errors.New("no paused followers")
		}
		return nil
	})

	// Phase 2: drop the faux overload and gossip again; the follower should
	// be un-paused right away.
	overloaded.Store(false)
	gossipFromS3()
	testutils.SucceedsSoon(t, func() error {
		if n := pausedOnS1(); n > 0 {
			return errors.Errorf("%d paused followers", n)
		}
		return nil
	})
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvserverpb/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ message RangeInfo {
// The circuit breaker error, if any. This is nonzero if and only if the
// circuit breaker on the source Replica is tripped.
string circuit_breaker_error = 20;
repeated int32 paused_replicas = 21 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"];
}

// RangeSideTransportInfo describes a range's closed timestamp info communicated
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,20 @@ difficult to meaningfully interpret this metric.`,
Unit: metric.Unit_COUNT,
}

// metaRaftFollowerPaused describes the gauge that counts followers to which
// replication is currently paused because of I/O overload.
metaRaftFollowerPaused = metric.Metadata{
	Name: "admission.raft.paused_replicas",
	Help: `Number of followers (i.e. Replicas) to which replication is currently paused to help them recover from I/O overload.
Such Replicas will be ignored for the purposes of proposal quota, and will not
receive replication traffic. They are essentially treated as offline for the
purpose of replication. This serves as a crude form of admission control.
The count is emitted by the leaseholder of each range.`,
	Measurement: "Followers",
	Unit:        metric.Unit_COUNT,
}

// Replica queue metrics.
metaMVCCGCQueueSuccesses = metric.Metadata{
Name: "queue.gc.process.success",
Expand Down Expand Up @@ -1619,6 +1633,8 @@ type StoreMetrics struct {
RaftLogFollowerBehindCount *metric.Gauge
RaftLogTruncated *metric.Counter

RaftPausedFollowerCount *metric.Gauge

RaftEnqueuedPending *metric.Gauge
RaftCoalescedHeartbeatsPending *metric.Gauge

Expand Down Expand Up @@ -2095,6 +2111,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount),
RaftLogTruncated: metric.NewCounter(metaRaftLogTruncated),

RaftPausedFollowerCount: metric.NewGauge(metaRaftFollowerPaused),

RaftEnqueuedPending: metric.NewGauge(metaRaftEnqueuedPending),

// This Gauge measures the number of heartbeats queued up just before
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ type RaftMessageHandler interface {
) error
}

// TODO(tbg): remove all of these metrics. The "NodeID" in this struct refers to the remote NodeID, i.e. when we send
// a message it refers to the recipient and when we receive a message it refers to the sender. This doesn't map to
// metrics well, where everyone should report on their local decisions. Instead have a *RaftTransportMetrics struct
// that is per-Store and tracks metrics on behalf of that Store.
//
// See: https://github.com/cockroachdb/cockroach/issues/83917
type raftTransportStats struct {
nodeID roachpb.NodeID
queue int
Expand Down
31 changes: 22 additions & 9 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
"sort"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -566,14 +567,6 @@ type Replica struct {
// (making the assumption that all followers are live at that point),
// and when the range unquiesces (marking all replicating followers as
// live).
//
// TODO(tschottdorf): keeping a map on each replica seems to be
// overdoing it. We should map the replicaID to a NodeID and then use
// node liveness (or any sensible measure of the peer being around).
// The danger in doing so is that a single stuck replica on an otherwise
// functioning node could fill up the quota pool. We are already taking
// this kind of risk though: a replica that gets stuck on an otherwise
// live node will not lose leaseholdership.
lastUpdateTimes lastUpdateTimesMap

// Computed checksum at a snapshot UUID.
Expand Down Expand Up @@ -653,6 +646,12 @@ type Replica struct {

// Historical information about the command that set the closed timestamp.
closedTimestampSetter closedTimestampSetterInfo

// Followers to which replication traffic is currently dropped.
//
// Never mutated in place (always replaced wholesale), so can be leaked
// outside of the surrounding mutex.
pausedFollowers map[roachpb.ReplicaID]struct{}
}

// The raft log truncations that are pending. Access is protected by its own
Expand Down Expand Up @@ -698,6 +697,11 @@ type Replica struct {
// loadBasedSplitter keeps information about load-based splitting.
loadBasedSplitter split.Decider

// TODO(tbg): this is effectively unused, we only use it to call ReportUnreachable
// when a heartbeat gets dropped but it's unclear whether a) that ever fires in
// practice b) if it provides any benefit.
//
// See: https://github.com/cockroachdb/cockroach/issues/84246
unreachablesMu struct {
syncutil.Mutex
remotes map[roachpb.ReplicaID]struct{}
Expand Down Expand Up @@ -1288,7 +1292,16 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo {
if err := r.breaker.Signal().Err(); err != nil {
ri.CircuitBreakerError = err.Error()
}

if m := r.mu.pausedFollowers; len(m) > 0 {
var sl []roachpb.ReplicaID
for id := range m {
sl = append(sl, id)
}
sort.Slice(sl, func(i, j int) bool {
return sl[i] < sl[j]
})
ri.PausedReplicas = sl
}
return ri
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/replica_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ type ReplicaMetrics struct {
// RangeCounter is true if the current replica is responsible for range-level
// metrics (generally the leaseholder, if live, otherwise the first replica in the
// range descriptor).
RangeCounter bool
Unavailable bool
Underreplicated bool
Overreplicated bool
RaftLogTooLarge bool
BehindCount int64
RangeCounter bool
Unavailable bool
Underreplicated bool
Overreplicated bool
RaftLogTooLarge bool
BehindCount int64
PausedFollowerCount int64

QuotaPoolPercentUsed int64 // [0,100]

Expand All @@ -74,6 +75,7 @@ func (r *Replica) Metrics(
qpCap = int64(q.Capacity()) // NB: max capacity is MaxInt64, see NewIntPool
qpUsed = qpCap - qpAvail
}
paused := r.mu.pausedFollowers
r.mu.RUnlock()

r.store.unquiescedReplicas.Lock()
Expand Down Expand Up @@ -101,6 +103,7 @@ func (r *Replica) Metrics(
raftLogSize,
raftLogSizeTrusted,
qpUsed, qpCap,
paused,
)
}

Expand All @@ -122,6 +125,7 @@ func calcReplicaMetrics(
raftLogSize int64,
raftLogSizeTrusted bool,
qpUsed, qpCapacity int64, // quota pool used and capacity bytes
paused map[roachpb.ReplicaID]struct{},
) ReplicaMetrics {
var m ReplicaMetrics

Expand Down Expand Up @@ -150,6 +154,7 @@ func calcReplicaMetrics(
// behind.
if m.Leader {
m.BehindCount = calcBehindCount(raftStatus, desc, livenessMap)
m.PausedFollowerCount = int64(len(paused))
}

m.LatchMetrics = latchMetrics
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/replica_proposal_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
// cannot correspond to values beyond the applied index there's no reason
// to consider progress beyond it as meaningful.
minIndex := status.Applied

r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) {
rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id))
if !ok {
Expand Down Expand Up @@ -207,6 +208,14 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
if progress.Match < r.mu.proposalQuotaBaseIndex {
return
}
if _, paused := r.mu.pausedFollowers[roachpb.ReplicaID(id)]; paused {
// We are dropping MsgApp to this store, so we are effectively treating
// it as non-live for the purpose of replication and are letting it fall
// behind intentionally.
//
// See #79215.
return
}
if progress.Match > 0 && progress.Match < minIndex {
minIndex = progress.Match
}
Expand Down
Loading

0 comments on commit 43a37d5

Please sign in to comment.