From e73bc2eeef418e478b69f1981841cda761fd8821 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 1 Dec 2023 21:15:47 +0000 Subject: [PATCH] streamingccl: double the frequency we check for lagging nodes PR #115000 refactored the lagging node checker to only trigger a replanning event if the checker detects a lagging node 2 times in a row without hwm advancement, but kept the frequency at which the checker runs unchanged. As a result, it takes twice as long for the checker to trigger a replanning event, relative to the `stream_replication.replan_flow_frequency` setting. This patch doubles the frequency at which the checker runs, so that a replanning event triggers once `stream_replication.replan_flow_frequency` has elapsed. Informs #115415 Release note: none --- .../streamingest/stream_ingestion_frontier_processor.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index a84c7ddda893..7b7f308c087f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -530,7 +530,11 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error { ctx := sf.Ctx() - checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) + + // We halve the check interval relative to the ReplanFrequency setting (i.e. + // check twice as often), because the node lag checker will only restart the + // distSQL plan if a node is lagging for 2 checks in a row. 
+ checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) / 2 maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV) if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq { log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f", @@ -564,8 +568,8 @@ func (sf *streamIngestionFrontier) handleLaggingNodeError(ctx context.Context, e case !errors.Is(err, ErrNodeLagging): return err case sf.replicatedTimeAtLastPositiveLagNodeCheck.Less(sf.persistedReplicatedTime): - sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime log.Infof(ctx, "detected a lagging node: %s. Don't forward error because replicated time at last check %s is less than current replicated time %s", err, sf.replicatedTimeAtLastPositiveLagNodeCheck, sf.persistedReplicatedTime) + sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime return nil case sf.replicatedTimeAtLastPositiveLagNodeCheck.Equal(sf.persistedReplicatedTime): return errors.Wrapf(err, "hwm has not advanced from %s", sf.persistedReplicatedTime)