diff --git a/pkg/ccl/crosscluster/producer/replication_manager.go b/pkg/ccl/crosscluster/producer/replication_manager.go index 3ac3b19eb704..9eea53072e7a 100644 --- a/pkg/ccl/crosscluster/producer/replication_manager.go +++ b/pkg/ccl/crosscluster/producer/replication_manager.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/repstream" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -172,6 +173,13 @@ func getUDTs( return typeDescriptors, nil, nil } +var useStreaksInLDR = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "logical_replication.producer.group_adjacent_spans.enabled", + "controls whether to attempt adjacent spans in the same stream", + true, +) + func (r *replicationStreamManagerImpl) PlanLogicalReplication( ctx context.Context, req streampb.LogicalReplicationPlanRequest, ) (*streampb.ReplicationStreamSpec, error) { @@ -199,7 +207,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication( return nil, err } } - spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans) + spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV)) if err != nil { return nil, err } diff --git a/pkg/ccl/crosscluster/producer/stream_lifetime.go b/pkg/ccl/crosscluster/producer/stream_lifetime.go index e6f20ee8806c..249887bdb147 100644 --- a/pkg/ccl/crosscluster/producer/stream_lifetime.go +++ b/pkg/ccl/crosscluster/producer/stream_lifetime.go @@ -283,7 +283,7 @@ func getPhysicalReplicationStreamSpec( if j.Status() != jobs.StatusRunning { return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec") } - return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans) + return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true) } @@ -293,16 +293,21 @@ func buildReplicationStreamSpec( tenantID roachpb.TenantID, forSpanConfigs bool, targetSpans roachpb.Spans, + useStreaks bool, ) (*streampb.ReplicationStreamSpec, error) { jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext) // Partition the spans with SQLPlanner dsp := jobExecCtx.DistSQLPlanner() noLoc := roachpb.Locality{} - oracle := kvfollowerreadsccl.NewBulkOracle( - dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, kvfollowerreadsccl.StreakConfig{ + var streaks kvfollowerreadsccl.StreakConfig + if useStreaks { + streaks = kvfollowerreadsccl.StreakConfig{ Min: 10, SmallPlanMin: 3, SmallPlanThreshold: 3, MaxSkew: 0.95, - }, + } + } + oracle := kvfollowerreadsccl.NewBulkOracle( + dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, streaks, ) planCtx := dsp.NewPlanningCtxWithOracle(