diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
index 7edd1d33a3e9..0c75a87d964f 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -78,6 +78,10 @@ type streamIngestionFrontier struct {
 	partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress
 
 	lastNodeLagCheck time.Time
+
+	// replicatedTimeAtLastPositiveLagNodeCheck records the replicated time
+	// observed the last time the lagging node checker detected a lagging node.
+	replicatedTimeAtLastPositiveLagNodeCheck hlc.Timestamp
 }
 
 var _ execinfra.Processor = &streamIngestionFrontier{}
@@ -553,9 +557,29 @@ func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
 	}()
 	executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier)
 	log.VEvent(ctx, 2, "checking for lagging nodes")
-	return checkLaggingNodes(
+	err := checkLaggingNodes(
 		ctx,
 		executionDetails,
 		maxLag,
 	)
+	return sf.handleLaggingNodeError(ctx, err)
+}
+
+func (sf *streamIngestionFrontier) handleLaggingNodeError(ctx context.Context, err error) error {
+	switch {
+	case err == nil:
+		sf.replicatedTimeAtLastPositiveLagNodeCheck = hlc.Timestamp{}
+		log.VEvent(ctx, 2, "no lagging nodes after check")
+		return nil
+	case !errors.Is(err, ErrNodeLagging):
+		return err
+	case sf.replicatedTimeAtLastPositiveLagNodeCheck.Less(sf.persistedReplicatedTime):
+		log.Infof(ctx, "detected a lagging node: %s; not forwarding the error because the replicated time at the last check %s is less than the current replicated time %s", err, sf.replicatedTimeAtLastPositiveLagNodeCheck, sf.persistedReplicatedTime)
+		sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime
+		return nil
+	case sf.replicatedTimeAtLastPositiveLagNodeCheck.Equal(sf.persistedReplicatedTime):
+		return errors.Wrapf(err, "hwm has not advanced from %s", sf.persistedReplicatedTime)
+	default:
+		return errors.Wrapf(err, "unable to handle replanning error with replicated time %s and last node lag check replicated time %s", sf.persistedReplicatedTime, sf.replicatedTimeAtLastPositiveLagNodeCheck)
+	}
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
index 754300aaa703..209efe2d0b24 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
@@ -152,3 +152,78 @@ func TestHeartbeatLoop(t *testing.T) {
 		}
 	}
 }
+
+func TestLaggingNodeErrorHandler(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	type testCase struct {
+		name                        string
+		replicatedTime              int64
+		previousReplicatedTimeOnLag int64
+		inputLagErr                 error
+
+		expectedNewReplicatedTimeOnLag int64
+		expectedErrMsg                 string
+	}
+
+	for _, tc := range []testCase{
+		{
+			name:                           "no more lag",
+			previousReplicatedTimeOnLag:    1,
+			expectedNewReplicatedTimeOnLag: 0,
+		},
+		{
+			name:                           "new lag",
+			previousReplicatedTimeOnLag:    0,
+			replicatedTime:                 1,
+			expectedNewReplicatedTimeOnLag: 1,
+			inputLagErr:                    ErrNodeLagging,
+		},
+		{
+			name:                           "repeated lag, no hwm advance",
+			previousReplicatedTimeOnLag:    1,
+			replicatedTime:                 1,
+			expectedNewReplicatedTimeOnLag: 1,
+			inputLagErr:                    ErrNodeLagging,
+			expectedErrMsg:                 ErrNodeLagging.Error(),
+		},
+		{
+			name:                           "repeated lag, with hwm advance",
+			previousReplicatedTimeOnLag:    1,
+			replicatedTime:                 2,
+			expectedNewReplicatedTimeOnLag: 2,
+			inputLagErr:                    ErrNodeLagging,
+		},
+		{
+			name:           "non lag error",
+			inputLagErr:    errors.New("unexpected"),
+			expectedErrMsg: "unexpected",
+		},
+		{
+			name:                           "unhandlable lag error",
+			previousReplicatedTimeOnLag:    2,
+			replicatedTime:                 1,
+			expectedNewReplicatedTimeOnLag: 2,
+			inputLagErr:                    ErrNodeLagging,
+			expectedErrMsg:                 "unable to handle replanning",
+		},
+	} {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			sf := streamIngestionFrontier{
+				persistedReplicatedTime:                  hlc.Timestamp{WallTime: tc.replicatedTime},
+				replicatedTimeAtLastPositiveLagNodeCheck: hlc.Timestamp{WallTime: tc.previousReplicatedTimeOnLag},
+			}
+			err := sf.handleLaggingNodeError(ctx, tc.inputLagErr)
+			if tc.expectedErrMsg == "" {
+				require.NoError(t, err)
+			} else {
+				require.ErrorContains(t, err, tc.expectedErrMsg)
+			}
+			require.Equal(t, hlc.Timestamp{WallTime: tc.expectedNewReplicatedTimeOnLag}, sf.replicatedTimeAtLastPositiveLagNodeCheck)
+		})
+	}
+}
diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 8ca95753f28c..f7be595cd71d 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -1756,7 +1756,7 @@ func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration
 	db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
 		`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
 		`SET CLUSTER SETTING stream_replication.replan_flow_threshold = 0.1;`,
-		`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '10m';`)
+		`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`)
 
 	if additionalDuration != 0 {
 		replanFrequency := additionalDuration / 2
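
As a quick illustration of the behavior the frontier-processor change introduces: a lag error is swallowed as long as the replicated time keeps advancing between consecutive lag detections, and is only forwarded (triggering a replan) once two consecutive detections observe the same replicated time. The standalone Go sketch below restates that gating logic; laggingGate, errNodeLagging, and the plain int64 wall times are illustrative stand-ins, not identifiers from this patch.

package main

import (
	"errors"
	"fmt"
)

// errNodeLagging stands in for the ErrNodeLagging sentinel used by the patch.
var errNodeLagging = errors.New("node lagging")

// laggingGate mirrors the two timestamps the frontier processor compares: the
// replicated time persisted in the job record, and the replicated time seen at
// the last positive lag check.
type laggingGate struct {
	persistedReplicatedTime int64
	replicatedTimeAtLastLag int64
}

// handle mirrors the switch in handleLaggingNodeError, using int64 wall times
// instead of hlc.Timestamp.
func (g *laggingGate) handle(err error) error {
	switch {
	case err == nil:
		// No lag detected: reset the gate so the next lag episode starts fresh.
		g.replicatedTimeAtLastLag = 0
		return nil
	case !errors.Is(err, errNodeLagging):
		// Unrelated errors pass through untouched.
		return err
	case g.replicatedTimeAtLastLag < g.persistedReplicatedTime:
		// The replicated time advanced since the last detection, so ingestion
		// is still making progress: remember the new mark and swallow the error.
		g.replicatedTimeAtLastLag = g.persistedReplicatedTime
		return nil
	case g.replicatedTimeAtLastLag == g.persistedReplicatedTime:
		// Two consecutive detections with no HWM advance: surface the error so
		// the caller can replan.
		return fmt.Errorf("hwm has not advanced from %d: %w", g.persistedReplicatedTime, err)
	default:
		return fmt.Errorf("persisted time %d is behind the last lag check %d: %w",
			g.persistedReplicatedTime, g.replicatedTimeAtLastLag, err)
	}
}

func main() {
	g := &laggingGate{}

	g.persistedReplicatedTime = 1
	fmt.Println(g.handle(errNodeLagging)) // <nil>: first detection, HWM noted at 1

	fmt.Println(g.handle(errNodeLagging)) // hwm has not advanced from 1: node lagging

	g.persistedReplicatedTime = 2
	fmt.Println(g.handle(errNodeLagging)) // <nil>: HWM advanced, error swallowed again
}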