Skip to content

Commit

Permalink
Merge 2c3e186 into blathers/backport-release-24.3-133347
Browse files Browse the repository at this point in the history
  • Loading branch information
blathers-crl[bot] authored Nov 5, 2024
2 parents 47623ec + 2c3e186 commit c10a2f3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
10 changes: 9 additions & 1 deletion pkg/ccl/crosscluster/producer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -172,6 +173,13 @@ func getUDTs(
return typeDescriptors, nil, nil
}

var useStreaksInLDR = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"logical_replication.producer.group_adjacent_spans.enabled",
"controls whether to attempt adjacent spans in the same stream",
true,
)

func (r *replicationStreamManagerImpl) PlanLogicalReplication(
ctx context.Context, req streampb.LogicalReplicationPlanRequest,
) (*streampb.ReplicationStreamSpec, error) {
Expand Down Expand Up @@ -199,7 +207,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
return nil, err
}
}
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans)
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
if err != nil {
return nil, err
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/ccl/crosscluster/producer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func getPhysicalReplicationStreamSpec(
if j.Status() != jobs.StatusRunning {
return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
}
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans)
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)

}

Expand All @@ -293,16 +293,21 @@ func buildReplicationStreamSpec(
tenantID roachpb.TenantID,
forSpanConfigs bool,
targetSpans roachpb.Spans,
useStreaks bool,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)

// Partition the spans with SQLPlanner
dsp := jobExecCtx.DistSQLPlanner()
noLoc := roachpb.Locality{}
oracle := kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, kvfollowerreadsccl.StreakConfig{
var streaks kvfollowerreadsccl.StreakConfig
if useStreaks {
streaks = kvfollowerreadsccl.StreakConfig{
Min: 10, SmallPlanMin: 3, SmallPlanThreshold: 3, MaxSkew: 0.95,
},
}
}
oracle := kvfollowerreadsccl.NewBulkOracle(
dsp.ReplicaOracleConfig(evalCtx.Locality), noLoc, streaks,
)

planCtx := dsp.NewPlanningCtxWithOracle(
Expand Down

0 comments on commit c10a2f3

Please sign in to comment.