Skip to content

Commit

Permalink
streamingccl: prevent node lag replanning starvation
Browse files Browse the repository at this point in the history
This patch prevents the lastNodeLagCheck time from updating every time the
frontier processor receives a checkpoint, which can happen every few seconds.
This previously prevented the node lag replanning check to trigger because this
time needed to be older than 10 minutes. Rather, this timestamp should only
update if we actually compute the lag check.

Fixes #114341

Release note: none
  • Loading branch information
msbutler committed Nov 15, 2023
1 parent feba605 commit 99701b8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamingest/node_lag_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

Expand All @@ -22,12 +24,13 @@ var ErrNodeLagging = errors.New("node frontier too far behind other nodes")
// more than maxAllowable lag behind any other destination node. This function
// assumes that all nodes have finished their initial scan (i.e. have a nonzero hwm).
func checkLaggingNodes(
executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration,
ctx context.Context, executionDetails []frontierExecutionDetails, maxAllowableLag time.Duration,
) error {
if maxAllowableLag == 0 {
return nil
}
laggingNode, minLagDifference := computeMinLagDifference(executionDetails)
log.VEventf(ctx, 2, "computed min lag diff: %d lagging node, difference %.2f", laggingNode, minLagDifference.Minutes())
if maxAllowableLag < minLagDifference {
return errors.Wrapf(ErrNodeLagging, "node %d is %.2f minutes behind the next node. Try replanning", laggingNode, minLagDifference.Minutes())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,26 +535,26 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {
}

func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
defer func() {
sf.lastNodeLagCheck = timeutil.Now()
}()
ctx := sf.Ctx()
checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV)
maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV)
if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq {
log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f",
maxLag, checkFreq.Minutes(), sf.lastNodeLagCheck, timeutil.Since(sf.lastNodeLagCheck).Minutes())
return nil
}

ctx := sf.Ctx()

// Don't check for lagging nodes if the hwm has yet to advance.
if sf.replicatedTimeAtStart.Equal(sf.persistedReplicatedTime) {
log.VEventf(ctx, 2, "skipping lagging nodes check as hwm has yet to advance past %s", sf.replicatedTimeAtStart)
log.VEventf(ctx, 2, "skipping lag replanning check: hwm has yet to advance past %s", sf.replicatedTimeAtStart)
return nil
}

defer func() {
sf.lastNodeLagCheck = timeutil.Now()
}()
executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier)

log.VEvent(ctx, 2, "checking for lagging nodes")
return checkLaggingNodes(
ctx,
executionDetails,
maxLag,
)
Expand Down

0 comments on commit 99701b8

Please sign in to comment.