diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
index 6b9784bc9e03..8b4a3c0c1872 100644
--- a/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
+++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
@@ -491,6 +491,30 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
 	return p.srcTenantID, nil
 }
 
+func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology {
+	growth := targetPartCount / len(in.Partitions)
+	if growth <= 1 {
+		return in
+	}
+
+	// Copy the topology and allocate a new partition slice.
+	out := in
+	out.Partitions = make([]streamclient.PartitionInfo, 0, targetPartCount)
+	// For each partition in the input, put some number of copies of it into the
+	// output each containing some fraction of its spans.
+	for _, p := range in.Partitions {
+		chunk := len(p.Spans)/growth + 1
+		for len(p.Spans) > 0 {
+			c := p
+			c.Spans = p.Spans[:min(chunk, len(p.Spans))]
+			out.Partitions = append(out.Partitions, c)
+			p.Spans = p.Spans[len(c.Spans):]
+		}
+	}
+
+	return out
+}
+
 func (p *replicationFlowPlanner) constructPlanGenerator(
 	execCtx sql.JobExecContext,
 	ingestionJobID jobspb.JobID,
@@ -515,12 +539,18 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
 		if err != nil {
 			return nil, nil, err
 		}
+
+		// If we have fewer partitions than we have nodes, try to repartition the
+		// topology to have more partitions.
+		topology = repartitionTopology(topology, len(sqlInstanceIDs)*8)
+
 		if !p.createdInitialPlan() {
 			p.initialTopology = topology
 			p.initialStreamAddresses = topology.StreamAddresses()
 			p.initialDestinationNodes = sqlInstanceIDs
 		}
+
 		destNodeLocalities, err := GetDestNodeLocalities(ctx, dsp, sqlInstanceIDs)
 		if err != nil {
 			return nil, nil, err
diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
index 4ebc5950efc2..767b2d4b8968 100644
--- a/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
+++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
@@ -522,3 +522,41 @@ func TestParallelInitialSplits(t *testing.T) {
 		require.Equal(t, sp.Key.String(), ts.mu.scatters[i].String())
 	}
 }
+
+func TestRepartition(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	p := func(node, parts, start int) streamclient.PartitionInfo {
+		spans := make([]roachpb.Span, parts)
+		for i := range spans {
+			spans[i].Key = roachpb.Key(fmt.Sprintf("n%d-%d-a", node, i+start))
+			spans[i].EndKey = roachpb.Key(fmt.Sprintf("n%d-%d-b", node, i+start))
+		}
+		return streamclient.PartitionInfo{SrcInstanceID: node, Spans: spans}
+	}
+	for _, parts := range []int{1, 4, 100} {
+		for _, input := range [][]streamclient.PartitionInfo{
+			{p(1, 43, 0), p(2, 44, 0), p(3, 41, 0)},
+			{p(1, 1, 0), p(2, 1, 0), p(3, 1, 0)},
+			{p(1, 43, 0), p(2, 44, 0), p(3, 38, 0)},
+		} {
+			got := repartitionTopology(streamclient.Topology{Partitions: input}, parts)
+
+			var expectedSpans, gotSpans roachpb.Spans
+			for _, part := range input {
+				expectedSpans = append(expectedSpans, part.Spans...)
+			}
+			for _, part := range got.Partitions {
+				gotSpans = append(gotSpans, part.Spans...)
+			}
+			require.LessOrEqual(t, min(parts, len(input)), len(got.Partitions))
+			require.GreaterOrEqual(t, max(parts, len(input)), len(got.Partitions))
+
+			// Regardless of how we partitioned, make sure we have all the spans.
+			sort.Sort(expectedSpans)
+			sort.Sort(gotSpans)
+			require.Equal(t, len(expectedSpans), len(gotSpans))
+			require.Equal(t, expectedSpans, gotSpans)
+		}
+	}
+}
diff --git a/pkg/ccl/crosscluster/producer/BUILD.bazel b/pkg/ccl/crosscluster/producer/BUILD.bazel
index 5c79228cd568..c35f1ce72166 100644
--- a/pkg/ccl/crosscluster/producer/BUILD.bazel
+++ b/pkg/ccl/crosscluster/producer/BUILD.bazel
@@ -84,7 +84,6 @@ go_test(
         "replication_manager_test.go",
         "replication_stream_test.go",
         "stream_event_batcher_test.go",
-        "stream_lifetime_test.go",
     ],
     embed = [":producer"],
     deps = [
diff --git a/pkg/ccl/crosscluster/producer/replication_manager.go b/pkg/ccl/crosscluster/producer/replication_manager.go
index a6fafce97383..418c6e1cb6d8 100644
--- a/pkg/ccl/crosscluster/producer/replication_manager.go
+++ b/pkg/ccl/crosscluster/producer/replication_manager.go
@@ -190,13 +190,6 @@ var useStreaksInLDR = settings.RegisterBoolSetting(
 	false,
 )
 
-var ldrProcCount = settings.RegisterIntSetting(
-	settings.ApplicationLevel,
-	"logical_replication.producer.ingest_processor_parallelism",
-	"target number of stream partitions per source node",
-	1,
-)
-
 func (r *replicationStreamManagerImpl) PlanLogicalReplication(
 	ctx context.Context, req streampb.LogicalReplicationPlanRequest,
 ) (*streampb.ReplicationStreamSpec, error) {
@@ -225,8 +218,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
 		}
 	}
 
-	spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans,
-		int(ldrProcCount.Get(&r.evalCtx.Settings.SV)), useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
+	spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ccl/crosscluster/producer/stream_lifetime.go b/pkg/ccl/crosscluster/producer/stream_lifetime.go
index f99b43dc61a5..2252c85937c5 100644
--- a/pkg/ccl/crosscluster/producer/stream_lifetime.go
+++ b/pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -20,7 +20,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -44,14 +43,6 @@ import (
 // expiration window will be 24 hours.
 const defaultExpirationWindow = time.Hour * 24
 
-var streamMaxProcsPerPartition = settings.RegisterIntSetting(
-	settings.ApplicationLevel,
-	"stream_replication.ingest_processor_parallelism",
-	"controls the maximum number of ingest processors to assign to each source-planned partition",
-	8,
-	settings.PositiveInt,
-)
-
 // notAReplicationJobError returns an error that is returned anytime
 // the user passes a job ID not related to a replication stream job.
 func notAReplicationJobError(id jobspb.JobID) error {
@@ -283,7 +274,7 @@ func getPhysicalReplicationStreamSpec(
 	if j.Status() != jobs.StatusRunning {
 		return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
 	}
-	return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, 0, true)
+	return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)
 }
 
@@ -293,7 +284,6 @@ func buildReplicationStreamSpec(
 	tenantID roachpb.TenantID,
 	forSpanConfigs bool,
 	targetSpans roachpb.Spans,
-	targetPartitionCount int,
 	useStreaks bool,
 ) (*streampb.ReplicationStreamSpec, error) {
 	jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
@@ -320,14 +310,6 @@ func buildReplicationStreamSpec(
 		return nil, err
 	}
 
-	// If more partitions were requested, try to repartition the spans.
-	if targetPartitionCount > len(spanPartitions) {
-		spanPartitions = repartitionSpans(spanPartitions, max(1, targetPartitionCount/len(spanPartitions)))
-	} else if targetPartitionCount < 1 {
-		// No explicit target requested, so fallback to 8x controlled via setting.
-		spanPartitions = repartitionSpans(spanPartitions, int(streamMaxProcsPerPartition.Get(&evalCtx.Settings.SV)))
-	}
-
 	var spanConfigsStreamID streampb.StreamID
 	if forSpanConfigs {
 		spanConfigsStreamID = streampb.StreamID(builtins.GenerateUniqueInt(builtins.ProcessUniqueID(evalCtx.NodeID.SQLInstanceID())))
@@ -356,31 +338,6 @@ func buildReplicationStreamSpec(
 	return res, nil
 }
 
-// repartitionSpans breaks up each of partition in partitions into parts smaller
-// partitions that are round-robin assigned its spans. NB: we round-robin rather
-// than assigning the first k to 1, next k to 2, etc since spans earlier in the
-// key-space are generally smaller than those later in it due to the avoidance
-// of skewed distributions during once-through streak-affinity planning (since
-// early in the planning a small streak skews the overall distribution planned
-// so far). It would be unfortunate to have round-robin assignment mean we do
-// not colocate adjacent spans in the same processor, but we already expect gaps
-// between these spans for spans assigned to other partitions, so we don't worry
-// about that here.
-func repartitionSpans(partitions []sql.SpanPartition, parts int) []sql.SpanPartition {
-	result := make([]sql.SpanPartition, 0, parts*len(partitions))
-	for part := range partitions {
-		repartitioned := make([]sql.SpanPartition, min(parts, len(partitions[part].Spans)))
-		for i := range repartitioned {
-			repartitioned[i].SQLInstanceID = partitions[part].SQLInstanceID
-		}
-		for x, sp := range partitions[part].Spans {
-			repartitioned[x%parts].Spans = append(repartitioned[x%parts].Spans, sp)
-		}
-		result = append(result, repartitioned...)
-	}
-	return result
-}
-
 func completeReplicationStream(
 	ctx context.Context, evalCtx *eval.Context,
diff --git a/pkg/ccl/crosscluster/producer/stream_lifetime_test.go b/pkg/ccl/crosscluster/producer/stream_lifetime_test.go
deleted file mode 100644
index abf0bd5dc1fe..000000000000
--- a/pkg/ccl/crosscluster/producer/stream_lifetime_test.go
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright 2024 The Cockroach Authors.
-//
-// Use of this software is governed by the CockroachDB Software License
-// included in the /LICENSE file.
-
-package producer
-
-import (
-	"fmt"
-	"sort"
-	"testing"
-
-	"github.com/cockroachdb/cockroach/pkg/base"
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/sql"
-	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
-	"github.com/stretchr/testify/require"
-)
-
-func TestRepartition(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
-	p := func(node, parts, start int) sql.SpanPartition {
-		spans := make([]roachpb.Span, parts)
-		for i := range spans {
-			spans[i].Key = roachpb.Key(fmt.Sprintf("n%d-%d-a", node, i+start))
-			spans[i].EndKey = roachpb.Key(fmt.Sprintf("n%d-%d-b", node, i+start))
-		}
-		return sql.SpanPartition{SQLInstanceID: base.SQLInstanceID(node), Spans: spans}
-	}
-	for _, parts := range []int{1, 4, 100} {
-		for _, input := range [][]sql.SpanPartition{
-			{p(1, 43, 0), p(2, 44, 0), p(3, 41, 0)},
-			{p(1, 1, 0), p(2, 1, 0), p(3, 1, 0)},
-			{p(1, 43, 0), p(2, 44, 0), p(3, 38, 0)},
-		} {
-			got := repartitionSpans(input, parts)
-
-			var expectedParts int
-			var expectedSpans, gotSpans roachpb.Spans
-			for _, part := range input {
-				expectedParts += min(parts, len(part.Spans))
-				expectedSpans = append(expectedSpans, part.Spans...)
-			}
-			for _, part := range got {
-				gotSpans = append(gotSpans, part.Spans...)
-			}
-			require.Equal(t, expectedParts, len(got))
-
-			// Regardless of how we partitioned, make sure we have all the spans.
-			sort.Sort(expectedSpans)
-			sort.Sort(gotSpans)
-			require.Equal(t, len(expectedSpans), len(gotSpans))
-			require.Equal(t, expectedSpans, gotSpans)
-		}
-	}
-}