Merge pull request #8 from mohini-crl/241216.rac-cancel-lagging-rangefeed

kv: replan rangefeeds with chronic closed ts lag
mohini-crl authored Dec 19, 2024
2 parents 92400bc + 2f7df22 commit 43ebaa0
Showing 9 changed files with 705 additions and 9 deletions.
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -557,8 +557,9 @@ func handleRangefeedError(
kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER,
kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED:
// Try again with same descriptor. These are transient
// errors that should not show up again.
// Try again with same descriptor. These are transient errors that should
// not show up again, or should be retried on another replica which will
// likely not have the same issue.
return rangefeedErrorInfo{}, nil
case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
kvpb.RangeFeedRetryError_REASON_RANGE_MERGED,
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -70,6 +70,7 @@ go_library(
"replica_raftstorage.go",
"replica_range_lease.go",
"replica_rangefeed.go",
"replica_rangefeed_lag_observer.go",
"replica_rankings.go",
"replica_rate_limit.go",
"replica_read.go",
@@ -345,6 +346,7 @@ go_test(
"replica_raft_overload_test.go",
"replica_raft_test.go",
"replica_raft_truncation_test.go",
"replica_rangefeed_lag_observer_test.go",
"replica_rangefeed_test.go",
"replica_rankings_test.go",
"replica_sideload_test.go",
240 changes: 240 additions & 0 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -19,9 +19,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
@@ -38,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -6141,6 +6144,243 @@ ORDER BY streams DESC;
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func TestFlowControlSendQueueRangeFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// rangeFeed will create a rangefeed suitable for testing. It will start a
// rangefeed and return a function that can be used to stop it.
rangeFeed := func(
ctx context.Context,
dsI interface{},
sp roachpb.Span,
startFrom hlc.Timestamp,
onValue func(event kvcoord.RangeFeedMessage),
opts ...kvcoord.RangeFeedOption,
) func() {
ds := dsI.(*kvcoord.DistSender)
events := make(chan kvcoord.RangeFeedMessage)
cancelCtx, cancel := context.WithCancel(ctx)

g := ctxgroup.WithContext(cancelCtx)
g.GoCtx(func(ctx context.Context) (err error) {
return ds.RangeFeed(ctx, []kvcoord.SpanTimePair{{Span: sp, StartAfter: startFrom}}, events, opts...)
})
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case ev := <-events:
onValue(ev)
}
}
})

return func() {
cancel()
_ = g.Wait()
}
}
// We will use the logic below later to assert which node the rangefeed
// is running on.
var curRangeFeedNodeID atomic.Value
curRangeFeedNodeID.Store(roachpb.NodeID(0))
checkRangeFeedNodeID := func(nodeID roachpb.NodeID, include bool) error {
curNodeID := curRangeFeedNodeID.Load().(roachpb.NodeID)
if curNodeID == 0 {
return errors.New("no rangefeed node yet")
}
if curNodeID != nodeID && include {
return errors.Errorf("expected rangefeed on n%v, got n%v", nodeID, curNodeID)
}
if curNodeID == nodeID && !include {
return errors.Errorf("expected rangefeed not on n%v", nodeID)
}
return nil
}

ctx := context.Background()
const numNodes = 3
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we lower the
// limits (the defaults are 8 and 16 MiB).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
// Speed up cancellation, default is 20x the target duration.
kvserver.RangeFeedLaggingCTCancelMultiple.Override(ctx, &settings.SV, 6)
// Also speed up cancellation by shortening the required lag duration,
// default is 60s.
kvserver.RangeFeedLaggingCTCancelDuration.Override(ctx, &settings.SV, 3*time.Second)
// Likewise, the default target duration is 3s, lower it by a factor of 20.
closedts.TargetDuration.Override(ctx, &settings.SV, 150*time.Millisecond)
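// Taken together, and assuming the lag observer cancels only once the lag has
// exceeded RangeFeedLaggingCTCancelMultiple * TargetDuration continuously for
// RangeFeedLaggingCTCancelDuration, these overrides mean the rangefeed should
// be cancelled once the closed timestamp lags by more than 6 * 150ms = 900ms
// for at least 3s, instead of the default 20 * 3s = 60s sustained for 60s.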

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)
setTokenReturnEnabled(true /* enabled */, 0, 1, 2)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close("send_queue_range_feed")

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)

ts := tc.Server(2)
span := desc.KeySpan().AsRawSpanWithNoLocals()
ignoreValues := func(event kvcoord.RangeFeedMessage) {}

ctx2, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx2)
defer func() {
cancel()
err := g.Wait()
require.True(t, testutils.IsError(err, "context canceled"))
}()
observer := func(fn kvcoord.ForEachRangeFn) {
g.GoCtx(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(200 * time.Millisecond):
}
err := fn(func(rfCtx kvcoord.RangeFeedContext, feed kvcoord.PartialRangeFeed) error {
curRangeFeedNodeID.Store(feed.NodeID)
return nil
})
if err != nil {
return err
}
}
})
}

closeFeed := rangeFeed(
ctx,
ts.DistSenderI(),
span,
tc.Server(0).Clock().Now(),
ignoreValues,
kvcoord.WithRangeObserver(observer),
)
defer closeFeed()
testutils.SucceedsSoon(t, func() error { return checkRangeFeedNodeID(3, true /* include */) })
h.comment(`(Rangefeed on n3)`)

h.comment(`
-- We will exhaust the tokens across all streams while admission is blocked on
-- n3, using 4x1 MiB writes (each deducted as 1 MiB; the writes themselves
-- are small). Then,
-- we will write 1 MiB to the range and wait for the closedTS to fall
-- behind on n3. We expect that the closedTS falling behind will trigger
-- an error that is returned to the mux rangefeed client, which will in turn
-- allow the rangefeed request to be re-routed to another replica.`)
// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing writes to the
// range that add up to the token buffer size.
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

testutils.SucceedsSoon(t, func() error {
return checkRangeFeedNodeID(3, false /* include */)
})
newNode := curRangeFeedNodeID.Load().(roachpb.NodeID)
h.comment(fmt.Sprintf(`(Rangefeed moved to n%v)`, newNode))

h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -341,6 +341,11 @@ type Replica struct {
msgAppScratchForFlowControl map[roachpb.ReplicaID][]raftpb.Message
// Scratch for populating rac2.RaftEvent.ReplicaStateInfo for flowControlV2.
replicaStateScratchForFlowControl map[roachpb.ReplicaID]rac2.ReplicaStateInfo

// rangeFeedCTLagObserver is used to observe the closed timestamp lag of
// the replica and generate a signal to potentially nudge or cancel the
// rangefeed based on observed lag.
rangefeedCTLagObserver *rangeFeedCTLagObserver
}

// localMsgs contains a collection of raftpb.Message that target the local
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -198,6 +198,7 @@ func newUninitializedReplicaWithoutRaftGroup(
// replica GC issues, but is a distraction at the moment.
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))

r.raftMu.rangefeedCTLagObserver = newRangeFeedCTLagObserver()
r.raftMu.stateLoader = stateloader.Make(rangeID)
r.raftMu.sideloaded = logstore.NewDiskSideloadStorage(
store.cfg.Settings,
31 changes: 24 additions & 7 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -14,7 +14,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
@@ -868,17 +867,20 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(

// If the closed timestamp is sufficiently stale, signal that we want an
// update to the leaseholder so that it will eventually begin to progress
// again.
behind := r.Clock().PhysicalTime().Sub(closedTS.GoTime())
slowClosedTSThresh := 5 * closedts.TargetDuration.Get(&r.store.cfg.Settings.SV)
exceedsSlowLagThresh = behind > slowClosedTSThresh
if exceedsSlowLagThresh {
// again. Or, if the closed timestamp has been lagging by more than the
// cancel threshold for a while, cancel the rangefeed.
if signal := r.raftMu.rangefeedCTLagObserver.observeClosedTimestampUpdate(ctx,
closedTS.GoTime(),
r.Clock().PhysicalTime(),
&r.store.cfg.Settings.SV,
); signal.exceedsNudgeLagThreshold {
m := r.store.metrics.RangeFeedMetrics
if m.RangeFeedSlowClosedTimestampLogN.ShouldLog() {
if closedTS.IsEmpty() {
log.Infof(ctx, "RangeFeed closed timestamp is empty")
} else {
log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s", closedTS, behind)
log.Infof(ctx, "RangeFeed closed timestamp %s is behind by %s (%v)",
closedTS, signal.lag, signal)
}
}

@@ -906,6 +908,21 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(
defer func() { <-m.RangeFeedSlowClosedTimestampNudgeSem }()
if err := r.ensureClosedTimestampStarted(ctx); err != nil {
log.Infof(ctx, `RangeFeed failed to nudge: %s`, err)
} else if signal.exceedsCancelLagThreshold {
// We have successfully nudged the leaseholder to make progress on
// the closed timestamp. If the lag was already persistently too
// high, we cancel the rangefeed, so that it can be replanned on
// another replica. Compared to replanning earlier, this avoids the
// hazard of failing open: we know the range at least has a leaseholder,
// so we won't thrash the rangefeed around the cluster.
//
// Note that we use REASON_RANGEFEED_CLOSED, as adding a new reason
// would require a new proto version; due to mixed-version compatibility,
// that would prevent us from cancelling the rangefeed in the current
// version.
log.Warningf(ctx,
`RangeFeed is too far behind, cancelling for replanning [%v]`, signal)
r.disconnectRangefeedWithReason(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
}
return nil, nil
})
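Among the changed files not expanded in this view is the new pkg/kv/kvserver/replica_rangefeed_lag_observer.go (listed in BUILD.bazel above). Below is a rough, hypothetical sketch of what the observer might look like, inferred only from its call sites in this diff; the field names, the nudge multiplier, and the settings' concrete types are assumptions, not the actual implementation.

package kvserver

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
	"github.com/cockroachdb/cockroach/pkg/settings"
)

// rangeFeedCTLagSignal is the (assumed) result of a single observation.
type rangeFeedCTLagSignal struct {
	lag                       time.Duration
	targetLag                 time.Duration
	exceedsNudgeLagThreshold  bool
	exceedsCancelLagThreshold bool
}

// rangeFeedCTLagObserver tracks how long the closed timestamp lag has been
// above the cancellation threshold. It is stateful and is stored per-replica
// under raftMu (see the replica.go and replica_init.go hunks above).
type rangeFeedCTLagObserver struct {
	exceededCancelLagAt time.Time
}

func newRangeFeedCTLagObserver() *rangeFeedCTLagObserver {
	return &rangeFeedCTLagObserver{}
}

func (o *rangeFeedCTLagObserver) observeClosedTimestampUpdate(
	ctx context.Context, closedTS, now time.Time, sv *settings.Values,
) rangeFeedCTLagSignal {
	target := closedts.TargetDuration.Get(sv)
	sig := rangeFeedCTLagSignal{lag: now.Sub(closedTS), targetLag: target}
	// Nudge the leaseholder once the lag is a few multiples of the closed
	// timestamp target duration (the pre-existing code used 5x).
	sig.exceedsNudgeLagThreshold = sig.lag > 5*target
	// Signal cancellation only when the lag has exceeded the cancel threshold
	// (RangeFeedLaggingCTCancelMultiple x the target duration) continuously
	// for at least RangeFeedLaggingCTCancelDuration, so that transient lag
	// does not cause the rangefeed to be replanned.
	cancelThresh := time.Duration(RangeFeedLaggingCTCancelMultiple.Get(sv)) * target
	if sig.lag <= cancelThresh {
		o.exceededCancelLagAt = time.Time{}
		return sig
	}
	if o.exceededCancelLagAt.IsZero() {
		o.exceededCancelLagAt = now
	}
	sig.exceedsCancelLagThreshold =
		now.Sub(o.exceededCancelLagAt) >= RangeFeedLaggingCTCancelDuration.Get(sv)
	return sig
}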
