Skip to content

Commit

Permalink
streamingccl: only return lag replanning error if lagging node has no…
Browse files Browse the repository at this point in the history
…t advanced

Previously, the frontier processor would return a lag replanning error if it
detected a lagging node and after the hwm had advanced during the flow. This
implies the frontier processor could replan as soon as a lagging node finished
its catchup scan and bumped the hwm, but was still far behind the other nodes,
as we observed in #114706. Ideally, the frontier processor should not throw
this replanning error because the lagging node is making progress and because
replanning can cause repeated work.

This patch prevents this scenario by teaching the frontier processor to only
throw a replanning error if:
- the hwm has advanced in the flow
- two consecutive lagging node checks detected a lagging node and the hwm has
  not advanced during those two checks.

Informs #114706

Release note: none
  • Loading branch information
msbutler committed Nov 22, 2023
1 parent e5570cb commit 9be6d5a
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type streamIngestionFrontier struct {
partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress

lastNodeLagCheck time.Time

// replicatedTimeAtLastPositiveLagNodeCheck records the replicated time the
// last time the lagging node checker detected a lagging node.
replicatedTimeAtLastPositiveLagNodeCheck hlc.Timestamp
}

var _ execinfra.Processor = &streamIngestionFrontier{}
Expand Down Expand Up @@ -553,9 +557,29 @@ func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error {
}()
executionDetails := constructSpanFrontierExecutionDetailsWithFrontier(sf.spec.PartitionSpecs, sf.frontier)
log.VEvent(ctx, 2, "checking for lagging nodes")
return checkLaggingNodes(
err := checkLaggingNodes(
ctx,
executionDetails,
maxLag,
)
return sf.handleLaggingNodeError(ctx, err)
}

func (sf *streamIngestionFrontier) handleLaggingNodeError(ctx context.Context, err error) error {
switch {
case err == nil:
sf.replicatedTimeAtLastPositiveLagNodeCheck = hlc.Timestamp{}
log.VEvent(ctx, 2, "no lagging nodes after check")
return nil
case !errors.Is(err, ErrNodeLagging):
return err
case sf.replicatedTimeAtLastPositiveLagNodeCheck.Less(sf.persistedReplicatedTime):
sf.replicatedTimeAtLastPositiveLagNodeCheck = sf.persistedReplicatedTime
log.Infof(ctx, "detected a lagging node: %s. Don't forward error because replicated time at last check %s is less than current replicated time %s", err, sf.replicatedTimeAtLastPositiveLagNodeCheck, sf.persistedReplicatedTime)
return nil
case sf.replicatedTimeAtLastPositiveLagNodeCheck.Equal(sf.persistedReplicatedTime):
return errors.Wrapf(err, "hwm has not advanced from %s", sf.persistedReplicatedTime)
default:
return errors.Wrapf(err, "unable to handle replanning error with replicated time %s and last node lag check replicated time %s", sf.persistedReplicatedTime, sf.replicatedTimeAtLastPositiveLagNodeCheck)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,78 @@ func TestHeartbeatLoop(t *testing.T) {
}
}
}

func TestLaggingNodeErrorHandler(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

type testCase struct {
name string
replicatedTime int64
previousReplicatedTimeOnLag int64
inputLagErr error

expectedNewReplicatedTimeOnLag int64
expectedErrMsg string
}

for _, tc := range []testCase{
{
name: "no more lag",
previousReplicatedTimeOnLag: 1,
expectedNewReplicatedTimeOnLag: 0,
},
{
name: "new lag",
previousReplicatedTimeOnLag: 0,
replicatedTime: 1,
expectedNewReplicatedTimeOnLag: 1,
inputLagErr: ErrNodeLagging,
},
{
name: "repeated lag, no hwm advance",
previousReplicatedTimeOnLag: 1,
replicatedTime: 1,
expectedNewReplicatedTimeOnLag: 1,
inputLagErr: ErrNodeLagging,
expectedErrMsg: ErrNodeLagging.Error(),
},
{
name: "repeated lag, with hwm advance",
previousReplicatedTimeOnLag: 1,
replicatedTime: 2,
expectedNewReplicatedTimeOnLag: 2,
inputLagErr: ErrNodeLagging,
},
{
name: "non lag error",
inputLagErr: errors.New("unexpected"),
expectedErrMsg: "unexpected",
},
{
name: "unhandlable lag error",
previousReplicatedTimeOnLag: 2,
replicatedTime: 1,
expectedNewReplicatedTimeOnLag: 2,
inputLagErr: ErrNodeLagging,
expectedErrMsg: "unable to handle replanning",
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
sf := streamIngestionFrontier{
persistedReplicatedTime: hlc.Timestamp{WallTime: tc.replicatedTime},
replicatedTimeAtLastPositiveLagNodeCheck: hlc.Timestamp{WallTime: tc.previousReplicatedTimeOnLag},
}
err := sf.handleLaggingNodeError(ctx, tc.inputLagErr)
if tc.expectedErrMsg == "" {
require.NoError(t, err)
} else {
require.ErrorContains(t, err, tc.expectedErrMsg)
}
require.Equal(t, hlc.Timestamp{WallTime: tc.expectedNewReplicatedTimeOnLag}, sf.replicatedTimeAtLastPositiveLagNodeCheck)
})
}
}
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,7 @@ func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration
db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`,
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
`SET CLUSTER SETTING stream_replication.replan_flow_threshold = 0.1;`,
`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '10m';`)
`SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`)

if additionalDuration != 0 {
replanFrequency := additionalDuration / 2
Expand Down

0 comments on commit 9be6d5a

Please sign in to comment.