kv: replan rangefeeds with chronic closed ts lag
When a rangefeed's closed timestamp lags behind the current time, any
writes that have occurred in between will not be emitted. This is
problematic in cases where the lag is significant and chronic, as
consumers (changefeeds, logical data replication, physical cluster
replication) are likewise delayed in their processing. Observing a
rangefeed with a chronically lagging closed timestamp will become
relatively more likely with quorum replication flow control, as entries
are deliberately queued, instead of being sent, to stores which do not
have sufficient send tokens.

This commit (re)introduces the concept of cancelling lagging rangefeeds,
so that they may be replanned and retried on another replica. The other
replica may also have this issue; however, there should be at least a
quorum of voting replicas with a similar closed timestamp that would be
suitable.

The replanning on a different replica is handled already by existing
machinery. This commit introduces an observer which generates a signal
indicating that the rangefeed should be cancelled. The signal also
encapsulates the existing logic to nudge a rangefeed as well.

The criteria for cancelling a rangefeed are governed by two thresholds,
defined as cluster settings:

```
kv.rangefeed.lagging_closed_timestamp_cancel_multiple
(default = 20 x closed ts target duration = 60s)
```

```
kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration
(default = 60s)
```

When a replica's closed timestamp has sustained lag greater than:

```
kv.rangefeed.lagging_closed_timestamp_cancel_multiple * kv.closed_timestamp.target_duration
```

For at least:

```
kv.rangefeed.lagging_closed_timestamp_cancel_min_lagging_duration
```

duration, the rangefeed will be cancelled and then re-planned on the
client. This can be visualized in the following diagram, in which lag
increases downward from the lag=0 line. The observed lag first crosses
the lag threshold briefly and then recovers, so the rangefeed is not
cancelled. The second crossing is sustained for longer than the
duration threshold, so the rangefeed is then cancelled for replanning:

```
lag=0          ─────────────────────────────────────────────────────

observed lag   ─────────┐
                        │
                        │
                        │     ┌───────┐
lag threshold  ─────────┼─────┼───────┼──────────────────────────────
                        │     │       └───┐
                        │     │           └─────┐
                        └─────┘                 └──────┐
                                                       └────────────
                                      ◄────────────────────────────►
                                         exceeds duration threshold
```

Note we could also reject a rangefeed registration if the lag were
sufficient. However, the behavior change here applies only to lag
which has been observed to be sustained over time; without historical
data, we cannot apply identical decision logic on registration.

Fixes: cockroachdb#136214
Release note: None
kvoli committed Dec 18, 2024
1 parent f61084f commit e5899f1
Showing 9 changed files with 705 additions and 9 deletions.
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -557,8 +557,9 @@ func handleRangefeedError(
kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER,
kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED:
-// Try again with same descriptor. These are transient
-// errors that should not show up again.
+// Try again with same descriptor. These are transient errors that should
+// not show up again, or should be retried on another replica which will
+// likely not have the same issue.
return rangefeedErrorInfo{}, nil
case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
kvpb.RangeFeedRetryError_REASON_RANGE_MERGED,
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -70,6 +70,7 @@ go_library(
"replica_raftstorage.go",
"replica_range_lease.go",
"replica_rangefeed.go",
"replica_rangefeed_lag_observer.go",
"replica_rankings.go",
"replica_rate_limit.go",
"replica_read.go",
@@ -345,6 +346,7 @@ go_test(
"replica_raft_overload_test.go",
"replica_raft_test.go",
"replica_raft_truncation_test.go",
"replica_rangefeed_lag_observer_test.go",
"replica_rangefeed_test.go",
"replica_rankings_test.go",
"replica_sideload_test.go",
240 changes: 240 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -19,9 +19,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
@@ -38,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -6141,6 +6144,243 @@ ORDER BY streams DESC;
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func TestFlowControlSendQueueRangeFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// rangeFeed will create a rangefeed suitable for testing. It will start a
// rangefeed and return a function that can be used to stop it.
rangeFeed := func(
ctx context.Context,
dsI interface{},
sp roachpb.Span,
startFrom hlc.Timestamp,
onValue func(event kvcoord.RangeFeedMessage),
opts ...kvcoord.RangeFeedOption,
) func() {
ds := dsI.(*kvcoord.DistSender)
events := make(chan kvcoord.RangeFeedMessage)
cancelCtx, cancel := context.WithCancel(ctx)

g := ctxgroup.WithContext(cancelCtx)
g.GoCtx(func(ctx context.Context) (err error) {
return ds.RangeFeed(ctx, []kvcoord.SpanTimePair{{Span: sp, StartAfter: startFrom}}, events, opts...)
})
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev := <-events:
onValue(ev)
}
}
})

return func() {
cancel()
_ = g.Wait()
}
}
// We will use the below logic later to assert on the node that the rangefeed
// is running on.
var curRangeFeedNodeID atomic.Value
curRangeFeedNodeID.Store(roachpb.NodeID(0))
checkRangeFeedNodeID := func(nodeID roachpb.NodeID, include bool) error {
curNodeID := curRangeFeedNodeID.Load().(roachpb.NodeID)
if curNodeID == 0 {
return errors.New("no rangefeed node yet")
}
if curNodeID != nodeID && include {
return errors.Errorf("expected rangefeed on n%v, got n%v", nodeID, curNodeID)
}
if curNodeID == nodeID && !include {
return errors.Errorf("expected rangefeed not on n%v", nodeID)
}
return nil
}

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
// Speed up cancellation, default is 20x the target duration.
kvserver.RangeFeedLaggingCTCancelMultiple.Override(ctx, &settings.SV, 6)
// Also speed up cancellation by shortening the required lag duration,
// default is 60s.
kvserver.RangeFeedLaggingCTCancelDuration.Override(ctx, &settings.SV, 3*time.Second)
// Likewise, the default target duration is 3s, lower it by a factor of 20.
closedts.TargetDuration.Override(ctx, &settings.SV, 150*time.Millisecond)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)
setTokenReturnEnabled(true /* enabled */, 0, 1, 2)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_feed")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

ts := tc.Server(2)
span := desc.KeySpan().AsRawSpanWithNoLocals()
ignoreValues := func(event kvcoord.RangeFeedMessage) {}

ctx2, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx2)
defer func() {
cancel()
err := g.Wait()
require.True(t, testutils.IsError(err, "context canceled"))
}()
observer := func(fn kvcoord.ForEachRangeFn) {
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(200 * time.Millisecond):
}
err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error {
curRangeFeedNodeID.Store(feed.NodeID)
return nil
})
if err != nil {
return err
}
}
})
}

closeFeed := rangeFeed(
ctx,
ts.DistSenderI(),
span,
tc.Server(0).Clock().Now(),
ignoreValues,
kvcoord.WithRangeObserver(observer),
)
defer closeFeed()
testutils.SucceedsSoon(t, func() error { return checkRangeFeedNodeID(3, true /* include */) })
h.comment(`(Rangefeed on n3)`)

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using 4x1 MiB (deduction, the write itself is small) writes. Then,
-- we will write 1 MiB to the range and wait for the closedTS to fall
-- behind on n3. We expect that the closedTS falling behind will trigger
-- an error that is returned to the mux rangefeed client, which will in turn
-- allows the rangefeed request to be re-routed to another replica.`)
// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

testutils.SucceedsSoon(t, func() error {
return checkRangeFeedNodeID(3, false /* include */)
})
newNode := curRangeFeedNodeID.Load().(roachpb.NodeID)
h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode))

h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -341,6 +341,11 @@ type Replica struct {
msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message
// Scratch for populating rac2.RaftEvent.ReplicaStateInfo for flowControlV2.
replicaStateScratchForFlowControl map[roachpb.ReplicaID]rac2.ReplicaStateInfo

// rangeFeedCTLagObserver is used to observe the closed timestamp lag of
// the replica and generate a signal to potentially nudge or cancel the
// rangefeed based on observed lag.
rangefeedCTLagObserver *rangeFeedCTLagObserver
}

// localMsgs contains a collection of raftpb.Message that target the local
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -198,6 +198,7 @@ func newUninitializedReplicaWithoutRaftGroup(
// replica GC issues, but is a distraction at the moment.
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))

r.raftMu.rangefeedCTLagObserver = newRangeFeedCTLagObserver()
r.raftMu.stateLoader = stateloader.Make(rangeID)
r.raftMu.sideloaded = logstore.NewDiskSideloadStorage(
store.cfg.Settings,
31 changes: 24 additions & 7 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -14,7 +14,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
-"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -868,17 +867,20 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(

// If the closed timestamp is sufficiently stale, signal that we want an
// update to the leaseholder so that it will eventually begin to progress
-// again.
-behind := r.Clock().PhysicalTime().Sub(closedTS.GoTime())
-slowClosedTSThresh := 5 * closedts.TargetDuration.Get(&r.store.cfg.Settings.SV)
-exceedsSlowLagThresh = behind > slowClosedTSThresh
-if exceedsSlowLagThresh {
+// again. Or, if the closed timestamp has been lagging by more than the
+// cancel threshold for a while, cancel the rangefeed.
+if signal := r.raftMu.rangefeedCTLagObserver.observeClosedTimestampUpdate(ctx,
+closedTS.GoTime(),
+r.Clock().PhysicalTime(),
+&r.store.cfg.Settings.SV,
+); signal.exceedsNudgeLagThreshold {
m := r.store.metrics.RangeFeedMetrics
if m.RangeFeedSlowClosedTimestampLogN.ShouldLog() {
if closedTS.IsEmpty() {
log.Infof(ctx, "RangeFeed closed timestamp is empty")
} else {
-log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s", closedTS, behind)
+log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s (%v)",
+closedTS, signal.lag, signal)
}
}

@@ -906,6 +908,21 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
defer func() { <-m.RangeFeedSlowClosedTimestampNudgeSem }()
if err := r.ensureClosedTimestampStarted(ctx); err != nil {
log.Infof(ctx, `RangeFeed failed to nudge: %s`, err)
} else if signal.exceedsCancelLagThreshold {
// We have successfully nudged the leaseholder to make progress on
// the closed timestamp. If the lag was already persistently too
// high, we cancel the rangefeed, so that it can be replanned on
// another replica. Cancelling only after a successful nudge, rather
// than earlier, avoids a hazard: we know the range at least has a
// leaseholder, so we won't fail open and thrash the rangefeed around
// the cluster.
//
// Note that we use the REASON_RANGEFEED_CLOSED, as adding a new
// reason would require a new version of the proto, which would
// prohibit us from cancelling the rangefeed in the current version,
// due to mixed version compatibility.
log.Warningf(ctx,
`RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
}
return nil, nil
})