Skip to content

Commit

Permalink
streamingccl: double the frequency we check for lagging nodes
Browse files Browse the repository at this point in the history
PR #115000 refactored the lagging node checker to only trigger a replanning
event if the checker detects a lagging node 2 times in a row without hwm
advancement, but maintained the frequency the checker runs. This implies that
it takes twice as long for the checker to trigger a replanning event, relative
to the `stream_replication.replan_flow_frequency` setting. This patch doubles
the frequency the checker runs, implying a replanning event would trigger after
`stream_replication.replan_flow_frequency` time has elapsed.

Informs #115415

Release note: none
  • Loading branch information
msbutler committed Dec 1, 2023
1 parent 228ad8e commit e73bc2e
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,11 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {

func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
ctx := sf.Ctx()
checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV)

// We halve the frequency relative to the ReplanFrequency setting (i.e.
// check twice as often), because the node lag checker will only restart the
// distSQL plan if a node is lagging for 2 checks in a row.
checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) / 2
maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV)
if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq {
log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f",
Expand Down Expand Up @@ -564,8 +568,8 @@ func (sf *streamIngestionFrontier) handleLaggingNodeError(ctx context.Context, e
case !errors.Is(err, ErrNodeLagging):
return err
case sf.replicatedTimeAtLastPositiveLagNodeCheck.Less(sf.persistedReplicatedTime):
sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime
log.Infof(ctx, "detected a lagging node: %s. Don't forward error because replicated time at last check %s is less than current replicated time %s", err, sf.replicatedTimeAtLastPositiveLagNodeCheck, sf.persistedReplicatedTime)
sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime
return nil
case sf.replicatedTimeAtLastPositiveLagNodeCheck.Equal(sf.persistedReplicatedTime):
return errors.Wrapf(err, "hwm has not advanced from %s", sf.persistedReplicatedTime)
Expand Down

0 comments on commit e73bc2e

Please sign in to comment.