kv: replan rangefeeds with chronic closed ts lag
When a rangefeed's closed timestamp lags behind the current time, any
writes that have occurred in-between will not be emitted. This is
problematic in cases where the lag is significant and chronic, as
consumers (changefeeds, logical data replication, physical cluster
replication) are likewise delayed in their processing. Observing a
rangefeed with a chronically lagging closed timestamp will become
relatively more likely with quorum replication flow control, as entries
are deliberately queued, instead of being sent, to stores which do not
have sufficient send tokens.

This commit (re)introduces the concept of cancelling lagging rangefeeds,
so that they may be replanned and retried on another replica. The other
replica may also have this issue; however, there should be at least a
quorum of voting replicas with a similar closed timestamp that would be
suitable.

The replanning on a different replica is handled already by existing
machinery. This commit introduces an observer which generates a signal
indicating that the rangefeed should be cancelled. The signal also
encapsulates the existing logic to nudge a rangefeed as well.

The criteria for cancelling a rangefeed are governed by two thresholds,
defined as cluster settings:

```
kv.rangefeed.lagging_closed_timestamp_cancel_multiple
(default = 20 x closed ts target duration = 60s)
```

```
kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration
(default = 60s)
```

When a replica's closed timestamp has sustained lag greater than:

```
kv.rangefeed.lagging_closed_timestamp_cancel_multiple * kv.closed_timestamp.target_duration
```

for at least:

```
kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration
```

duration, the rangefeed will be cancelled and then re-planned on the
client. This can be visualized in the following diagram, where lag
increases downward. The initial spike past the lag threshold recovers,
so the rangefeed is not cancelled. The second excursion past the lag
threshold is sustained for longer than the duration threshold, so the
rangefeed is then cancelled for replanning:

```
lag=0          ─────────────────────────────────────────────────────

observed lag   ─────────┐
                        │
                        │
                        │     ┌───────┐
lag threshold  ─────────┼─────┼───────┼──────────────────────────────
                        │     │       └───┐
                        │     │           └─────┐
                        └─────┘                 └──────┐
                                                       └────────────
                                      ◄────────────────────────────►
                                         exceeds duration threshold
```

Note we could also prevent accepting a rangefeed registration if the lag
were sufficient; however, the behavior change here applies only to lag
which has been observed to be sustained over time. Without historical
data, we cannot apply identical decision logic on registration.

Fixes: cockroachdb#136214
Release note: None
kvoli committed Dec 13, 2024

1 parent 40a187c commit be9bee9
Showing 11 changed files with 714 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -1034,6 +1034,7 @@
<tr><td>APPLICATION</td><td>distsender.rangefeed.error_catchup_ranges</td><td>Number of ranges in catchup mode which experienced an error</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rangefeed.local_ranges</td><td>Number of ranges connected to local node.</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rangefeed.restart_ranges</td><td>Number of ranges that were restarted due to transient errors</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rangefeed.retry.lagging_closed_timestamp</td><td>Number of ranges that encountered retryable LAGGING_CLOSED_TIMESTAMP error</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rangefeed.retry.logical_ops_missing</td><td>Number of ranges that encountered retryable LOGICAL_OPS_MISSING error</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rangefeed.retry.manual_range_split</td><td>Number of ranges that encountered retryable MANUAL_RANGE_SPLIT error</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rangefeed.retry.no_leaseholder</td><td>Number of ranges that encountered retryable NO_LEASEHOLDER error</td><td>Ranges</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
16 changes: 13 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -556,9 +556,19 @@ func handleRangefeedError(
kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER,
-kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED:
-// Try again with same descriptor. These are transient
-// errors that should not show up again.
+kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED,
+kvpb.RangeFeedRetryError_REASON_LAGGING_CLOSED_TIMESTAMP:
+// Try again with same descriptor. These are transient errors that should
+// not show up again, or should be retried on another replica which will
+// likely not have the same issue.
+//
+// TODO(kvoli): Mixed-version concerns: this logic can be run on the SQL
+// pod in single-process multi-tenancy. It seems that the SQL pod could
+// be running a much further behind version and not recognize the error
+// codes? Or, even in a non-multi-tenant world, the cluster could be
+// mixed-version with the server on a version with the error and the
+// client not. Perhaps, we should just-reuse one of the above error codes
+// with the same behavior to get around this problem.
return rangefeedErrorInfo{}, nil
case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
kvpb.RangeFeedRetryError_REASON_RANGE_MERGED,
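
The mixed-version concern in the TODO above can be illustrated with a small sketch. Everything here is hypothetical: `retryReason` stands in for the generated `kvpb` enum (only the numeric values 7 and 9 are taken from `errors.proto` in this commit), and `classify` is not the real `handleRangefeedError`. What an older binary actually does with an unrecognized reason is not shown in this diff, so the sketch only reports it as unrecognized.

```go
package main

import "fmt"

// retryReason is a hypothetical stand-in for the RangeFeedRetryError reason
// enum. The numeric values mirror errors.proto in this commit.
type retryReason int32

const (
	reasonRangefeedClosed        retryReason = 7
	reasonLaggingClosedTimestamp retryReason = 9
)

// classify models a client-side switch over retry reasons. understandsLagging
// is false for a client built before the new reason existed.
func classify(reason retryReason, understandsLagging bool) string {
	switch {
	case reason == reasonRangefeedClosed:
		return "retry with same descriptor"
	case reason == reasonLaggingClosedTimestamp && understandsLagging:
		return "retry with same descriptor"
	default:
		// Assumption: the sketch does not model what an old client does here.
		return "unrecognized reason"
	}
}

func main() {
	fmt.Println("new client:", classify(reasonLaggingClosedTimestamp, true))
	fmt.Println("old client:", classify(reasonLaggingClosedTimestamp, false))
}
```

Reusing an existing reason code, as the TODO suggests, would make both clients take the first branch.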
3 changes: 3 additions & 0 deletions pkg/kv/kvpb/errors.proto
@@ -654,6 +654,9 @@ message RangeFeedRetryError {
REASON_RANGEFEED_CLOSED = 7;
// The range was manually split in two.
REASON_MANUAL_RANGE_SPLIT = 8;
// The replica's closed timestamp is persistently lagging behind the
// current time by greater than the cancellation threshold.
REASON_LAGGING_CLOSED_TIMESTAMP = 9;
}
optional Reason reason = 1 [(gogoproto.nullable) = false];
}
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -70,6 +70,7 @@ go_library(
"replica_raftstorage.go",
"replica_range_lease.go",
"replica_rangefeed.go",
"replica_rangefeed_lag_observer.go",
"replica_rankings.go",
"replica_rate_limit.go",
"replica_read.go",
@@ -345,6 +346,7 @@ go_test(
"replica_raft_overload_test.go",
"replica_raft_test.go",
"replica_raft_truncation_test.go",
"replica_rangefeed_lag_observer_test.go",
"replica_rangefeed_test.go",
"replica_rankings_test.go",
"replica_sideload_test.go",
240 changes: 240 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -19,9 +19,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
@@ -38,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -6141,6 +6144,243 @@ ORDER BY streams DESC;
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func TestFlowControlSendQueueRangeFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// rangeFeed will create a rangefeed suitable for testing. It will start a
// rangefeed and return a function that can be used to stop it.
rangeFeed := func(
ctx context.Context,
dsI interface{},
sp roachpb.Span,
startFrom hlc.Timestamp,
onValue func(event kvcoord.RangeFeedMessage),
opts ...kvcoord.RangeFeedOption,
) func() {
ds := dsI.(*kvcoord.DistSender)
events := make(chan kvcoord.RangeFeedMessage)
cancelCtx, cancel := context.WithCancel(ctx)

g := ctxgroup.WithContext(cancelCtx)
g.GoCtx(func(ctx context.Context) (err error) {
return ds.RangeFeed(ctx, []kvcoord.SpanTimePair{{Span: sp, StartAfter: startFrom}}, events, opts...)
})
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev := <-events:
onValue(ev)
}
}
})

return func() {
cancel()
_ = g.Wait()
}
}
// We will use the below logic later to assert on the node that the rangefeed
// is running on.
var curRangeFeedNodeID atomic.Value
curRangeFeedNodeID.Store(roachpb.NodeID(0))
checkRangeFeedNodeID := func(nodeID roachpb.NodeID, include bool) error {
curNodeID := curRangeFeedNodeID.Load().(roachpb.NodeID)
if curNodeID == 0 {
return errors.New("no rangefeed node yet")
}
if curNodeID != nodeID && include {
return errors.Errorf("expected rangefeed on n%v, got n%v", nodeID, curNodeID)
}
if curNodeID == nodeID && !include {
return errors.Errorf("expected rangefeed not on n%v", nodeID)
}
return nil
}

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
// Speed up cancellation, default is 20x the target duration.
kvserver.RangeFeedLaggingCTCancelMultiple.Override(ctx, &settings.SV, 6)
// Also speed up cancellation by shortening the required lag duration,
// default is 60s.
kvserver.RangeFeedLaggingCTCancelDuration.Override(ctx, &settings.SV, 3*time.Second)
// Likewise, the default target duration is 3s, lower it by a factor of 20.
closedts.TargetDuration.Override(ctx, &settings.SV, 150*time.Millisecond)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)
setTokenReturnEnabled(true /* enabled */, 0, 1, 2)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_feed")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

ts := tc.Server(2)
span := desc.KeySpan().AsRawSpanWithNoLocals()
ignoreValues := func(event kvcoord.RangeFeedMessage) {}

ctx2, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx2)
defer func() {
cancel()
err := g.Wait()
require.True(t, testutils.IsError(err, "context canceled"))
}()
observer := func(fn kvcoord.ForEachRangeFn) {
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(200 * time.Millisecond):
}
err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error {
curRangeFeedNodeID.Store(feed.NodeID)
return nil
})
if err != nil {
return err
}
}
})
}

closeFeed := rangeFeed(
ctx,
ts.DistSenderI(),
span,
tc.Server(0).Clock().Now(),
ignoreValues,
kvcoord.WithRangeObserver(observer),
)
defer closeFeed()
testutils.SucceedsSoon(t, func() error { return checkRangeFeedNodeID(3, true /* include */) })
h.comment(`(Rangefeed on n3)`)

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using 4x1 MiB (deduction, the write itself is small) writes. Then,
-- we will write 1 MiB to the range and wait for the closedTS to fall
-- behind on n3. We expect that the closedTS falling behind will trigger
-- an error that is returned to the mux rangefeed client, which will in turn
-- allows the rangefeed request to be re-routed to another replica.`)
// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

testutils.SucceedsSoon(t, func() error {
return checkRangeFeedNodeID(3, false /* include */)
})
newNode := curRangeFeedNodeID.Load().(roachpb.NodeID)
h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode))

h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -341,6 +341,11 @@ type Replica struct {
msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message
// Scratch for populating rac2.RaftEvent.ReplicaSateInfo for flowContrlV2.
replicaStateScratchForFlowControl map[roachpb.ReplicaID]rac2.ReplicaStateInfo

// rangefeedCTLagObserver is used to observe the closed timestamp lag of
// the replica and generate a signal to potentially nudge or cancel the
// rangefeed based on observed lag.
rangefeedCTLagObserver *rangeFeedCTLagObserver
}

// localMsgs contains a collection of raftpb.Message that target the local
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -198,6 +198,7 @@ func newUninitializedReplicaWithoutRaftGroup(
// replica GC issues, but is a distraction at the moment.
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))

r.raftMu.rangefeedCTLagObserver = newRangeFeedCTLagObserver()
r.raftMu.stateLoader = stateloader.Make(rangeID)
r.raftMu.sideloaded = logstore.NewDiskSideloadStorage(
store.cfg.Settings,
26 changes: 19 additions & 7 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -14,7 +14,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
-"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -868,17 +867,20 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(

// If the closed timestamp is sufficiently stale, signal that we want an
// update to the leaseholder so that it will eventually begin to progress
-// again.
-behind := r.Clock().PhysicalTime().Sub(closedTS.GoTime())
-slowClosedTSThresh := 5 * closedts.TargetDuration.Get(&r.store.cfg.Settings.SV)
-exceedsSlowLagThresh = behind > slowClosedTSThresh
-if exceedsSlowLagThresh {
+// again. Or, if the closed timestamp has been lagging by more than the
+// cancel threshold for a while, cancel the rangefeed.
+if signal := r.raftMu.rangefeedCTLagObserver.observeClosedTimestampUpdate(ctx,
+closedTS.GoTime(),
+r.Clock().PhysicalTime(),
+&r.store.cfg.Settings.SV,
+); signal.exceedsNudgeLagThreshold {
m := r.store.metrics.RangeFeedMetrics
if m.RangeFeedSlowClosedTimestampLogN.ShouldLog() {
if closedTS.IsEmpty() {
log.Infof(ctx, "RangeFeed closed timestamp is empty")
} else {
-log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s", closedTS, behind)
+log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s (%v)",
+closedTS, signal.lag, signal)
}
}

@@ -906,6 +908,16 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
defer func() { <-m.RangeFeedSlowClosedTimestampNudgeSem }()
if err := r.ensureClosedTimestampStarted(ctx); err != nil {
log.Infof(ctx, `RangeFeed failed to nudge: %s`, err)
+} else if signal.exceedsCancelLagThreshold {
+// We have successfully nudged the leaseholder to make progress on
+// the closed timestamp. If the lag was already persistently too
+// high, we cancel the rangefeed, so that it can be replanned on
+// another replica. The hazard this avoids with replanning earlier,
+// is that we know the range at least has a leaseholder so we won't
+// fail open, thrashing the rangefeed around the cluster.
+log.Warningf(ctx,
+`RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
+r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_LAGGING_CLOSED_TIMESTAMP)
}
return nil, nil
})
142 changes: 142 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_lag_observer.go
@@ -0,0 +1,142 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package kvserver

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/redact"
)

// laggingRangeFeedCTNudgeMultiple is the multiple of the closed timestamp target
// duration that a rangefeed's closed timestamp can lag behind the current time
// before the rangefeed is nudged to catch up.
const laggingRangeFeedCTNudgeMultiple = 5

// RangeFeedLaggingCTCancelMultiple is the multiple of the closed timestamp
// target duration that a rangefeed's closed timestamp can lag behind the
// current time before the rangefeed is cancelled, if the duration threshold is
// also met, see RangeFeedLaggingCTCancelDuration. When set to 0, cancelling is
// disabled.
var RangeFeedLaggingCTCancelMultiple = settings.RegisterIntSetting(
settings.SystemOnly,
"kv.rangefeed.lagging_closed_timestamp_cancel_multiple",
"if a range's closed timestamp is more than this multiple of the "+
"`kv.closed_timestamp.target_duration` behind the current time, "+
"for at least `kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration`"+
", cancel the rangefeed; when set to 0, canceling is disabled",
20, /* 20x closed ts target, currently default 60s */
// NB: We don't want users setting a value incongruent with the closed
// timestamp target duration, as that would lead to thrashing of rangefeeds.
// Also, the nudge multiple is a constant above, so we don't want users
// setting a lower value than that, as nudging is a prerequisite for
// cancelling.
settings.IntInRangeOrZeroDisable(laggingRangeFeedCTNudgeMultiple, 10_000),
)

// RangeFeedLaggingCTCancelDuration is the duration threshold for lagging
// rangefeeds to be cancelled when the closed timestamp is lagging behind the
// current time by more than:
//
// `kv.rangefeed.lagging_closed_timestamp_cancel_multiple` *
// `kv.closed_timestamp.target_duration`
//
// e.g., if the closed timestamp target duration is 3s (current default) and
// the multiple is 2, then the lagging rangefeed will be canceled if the closed
// timestamp is more than 6s behind the current time, for at least
// the duration set by RangeFeedLaggingCTCancelDuration:
//
// closed_ts = -7s (relative to now)
// target_closed_ts = -3s
// multiple = 2.0
// lagging_duration_threshold = 60s
//
// In the above example, the rangefeed will be canceled if this state is
// sustained for at least 60s. Visually (and abstractly) it looks like this:
//
// lag=0          ─────────────────────────────────────────────────────
//
// observed lag   ─────────┐
//                         │
//                         │
//                         │     ┌───────┐
// lag threshold  ─────────┼─────┼───────┼──────────────────────────────
//                         │     │       └───┐
//                         │     │           └─────┐
//                         └─────┘                 └──────┐
//                                                        └────────────
//                                       ◄────────────────────────────►
//                                          exceeds duration threshold
//
// Where time is moving from left to right, and the y-axis represents the
// closed timestamp lag relative to the current time.
var RangeFeedLaggingCTCancelDuration = settings.RegisterDurationSetting(
settings.SystemOnly,
"kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration",
"if a range's closed timestamp is more than "+
"`kv.rangefeed.lagging_closed_timestamp_cancel_multiple` of the "+
"`kv.closed_timestamp.target_duration` behind the current time, "+
"for at least this duration, cancel the rangefeed",
time.Minute,
)

type rangeFeedCTLagObserver struct {
exceedsCancelLagStartTime time.Time
}

func newRangeFeedCTLagObserver() *rangeFeedCTLagObserver {
return &rangeFeedCTLagObserver{}
}

func (r *rangeFeedCTLagObserver) observeClosedTimestampUpdate(
ctx context.Context, closedTS, now time.Time, sv *settings.Values,
) rangeFeedCTLagSignal {
signal := rangeFeedCTLagSignal{targetLag: closedts.TargetDuration.Get(sv)}
nudgeLagThreshold := signal.targetLag * laggingRangeFeedCTNudgeMultiple
cancelLagThreshold := signal.targetLag * time.Duration(RangeFeedLaggingCTCancelMultiple.Get(sv))
cancelLagMinDuration := RangeFeedLaggingCTCancelDuration.Get(sv)

signal.lag = now.Sub(closedTS)
if signal.lag <= cancelLagThreshold {
// The closed timestamp is no longer lagging behind the current time by
// more than the cancel threshold, so reset the start time, as we only want
// to signal on sustained lag above the threshold.
r.exceedsCancelLagStartTime = time.Time{}
} else if r.exceedsCancelLagStartTime.IsZero() {
r.exceedsCancelLagStartTime = now
}
signal.exceedsNudgeLagThreshold = signal.lag > nudgeLagThreshold
signal.exceedsCancelLagThreshold = !r.exceedsCancelLagStartTime.IsZero() &&
now.Sub(r.exceedsCancelLagStartTime) > cancelLagMinDuration
return signal
}

type rangeFeedCTLagSignal struct {
lag time.Duration
targetLag time.Duration
exceedsNudgeLagThreshold bool
exceedsCancelLagThreshold bool
}

func (rfls rangeFeedCTLagSignal) String() string {
return redact.StringWithoutMarkers(rfls)
}

var _ redact.SafeFormatter = rangeFeedCTLagSignal{}

// SafeFormat implements the redact.SafeFormatter interface.
func (rfls rangeFeedCTLagSignal) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf(
"behind=%v target=%v nudge=%t cancel=%t",
rfls.lag,
rfls.targetLag,
rfls.exceedsNudgeLagThreshold,
rfls.exceedsCancelLagThreshold,
)
}
186 changes: 186 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_lag_observer_test.go
@@ -0,0 +1,186 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package kvserver

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestObserveClosedTimestampUpdate asserts that the expected signal is
// generated for each closed timestamp observation over time.
func TestObserveClosedTimestampUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
sv := &st.SV

baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
targetDuration := 3 * time.Second
cancelMultiple := int64(2)
cancelMinDuration := 60 * time.Second

RangeFeedLaggingCTCancelMultiple.Override(ctx, sv, cancelMultiple)
RangeFeedLaggingCTCancelDuration.Override(ctx, sv, cancelMinDuration)

tests := []struct {
name string
updates []struct {
closedTS time.Time
now time.Time
}
expected []rangeFeedCTLagSignal
}{
{
name: "no lag",
updates: []struct {
closedTS time.Time
now time.Time
}{
{
closedTS: baseTime,
now: baseTime.Add(targetDuration),
},
},
expected: []rangeFeedCTLagSignal{
{
lag: targetDuration,
targetLag: targetDuration,
exceedsNudgeLagThreshold: false,
exceedsCancelLagThreshold: false,
},
},
},
{
name: "exceeds nudge threshold",
updates: []struct {
closedTS time.Time
now time.Time
}{
{
closedTS: baseTime,
now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)),
},
},
expected: []rangeFeedCTLagSignal{
{
lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1),
targetLag: targetDuration,
exceedsNudgeLagThreshold: true,
exceedsCancelLagThreshold: false,
},
},
},
{
name: "exceeds cancel threshold but not duration",
updates: []struct {
closedTS time.Time
now time.Time
}{
{
closedTS: baseTime,
now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)),
},
{
closedTS: baseTime,
now: baseTime.Add(targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration/2),
},
},
expected: []rangeFeedCTLagSignal{
{
lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1),
targetLag: targetDuration,
exceedsNudgeLagThreshold: true,
exceedsCancelLagThreshold: false,
},
{
lag: targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration/2,
targetLag: targetDuration,
exceedsNudgeLagThreshold: true,
exceedsCancelLagThreshold: false,
},
},
},
{
name: "exceeds cancel threshold and duration",
updates: []struct {
closedTS time.Time
now time.Time
}{
{
closedTS: baseTime,
now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)),
},
{
closedTS: baseTime,
now: baseTime.Add(targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration + time.Second),
},
},
expected: []rangeFeedCTLagSignal{
{
lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1),
targetLag: targetDuration,
exceedsNudgeLagThreshold: true,
exceedsCancelLagThreshold: false,
},
{
lag: targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + cancelMinDuration + time.Second,
targetLag: targetDuration,
exceedsNudgeLagThreshold: true,
exceedsCancelLagThreshold: true,
},
},
},
{
name: "recovers from lag",
updates: []struct {
closedTS time.Time
now time.Time
}{
{
closedTS: baseTime,
now: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)),
},
{
closedTS: baseTime.Add(targetDuration * (laggingRangeFeedCTNudgeMultiple + 1)),
now: baseTime.Add(targetDuration*(laggingRangeFeedCTNudgeMultiple+1) + targetDuration),
},
},
expected: []rangeFeedCTLagSignal{
{
lag: targetDuration * (laggingRangeFeedCTNudgeMultiple + 1),
targetLag: targetDuration,
exceedsNudgeLagThreshold: true,
exceedsCancelLagThreshold: false,
},
{
lag: targetDuration,
targetLag: targetDuration,
exceedsNudgeLagThreshold: false,
exceedsCancelLagThreshold: false,
},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
observer := newRangeFeedCTLagObserver()
for i, update := range tc.updates {
signal := observer.observeClosedTimestampUpdate(ctx, update.closedTS, update.now, sv)
require.Equal(t, tc.expected[i], signal, "update %d produced unexpected signal", i)
}
})
}
}
@@ -0,0 +1,102 @@
echo
----
----
(Rangefeed on n3)


-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using 4x1 MiB (deduction, the write itself is small) writes. Then,
-- we will write 1 MiB to the range and wait for the closedTS to fall
-- behind on n3. We expect that the closedTS falling behind will trigger
-- an error that is returned to the mux rangefeed client, which will in turn
-- allows the rangefeed request to be re-routed to another replica.


(Sending 1 MiB put request to develop a send queue)


(Sent 1 MiB put request)


-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%send_queue%'
AND name != 'kvflowcontrol.send_queue.count'
ORDER BY name ASC;

kvflowcontrol.send_queue.bytes | 1.0 MiB
kvflowcontrol.send_queue.prevent.count | 0 B
kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
kvflowcontrol.send_queue.scheduled.force_flush | 0 B
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B


-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2

range_id | store_id | total_tracked_tokens
-----------+----------+-----------------------
75 | 1 | 0 B
75 | 2 | 0 B
75 | 3 | 4.0 MiB


-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.
SELECT store_id,
crdb_internal.humanize_bytes(available_eval_regular_tokens),
crdb_internal.humanize_bytes(available_eval_elastic_tokens),
crdb_internal.humanize_bytes(available_send_regular_tokens),
crdb_internal.humanize_bytes(available_send_elastic_tokens)
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
-----------+------------------------+------------------------+------------------------+-------------------------
1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB


(Rangefeed moved to n1)


-- (Allowing below-raft admission to proceed on n3.)


-- Send queue and flow token metrics from n1. All tokens should be returned.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvflowcontrol%send_queue%'
AND name != 'kvflowcontrol.send_queue.count'
ORDER BY name ASC;

kvflowcontrol.send_queue.bytes | 0 B
kvflowcontrol.send_queue.prevent.count | 0 B
kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
kvflowcontrol.send_queue.scheduled.force_flush | 0 B
kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B
SELECT store_id,
crdb_internal.humanize_bytes(available_eval_regular_tokens),
crdb_internal.humanize_bytes(available_eval_elastic_tokens),
crdb_internal.humanize_bytes(available_send_regular_tokens),
crdb_internal.humanize_bytes(available_send_elastic_tokens)
FROM crdb_internal.kv_flow_controller_v2
ORDER BY store_id ASC;

store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
-----------+------------------------+------------------------+------------------------+-------------------------
1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
----
----

# vim:ft=sql
