From b61cbb95046b3d56b76c36707a95eff9598f0d2d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Wed, 15 Nov 2023 10:51:48 -0700 Subject: [PATCH] streamingccl: prevent node lag replanning starvation This patch prevents the lastNodeLagCheck time from updating every time the frontier processor receives a checkpoint, which can happen every few seconds. This previously prevented the node lag replanning check to trigger because this time needed to be older than 10 minutes. Rather, this timestamp should only update if we actually compute the lag check. Fixes #114341 Release note: none --- .../streamingest/node_lag_detector.go | 5 ++++- .../stream_ingestion_frontier_processor.go | 18 +++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/node_lag_detector.go b/pkg/ccl/streamingccl/streamingest/node_lag_detector.go index acc31fbf95d3..0b92a978e730 100644 --- a/pkg/ccl/streamingccl/streamingest/node_lag_detector.go +++ b/pkg/ccl/streamingccl/streamingest/node_lag_detector.go @@ -9,10 +9,12 @@ package streamingest import ( + "context" "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -22,12 +24,13 @@ var ErrNodeLagging = errors.New("node frontier too far behind other nodes") // more than maxAllowable lag behind any other destination node. This function // assumes that all nodes have finished their initial scan (i.e. have a nonzero hwm). func checkLaggingNodes( - executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration, + ctx context.Context, executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration, ) error { if maxAllowableLag == 0 { return nil } laggingNode, minLagDifference := computeMinLagDifference(executionDetails) + log.VEventf(ctx, 2, "computed min lag diff: %d lagging node, difference %.2f", laggingNode, minLagDifference.Minutes()) if maxAllowableLag < minLagDifference { return errors.Wrapf(ErrNodeLagging, "node %d is %.2f minutes behind the next node. Try replanning", laggingNode, minLagDifference.Minutes()) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 9bba1ced4208..37edce01081f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -535,26 +535,26 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { } func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error { - defer func() { - sf.lastNodeLagCheck = timeutil.Now() - }() + ctx := sf.Ctx() checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV) if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq { + log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f", + maxLag, checkFreq.Minutes(), sf.lastNodeLagCheck, timeutil.Since(sf.lastNodeLagCheck).Minutes()) return nil } - - ctx := sf.Ctx() - // Don't check for lagging nodes if the hwm has yet to advance. if sf.replicatedTimeAtStart.Equal(sf.persistedReplicatedTime) { - log.VEventf(ctx, 2, "skipping lagging nodes check as hwm has yet to advance past %s", sf.replicatedTimeAtStart) + log.VEventf(ctx, 2, "skipping lag replanning check: hwm has yet to advance past %s", sf.replicatedTimeAtStart) return nil } - + defer func() { + sf.lastNodeLagCheck = timeutil.Now() + }() executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier) - + log.VEvent(ctx, 2, "checking for lagging nodes") return checkLaggingNodes( + ctx, executionDetails, maxLag, )