diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go index 5f801e9095a8..e4567c2529cf 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go +++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go @@ -624,3 +624,30 @@ func SSTMaker(t *testing.T, keyValues []roachpb.KeyValue) kvpb.RangeFeedSSTable WriteTS: batchTS, } } + +// InitialSplitTester helps test expected behavior around initial splits: we +// always split before the initial scan, and after a job level retry if we just +// observed a replanning error and if +// physical_replication.consumer.split_on_job_retry.enabled is set to true. +type InitialSplitTester struct { + SplitOnRetry bool + InitialSplitCount int +} + +func (ist *InitialSplitTester) GenInitialSplitterInspector(t *testing.T) func(noopScatter bool) { + return func(noopScatter bool) { + ist.InitialSplitCount++ + if noopScatter { + require.Greater(t, ist.InitialSplitCount, 1) + } + } +} + +func (ist *InitialSplitTester) MaybeSetSplitOnRetry( + t *testing.T, rng *rand.Rand, c *testcluster.TestCluster, +) { + ist.SplitOnRetry = rng.Intn(2) == 1 + if ist.SplitOnRetry { + serverutils.SetClusterSetting(t, c, "physical_replication.consumer.split_on_job_retry.enabled", true) + } +} diff --git a/pkg/ccl/streamingccl/settings.go b/pkg/ccl/streamingccl/settings.go index 5c50e36caf43..7d35425b2b62 100644 --- a/pkg/ccl/streamingccl/settings.go +++ b/pkg/ccl/streamingccl/settings.go @@ -115,6 +115,13 @@ var InterNodeLag = settings.RegisterDurationSetting( settings.NonNegativeDuration, ) +var SplitOnRetry = settings.RegisterBoolSetting( + settings.SystemOnly, + "physical_replication.consumer.split_on_job_retry.enabled", + "controls whether we issue admin splits on the partition spans after a job level retry related to distsql replanning", + false, +) + // DumpFrontierEntries controls the frequency at which we persist the entries in // the frontier to the `system.job_info` table. // diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index f286d53ae638..bbec06d867db 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -157,10 +157,13 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + ist := replicationtestutils.InitialSplitTester{} args := replicationtestutils.DefaultTenantStreamingClustersArgs + args.TestingKnobs = &sql.StreamingTestingKnobs{ + InspectInitialSplitter: ist.GenInitialSplitterInspector(t)} c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args) defer cleanup() - + ist.MaybeSetSplitOnRetry(t, c.Rng, c.DestCluster) producerJobID, ingestionJobID := c.StartStreamReplication(ctx) jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) @@ -195,6 +198,9 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { srcTime = c.SrcCluster.Server(0).Clock().Now() c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) + + // We should only observe one set of initial splits, as no replanning error occured. + require.Equal(t, 1, ist.InitialSplitCount) } func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) { @@ -688,6 +694,8 @@ func TestStreamingAutoReplan(t *testing.T) { turnOffReplanning := make(chan struct{}) var alreadyReplanned atomic.Bool + ist := replicationtestutils.InitialSplitTester{} + // Track the number of unique addresses that we're connected to. clientAddresses := make(map[string]struct{}) var addressesMu syncutil.Mutex @@ -705,12 +713,14 @@ func TestStreamingAutoReplan(t *testing.T) { alreadyReplanned.Swap(true) } }, + InspectInitialSplitter: ist.GenInitialSplitterInspector(t), } c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args) defer cleanup() // Don't allow for replanning until the new nodes and scattered table have been created. serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0) serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_frequency", time.Millisecond*500) + ist.MaybeSetSplitOnRetry(t, c.Rng, c.DestCluster) // Don't allow inter node lag replanning to affect the test. serverutils.SetClusterSetting(t, c.DestCluster, "physical_replication.consumer.node_lag_replanning_threshold", 0) @@ -748,6 +758,11 @@ func TestStreamingAutoReplan(t *testing.T) { c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID)) require.Greater(t, len(clientAddresses), 1) + expectedInitialSplits := 1 + if ist.SplitOnRetry { + expectedInitialSplits = 2 + } + require.Equal(t, expectedInitialSplits, ist.InitialSplitCount) } // TestStreamingReplanOnLag asserts that the c2c job retries if a node lags far @@ -766,6 +781,8 @@ func TestStreamingReplanOnLag(t *testing.T) { turnOffReplanning := make(chan struct{}) var alreadyReplanned atomic.Bool + ist := replicationtestutils.InitialSplitTester{} + // Track the number of unique addresses that we're connected to, to ensure // that all destination nodes participate in the replication stream. clientAddresses := make(map[string]struct{}) @@ -792,11 +809,13 @@ func TestStreamingReplanOnLag(t *testing.T) { } return false }, + InspectInitialSplitter: ist.GenInitialSplitterInspector(t), } c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args) defer cleanup() // Don't allow for replanning based on node participation. serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0) + ist.MaybeSetSplitOnRetry(t, c.Rng, c.DestCluster) replicationtestutils.CreateScatteredTable(t, c, 3) @@ -821,6 +840,11 @@ func TestStreamingReplanOnLag(t *testing.T) { cutoverTime := c.DestSysServer.Clock().Now() c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID)) + expectedInitialSplits := 1 + if ist.SplitOnRetry { + expectedInitialSplits = 2 + } + require.Equal(t, expectedInitialSplits, ist.InitialSplitCount) } // TestProtectedTimestampManagement tests the active protected diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index 703d77356422..b36a1f34c63f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -201,13 +201,24 @@ func startDistIngestion( return rw.Err() } - // We now attempt to create initial splits. We currently do - // this once during initial planning to avoid re-splitting on - // resume since it isn't clear to us at the moment whether - // re-splitting is always going to be useful. - if !streamProgress.InitialSplitComplete { + isReplanErr := func(err error) bool { + return errors.Is(err, sql.ErrPlanChanged) || errors.Is(err, ErrNodeLagging) + } + + // We now attempt to issue splits over the topology if we've never done so + // (i.e. we're beginning the c2c job), or if we allow splits after a job level + // retry. + if !streamProgress.InitialSplitComplete || + (streamingccl.SplitOnRetry.Get(&execCtx.ExecCfg().Settings.SV) && isReplanErr(resumer.lastRetryableIngestionError)) { codec := execCtx.ExtendedEvalContext().Codec - splitter := &dbSplitAndScatter{db: execCtx.ExecCfg().DB} + splitter := &dbSplitAndScatter{ + db: execCtx.ExecCfg().DB, + // If we've already created initial splits, don't issue scatters. + noopScatter: streamProgress.InitialSplitComplete, + } + if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.InspectInitialSplitter != nil { + knobs.InspectInitialSplitter(splitter.noopScatter) + } if err := createInitialSplits(ctx, codec, splitter, planner.initialTopology, details.DestinationTenantID); err != nil { return err } @@ -226,7 +237,7 @@ func startDistIngestion( } err = ctxgroup.GoAndWait(ctx, execInitialPlan, replanner, tracingAggLoop, streamSpanConfigs) - if errors.Is(err, sql.ErrPlanChanged) { + if isReplanErr(err) { execCtx.ExecCfg().JobRegistry.MetricsStruct().StreamIngest.(*Metrics).ReplanCount.Inc(1) } return err @@ -250,7 +261,8 @@ type splitAndScatterer interface { } type dbSplitAndScatter struct { - db *kv.DB + db *kv.DB + noopScatter bool } func (s *dbSplitAndScatter) split( @@ -260,6 +272,9 @@ func (s *dbSplitAndScatter) split( } func (s *dbSplitAndScatter) scatter(ctx context.Context, scatterKey roachpb.Key) error { + if s.noopScatter { + return nil + } _, pErr := kv.SendWrapped(ctx, s.db.NonTransactionalSender(), &kvpb.AdminScatterRequest{ RequestHeader: kvpb.RequestHeaderFromSpan(roachpb.Span{ Key: scatterKey, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 7eaf5a2d2ddc..2413be273ea2 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -42,6 +42,8 @@ import ( type streamIngestionResumer struct { job *jobs.Job + lastRetryableIngestionError error + mu struct { syncutil.Mutex // perNodeAggregatorStats is a per component running aggregate of trace @@ -223,6 +225,7 @@ func ingestWithRetries( break } status := redact.Sprintf("waiting before retrying error: %s", err) + resumer.lastRetryableIngestionError = err updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status) newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob) if lastReplicatedTime.Less(newReplicatedTime) { diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index e6bf4dcd8af0..f3083f36889e 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -1753,7 +1753,8 @@ func destClusterSettings(t test.Test, db *sqlutils.SQLRunner, additionalDuration db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, `SET CLUSTER SETTING stream_replication.replan_flow_threshold = 0.1;`, - `SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`) + `SET CLUSTER SETTING physical_replication.consumer.node_lag_replanning_threshold = '5m';`, + `SET CLUSTER SETTING physical_replication.consumer.split_on_job_retry.enabled = true;`) if additionalDuration != 0 { replanFrequency := additionalDuration / 2 diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 586396f5dcf4..ad031b9c91a7 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1793,6 +1793,8 @@ type StreamingTestingKnobs struct { AfterReplicationFlowPlan func(map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec) + InspectInitialSplitter func(noopScatter bool) + AfterPersistingPartitionSpecs func() // OverrideRevertRangeBatchSize allows overriding the `MaxSpanRequestKeys`