From 2f7df2234925b8c9c6c1c89e3b57d60369670946 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Wed, 4 Dec 2024 20:37:34 -0500 Subject: [PATCH] kv: replan rangefeeds with chronic closed ts lag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a rangefeed's closed timestamp lags behind the current time, any writes that have occurred in-between will not be emitted. This is problematic in cases where the lag is significant and chronic, as consumers (changefeeds, logical data replication, physical cluster replication) are likewise delayed in their processing. Observing a rangefeed with a chronic lagging closed timestamp will become relatively more likely with quorum replication flow control, as entries are deliberately queued, instead of being sent, to stores which do not have sufficient send tokens. This commit (re)introduces the concept of cancelling lagging rangefeeds, so that they may be replanned and retried on another replica. The other replica may also have this issue, however there should be at least a quorum of voting replicas with a similar closed timestamp that would be suitable. The replanning on a different replica is handled already by existing machinery. This commit introduces an observer which generates a signal indicating that the rangefeed should be cancelled. The signal also encapsulates the existing logic to nudge a rangefeed as well. The criteria for cancelling a rangefeed is influenced by two thresholds, defined as cluster settings: ``` kv.rangefeed.lagging_closed_timestamp_cancel_multiple (default = 20 x closed ts target duration = 60s) ``` ``` kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration (default = 60s) ``` When a replica's closed timestamp has sustained lag greater than: ``` kv.rangefeed.lagging_closed_timestamp_cancel_multiple * kv.closed_timestamp.target_duration ``` For at least: ``` `kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration` ``` duration, the rangefeed will be cancelled and then re-planned on the client. This can be visualized in the following diagram, where there is an initial spike over the lag threshold, which is recovered from so the rangefeed wouldn't be cancelled. The second drop below the lag threshold is sustained for greater than the duration threshold, so the rangefeed is then cancelled for replanning: ``` lag=0 ───────────────────────────────────────────────────── observed lag ─────────┐ │ │ │ ┌───────┐ lag threshold ─────────┼─────┼───────┼────────────────────────────── │ │ └───┐ │ │ └─────┐ └─────┘ └──────┐ └──────────── ◄────────────────────────────► exceeds duration threshold ``` Note we could also prevent accepting a rangefeed registration if the lag were sufficient, however the behavior change here applies only to lag which as been observed to be sustained over time, without historical data, we cannot apply identical decision logic on registration. Fixes: #136214 Release note: None --- .../kvclient/kvcoord/dist_sender_rangefeed.go | 5 +- pkg/kv/kvserver/BUILD.bazel | 2 + .../kvserver/flow_control_integration_test.go | 240 ++++++++++++++++++ pkg/kv/kvserver/replica.go | 5 + pkg/kv/kvserver/replica_init.go | 1 + pkg/kv/kvserver/replica_rangefeed.go | 31 ++- .../replica_rangefeed_lag_observer.go | 142 +++++++++++ .../replica_rangefeed_lag_observer_test.go | 186 ++++++++++++++ .../send_queue_range_feed | 102 ++++++++ 9 files changed, 705 insertions(+), 9 deletions(-) create mode 100644 pkg/kv/kvserver/replica_rangefeed_lag_observer.go create mode 100644 pkg/kv/kvserver/replica_rangefeed_lag_observer_test.go create mode 100644 pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ad964cad4847..0cc3731c728d 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -557,8 +557,9 @@ func handleRangefeedError( kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER, kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED: - // Try again with same descriptor. These are transient - // errors that should not show up again. + // Try again with same descriptor. These are transient errors that should + // not show up again, or should be retried on another replica which will + // likely not have the same issue. return rangefeedErrorInfo{}, nil case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 3ba5bd59e5e8..d2644bc465c2 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -70,6 +70,7 @@ go_library( "replica_raftstorage.go", "replica_range_lease.go", "replica_rangefeed.go", + "replica_rangefeed_lag_observer.go", "replica_rankings.go", "replica_rate_limit.go", "replica_read.go", @@ -345,6 +346,7 @@ go_test( "replica_raft_overload_test.go", "replica_raft_test.go", "replica_raft_truncation_test.go", + "replica_rangefeed_lag_observer_test.go", "replica_rangefeed_test.go", "replica_rankings_test.go", "replica_sideload_test.go", diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 1cfd524f70ef..67e45a400840 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -19,9 +19,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" @@ -38,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -6141,6 +6144,243 @@ ORDER BY streams DESC; h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) } +func TestFlowControlSendQueueRangeFeed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // rangeFeed will create a rangefeed suitable for testing. It will start a + // rangefeed and return a function that can be used to stop it. + rangeFeed := func( + ctx context.Context, + dsI interface{}, + sp roachpb.Span, + startFrom hlc.Timestamp, + onValue func(event kvcoord.RangeFeedMessage), + opts ...kvcoord.RangeFeedOption, + ) func() { + ds := dsI.(*kvcoord.DistSender) + events := make(chan kvcoord.RangeFeedMessage) + cancelCtx, cancel := context.WithCancel(ctx) + + g := ctxgroup.WithContext(cancelCtx) + g.GoCtx(func(ctx context.Context) (err error) { + return ds.RangeFeed(ctx, []kvcoord.SpanTimePair{{Span: sp, StartAfter: startFrom}}, events, opts...) + }) + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-events: + onValue(ev) + } + } + }) + + return func() { + cancel() + _ = g.Wait() + } + } + // We will use the below logic later to assert on the node that the rangefeed + // is running on. + var curRangeFeedNodeID atomic.Value + curRangeFeedNodeID.Store(roachpb.NodeID(0)) + checkRangeFeedNodeID := func(nodeID roachpb.NodeID, include bool) error { + curNodeID := curRangeFeedNodeID.Load().(roachpb.NodeID) + if curNodeID == 0 { + return errors.New("no rangefeed node yet") + } + if curNodeID != nodeID && include { + return errors.Errorf("expected rangefeed on n%v, got n%v", nodeID, curNodeID) + } + if curNodeID == nodeID && !include { + return errors.Errorf("expected rangefeed not on n%v", nodeID) + } + return nil + } + + ctx := context.Background() + const numNodes = 3 + settings := cluster.MakeTestingClusterSettings() + kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll) + // We want to exhaust tokens but not overload the test, so we set the limits + // lower (8 and 16 MiB default). + kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20) + kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20) + kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true) + // Speed up cancellation, default is 20x the target duration. + kvserver.RangeFeedLaggingCTCancelMultiple.Override(ctx, &settings.SV, 6) + // Also speed up cancellation by shortening the required lag duration, + // default is 60s. + kvserver.RangeFeedLaggingCTCancelDuration.Override(ctx, &settings.SV, 3*time.Second) + // Likewise, the default target duration is 3s, lower it by a factor of 20. + closedts.TargetDuration.Override(ctx, &settings.SV, 150*time.Millisecond) + + disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes) + setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) { + for _, serverIdx := range serverIdxs { + disableWorkQueueGrantingServers[serverIdx].Store(!enabled) + } + } + + argsPerServer := make(map[int]base.TestServerArgs) + for i := range disableWorkQueueGrantingServers { + disableWorkQueueGrantingServers[i].Store(true) + argsPerServer[i] = base.TestServerArgs{ + Settings: settings, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ + UseOnlyForScratchRanges: true, + OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens { + // Deduct every write as 1 MiB, regardless of how large it + // actually is. + return kvflowcontrol.Tokens(1 << 20) + }, + // We want to test the behavior of the send queue, so we want to + // always have up-to-date stats. This ensures that the send queue + // stats are always refreshed on each call to + // RangeController.HandleRaftEventRaftMuLocked. + OverrideAlwaysRefreshSendStreamStats: true, + }, + }, + AdmissionControl: &admission.TestingKnobs{ + DisableWorkQueueFastPath: true, + DisableWorkQueueGranting: func() bool { + idx := i + return disableWorkQueueGrantingServers[idx].Load() + }, + }, + }, + } + } + + tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: argsPerServer, + }) + defer tc.Stopper().Stop(ctx) + setTokenReturnEnabled(true /* enabled */, 0, 1, 2) + + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + + h := newFlowControlTestHelper( + t, tc, "flow_control_integration_v2", /* testdata */ + kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */ + ) + h.init(kvflowcontrol.ApplyToAll) + defer h.close("send_queue_range_feed") + + desc, err := tc.LookupRange(k) + require.NoError(t, err) + h.enableVerboseRaftMsgLoggingForRange(desc.RangeID) + n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + h.resetV2TokenMetrics(ctx) + h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */) + + ts := tc.Server(2) + span := desc.KeySpan().AsRawSpanWithNoLocals() + ignoreValues := func(event kvcoord.RangeFeedMessage) {} + + ctx2, cancel := context.WithCancel(context.Background()) + g := ctxgroup.WithContext(ctx2) + defer func() { + cancel() + err := g.Wait() + require.True(t, testutils.IsError(err, "context canceled")) + }() + observer := func(fn kvcoord.ForEachRangeFn) { + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(200 * time.Millisecond): + } + err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error { + curRangeFeedNodeID.Store(feed.NodeID) + return nil + }) + if err != nil { + return err + } + } + }) + } + + closeFeed := rangeFeed( + ctx, + ts.DistSenderI(), + span, + tc.Server(0).Clock().Now(), + ignoreValues, + kvcoord.WithRangeObserver(observer), + ) + defer closeFeed() + testutils.SucceedsSoon(t, func() error { return checkRangeFeedNodeID(3, true /* include */) }) + h.comment(`(Rangefeed on n3)`) + + h.comment(` +-- We will exhaust the tokens across all streams while admission is blocked on +-- n3, using 4x1 MiB (deduction, the write itself is small) writes. Then, +-- we will write 1 MiB to the range and wait for the closedTS to fall +-- behind on n3. We expect that the closedTS falling behind will trigger +-- an error that is returned to the mux rangefeed client, which will in turn +-- allows the rangefeed request to be re-routed to another replica.`) + // Block admission on n3, while allowing every other node to admit. + setTokenReturnEnabled(true /* enabled */, 0, 1) + setTokenReturnEnabled(false /* enabled */, 2) + // Drain the tokens to n3 by blocking admission and issuing the buffer + // size of writes to the range. + h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + + h.comment(`(Sending 1 MiB put request to develop a send queue)`) + h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri) + h.comment(`(Sent 1 MiB put request)`) + h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */) + h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */ + testingMkFlowStream(0), testingMkFlowStream(1)) + h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`) + h.query(n1, flowSendQueueQueryStr) + h.comment(` +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here.`) + h.query(n1, ` + SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 +`, "range_id", "store_id", "total_tracked_tokens") + h.comment(` +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3.`) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) + + testutils.SucceedsSoon(t, func() error { + return checkRangeFeedNodeID(3, false /* include */) + }) + newNode := curRangeFeedNodeID.Load().(roachpb.NodeID) + h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode)) + + h.comment(`-- (Allowing below-raft admission to proceed on n3.)`) + setTokenReturnEnabled(true /* enabled */, 2) + h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */) + h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */) + + h.comment(` +-- Send queue and flow token metrics from n1. All tokens should be returned.`) + h.query(n1, flowSendQueueQueryStr) + h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...) +} + type flowControlTestHelper struct { t testing.TB tc *testcluster.TestCluster diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 75fe749c7449..46a5bf6c9bb4 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -341,6 +341,11 @@ type Replica struct { msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message // Scratch for populating rac2.RaftEvent.ReplicaSateInfo for flowContrlV2. replicaStateScratchForFlowControl map[roachpb.ReplicaID]rac2.ReplicaStateInfo + + // rangeFeedCTLagObserver is used to observe the closed timestamp lag of + // the replica and generate a signal to potentially nudge or cancel the + // rangefeed based on observed lag. + rangefeedCTLagObserver *rangeFeedCTLagObserver } // localMsgs contains a collection of raftpb.Message that target the local diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 8eb735d44548..36b4b4dafbb5 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -198,6 +198,7 @@ func newUninitializedReplicaWithoutRaftGroup( // replica GC issues, but is a distraction at the moment. // r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r))) + r.raftMu.rangefeedCTLagObserver = newRangeFeedCTLagObserver() r.raftMu.stateLoader = stateloader.Make(rangeID) r.raftMu.sideloaded = logstore.NewDiskSideloadStorage( store.cfg.Settings, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 8e962207df5e..a54662c03f29 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -14,7 +14,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -868,17 +867,20 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked( // If the closed timestamp is sufficiently stale, signal that we want an // update to the leaseholder so that it will eventually begin to progress - // again. - behind := r.Clock().PhysicalTime().Sub(closedTS.GoTime()) - slowClosedTSThresh := 5 * closedts.TargetDuration.Get(&r.store.cfg.Settings.SV) - exceedsSlowLagThresh = behind > slowClosedTSThresh - if exceedsSlowLagThresh { + // again. Or, if the closed timestamp has been lagging by more than the + // cancel threshold for a while, cancel the rangefeed. + if signal := r.raftMu.rangefeedCTLagObserver.observeClosedTimestampUpdate(ctx, + closedTS.GoTime(), + r.Clock().PhysicalTime(), + &r.store.cfg.Settings.SV, + ); signal.exceedsNudgeLagThreshold { m := r.store.metrics.RangeFeedMetrics if m.RangeFeedSlowClosedTimestampLogN.ShouldLog() { if closedTS.IsEmpty() { log.Infof(ctx, "RangeFeed closed timestamp is empty") } else { - log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s", closedTS, behind) + log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s (%v)", + closedTS, signal.lag, signal) } } @@ -906,6 +908,21 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked( defer func() { <-m.RangeFeedSlowClosedTimestampNudgeSem }() if err := r.ensureClosedTimestampStarted(ctx); err != nil { log.Infof(ctx, `RangeFeed failed to nudge: %s`, err) + } else if signal.exceedsCancelLagThreshold { + // We have successfully nudged the leaseholder to make progress on + // the closed timestamp. If the lag was already persistently too + // high, we cancel the rangefeed, so that it can be replanned on + // another replica. The hazard this avoids with replanning earlier, + // is that we know the range at least has a leaseholder so we won't + // fail open, thrashing the rangefeed around the cluster. + // + // Note that we use the REASON_RANGEFEED_CLOSED, as adding a new + // reason would require a new version of the proto, which would + // prohibit us from cancelling the rangefeed in the current version, + // due to mixed version compatibility. + log.Warningf(ctx, + `RangeFeed is too far behind, cancelling for replanning [%v]`, signal) + r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED) } return nil, nil }) diff --git a/pkg/kv/kvserver/replica_rangefeed_lag_observer.go b/pkg/kv/kvserver/replica_rangefeed_lag_observer.go new file mode 100644 index 000000000000..8d6d883ca276 --- /dev/null +++ b/pkg/kv/kvserver/replica_rangefeed_lag_observer.go @@ -0,0 +1,142 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package kvserver + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/redact" +) + +// laggingRangeFeedCTNudgeMultiple is the multiple of the closed timestamp target +// duration that a rangefeed's closed timestamp can lag behind the current time +// before the rangefeed is nudged to catch up. +const laggingRangeFeedCTNudgeMultiple = 5 + +// RangeFeedLaggingCTCancelMultiple is the multiple of the closed timestamp +// target duration that a rangefeed's closed timestamp can lag behind the +// current time before the rangefeed is cancelled, if the duration threshold is +// also met, see RangeFeedLaggingCTCancelDuration. When set to 0, cancelling is +// disabled. +var RangeFeedLaggingCTCancelMultiple = settings.RegisterIntSetting( + settings.SystemOnly, + "kv.rangefeed.lagging_closed_timestamp_cancel_multiple", + "if a range's closed timestamp is more than this multiple of the "+ + "`kv.closed_timestamp.target_duration` behind the current time,"+ + "for at least `kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration`"+ + ", cancel the rangefeed, when set to 0, canceling is disabled", + 20, /* 20x closed ts target, currently default 60s */ + // NB: We don't want users setting a value incongruent with the closed + // timestamp target duration, as that would lead to thrashing of rangefeeds. + // Also, the nudge multiple is a constant above, so we don't want users + // setting a lower value than that, as nudging is a prerequisite for + // cancelling. + settings.IntInRangeOrZeroDisable(laggingRangeFeedCTNudgeMultiple, 10_000), +) + +// RangeFeedLaggingCTCancelDuration is the duration threshold for lagging +// rangefeeds to be cancelled when the closed timestamp is lagging behind the +// current time by more than: +// +// `kv.rangefeed.lagging_closed_timestamp_cancel_multiple` * +// `kv.closed_timestamp.target_duration` +// +// e.g., if the closed timestamp target duration is 3s (current default) and +// the multiple is 2, then the lagging rangefeed will be canceled if the closed +// timestamp is more than 6s behind the current time, for at least +// laggingRangeFeedCTCancelDurationThreshold: +// +// closed_ts = -7s (relative to now) +// target_closed_ts = -3s +// multiple = 2.0 +// lagging_duration_threshold = 60s +// +// In the above example, the rangefeed will be canceled if this state is +// sustained for at least 60s. Visually (and abstractly) it looks like this: +// +// lag=0 ───────────────────────────────────────────────────── +// +// observed lag ─────────┐ +// │ +// │ +// │ ┌───────┐ +// lag threshold ─────────┼─────┼───────┼────────────────────────────── +// │ │ └───┐ +// │ │ └─────┐ +// └─────┘ └──────┐ +// └──────────── +// ◄────────────────────────────► +// exceeds duration threshold +// +// Where time is moving from left to right, and the y-axis represents the +// closed timestamp lag relative to the current time. +var RangeFeedLaggingCTCancelDuration = settings.RegisterDurationSetting( + settings.SystemOnly, + "kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration", + "if a range's closed timestamp is more than "+ + "`kv.rangefeed.lagging_closed_timestamp_cancel_multiple` of the "+ + "`kv.closed_timestamp.target_duration` behind the current time,"+ + "for at least this duration, cancel the rangefeed", + time.Minute, +) + +type rangeFeedCTLagObserver struct { + exceedsCancelLagStartTime time.Time +} + +func newRangeFeedCTLagObserver() *rangeFeedCTLagObserver { + return &rangeFeedCTLagObserver{} +} + +func (r *rangeFeedCTLagObserver) observeClosedTimestampUpdate( + ctx context.Context, closedTS, now time.Time, sv *settings.Values, +) rangeFeedCTLagSignal { + signal := rangeFeedCTLagSignal{targetLag: closedts.TargetDuration.Get(sv)} + nudgeLagThreshold := signal.targetLag * laggingRangeFeedCTNudgeMultiple + cancelLagThreshold := signal.targetLag * time.Duration(RangeFeedLaggingCTCancelMultiple.Get(sv)) + cancelLagMinDuration := RangeFeedLaggingCTCancelDuration.Get(sv) + + signal.lag = now.Sub(closedTS) + if signal.lag <= cancelLagThreshold { + // The closed timestamp is no longer lagging behind the current time by + // more than the cancel threshold, so reset the start time, as we only want + // to signal on sustained lag above the threshold. + r.exceedsCancelLagStartTime = time.Time{} + } else if r.exceedsCancelLagStartTime.IsZero() { + r.exceedsCancelLagStartTime = now + } + signal.exceedsNudgeLagThreshold = signal.lag > nudgeLagThreshold + signal.exceedsCancelLagThreshold = !r.exceedsCancelLagStartTime.IsZero() && + now.Sub(r.exceedsCancelLagStartTime) > cancelLagMinDuration + return signal +} + +type rangeFeedCTLagSignal struct { + lag time.Duration + targetLag time.Duration + exceedsNudgeLagThreshold bool + exceedsCancelLagThreshold bool +} + +func (rfls rangeFeedCTLagSignal) String() string { + return redact.StringWithoutMarkers(rfls) +} + +var _ redact.SafeFormatter = rangeFeedCTLagSignal{} + +// SafeFormat implements the redact.SafeFormatter interface. +func (rfls rangeFeedCTLagSignal) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf( + "behind=%v target=%v nudge=%t cancel=%t", + rfls.lag, + rfls.targetLag, + rfls.exceedsNudgeLagThreshold, + rfls.exceedsCancelLagThreshold, + ) +} diff --git a/pkg/kv/kvserver/replica_rangefeed_lag_observer_test.go b/pkg/kv/kvserver/replica_rangefeed_lag_observer_test.go new file mode 100644 index 000000000000..476703e9e866 --- /dev/null +++ b/pkg/kv/kvserver/replica_rangefeed_lag_observer_test.go @@ -0,0 +1,186 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package kvserver + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestObserveClosedTimestampUpdate asserts that the expected signal is +// generated for each closed timestamp observation over time. +func TestObserveClosedTimestampUpdate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + sv := &st.SV + + baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + targetDuration := 3 * time.Second + cancelMultiple := int64(2) + cancelMinDuration := 60 * time.Second + + RangeFeedLaggingCTCancelMultiple.Override(ctx, sv, cancelMultiple) + RangeFeedLaggingCTCancelDuration.Override(ctx, sv, cancelMinDuration) + + tests := []struct { + name string + updates []struct { + closedTS time.Time + now time.Time + } + expected []rangeFeedCTLagSignal + }{ + { + name: "no lag", + updates: []struct { + closedTS time.Time + now time.Time + }{ + { + closedTS: baseTime, + now: baseTime.Add(targetDuration), + }, + }, + expected: []rangeFeedCTLagSignal{ + { + lag: targetDuration, + targetLag: targetDuration, + exceedsNudgeLagThreshold: false, + exceedsCancelLagThreshold: false, + }, + }, + }, + { + name: "exceeds nudge threshold", + updates: []struct { + closedTS time.Time + now time.Time + }{ + { + closedTS: baseTime, + now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)), + }, + }, + expected: []rangeFeedCTLagSignal{ + { + lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1), + targetLag: targetDuration, + exceedsNudgeLagThreshold: true, + exceedsCancelLagThreshold: false, + }, + }, + }, + { + name: "exceeds cancel threshold but not duration", + updates: []struct { + closedTS time.Time + now time.Time + }{ + { + closedTS: baseTime, + now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)), + }, + { + closedTS: baseTime, + now: baseTime.Add(targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration/2), + }, + }, + expected: []rangeFeedCTLagSignal{ + { + lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1), + targetLag: targetDuration, + exceedsNudgeLagThreshold: true, + exceedsCancelLagThreshold: false, + }, + { + lag: targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration/2, + targetLag: targetDuration, + exceedsNudgeLagThreshold: true, + exceedsCancelLagThreshold: false, + }, + }, + }, + { + name: "exceeds cancel threshold and duration", + updates: []struct { + closedTS time.Time + now time.Time + }{ + { + closedTS: baseTime, + now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)), + }, + { + closedTS: baseTime, + now: baseTime.Add(targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration + time.Second), + }, + }, + expected: []rangeFeedCTLagSignal{ + { + lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1), + targetLag: targetDuration, + exceedsNudgeLagThreshold: true, + exceedsCancelLagThreshold: false, + }, + { + lag: targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration + time.Second, + targetLag: targetDuration, + exceedsNudgeLagThreshold: true, + exceedsCancelLagThreshold: true, + }, + }, + }, + { + name: "recovers from lag", + updates: []struct { + closedTS time.Time + now time.Time + }{ + { + closedTS: baseTime, + now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)), + }, + { + closedTS: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)), + now: baseTime.Add(targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + targetDuration), + }, + }, + expected: []rangeFeedCTLagSignal{ + { + lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1), + targetLag: targetDuration, + exceedsNudgeLagThreshold: true, + exceedsCancelLagThreshold: false, + }, + { + lag: targetDuration, + targetLag: targetDuration, + exceedsNudgeLagThreshold: false, + exceedsCancelLagThreshold: false, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + observer := newRangeFeedCTLagObserver() + for i, update := range tc.updates { + signal := observer.observeClosedTimestampUpdate(ctx, update.closedTS, update.now, sv) + require.Equal(t, tc.expected[i], signal, "update %d produced unexpected signal", i) + } + }) + } +} diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed new file mode 100644 index 000000000000..8b436e55593b --- /dev/null +++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_feed @@ -0,0 +1,102 @@ +echo +---- +---- +(Rangefeed on n3) + + +-- We will exhaust the tokens across all streams while admission is blocked on +-- n3, using 4x1 MiB (deduction, the write itself is small) writes. Then, +-- we will write 1 MiB to the range and wait for the closedTS to fall +-- behind on n3. We expect that the closedTS falling behind will trigger +-- an error that is returned to the mux rangefeed client, which will in turn +-- allows the rangefeed request to be re-routed to another replica. + + +(Sending 1 MiB put request to develop a send queue) + + +(Sent 1 MiB put request) + + +-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 1.0 MiB + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B + + +-- Observe the total tracked tokens per-stream on n1, s3's entries will still +-- be tracked here. +SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8) + FROM crdb_internal.kv_flow_control_handles_v2 + + range_id | store_id | total_tracked_tokens +-----------+----------+----------------------- + 75 | 1 | 0 B + 75 | 2 | 0 B + 75 | 3 | 4.0 MiB + + +-- Per-store tokens available from n1, these should reflect the lack of tokens +-- for s3. +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB + + +(Rangefeed moved to n1) + + +-- (Allowing below-raft admission to proceed on n3.) + + +-- Send queue and flow token metrics from n1. All tokens should be returned. +SELECT name, crdb_internal.humanize_bytes(value::INT8) + FROM crdb_internal.node_metrics + WHERE name LIKE '%kvflowcontrol%send_queue%' + AND name != 'kvflowcontrol.send_queue.count' +ORDER BY name ASC; + + kvflowcontrol.send_queue.bytes | 0 B + kvflowcontrol.send_queue.prevent.count | 0 B + kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B + kvflowcontrol.send_queue.scheduled.force_flush | 0 B + kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B + kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B + kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B +SELECT store_id, + crdb_internal.humanize_bytes(available_eval_regular_tokens), + crdb_internal.humanize_bytes(available_eval_elastic_tokens), + crdb_internal.humanize_bytes(available_send_regular_tokens), + crdb_internal.humanize_bytes(available_send_elastic_tokens) + FROM crdb_internal.kv_flow_controller_v2 + ORDER BY store_id ASC; + + store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available +-----------+------------------------+------------------------+------------------------+------------------------- + 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB + 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB +---- +---- + +# vim:ft=sql