crosscluster/physical: split PCR procs dest side instead
This allows the dest cluster to choose to split less when it has fewer nodes.

Release note: none.
Epic: none.
dt committed Nov 19, 2024
1 parent f27b402 commit 4bac5aa
Showing 6 changed files with 70 additions and 111 deletions.
30 changes: 30 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
@@ -491,6 +491,30 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
return p.srcTenantID, nil
}

func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology {
growth := targetPartCount / len(in.Partitions)
if growth <= 1 {
return in
}

// Copy the topology and allocate a new partition slice.
out := in
out.Partitions = make([]streamclient.PartitionInfo, 0, targetPartCount)
// For each partition in the input, put some number of copies of it into the
// output, each containing some fraction of its spans.
for _, p := range in.Partitions {
chunk := len(p.Spans)/growth + 1
for len(p.Spans) > 0 {
c := p
c.Spans = p.Spans[:min(chunk, len(p.Spans))]
out.Partitions = append(out.Partitions, c)
p.Spans = p.Spans[len(c.Spans):]
}
}

return out
}

func (p *replicationFlowPlanner) constructPlanGenerator(
execCtx sql.JobExecContext,
ingestionJobID jobspb.JobID,
@@ -515,12 +539,18 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
if err != nil {
return nil, nil, err
}

// If we have fewer partitions than we have nodes, try to repartition the
// topology to have more partitions.
topology = repartitionTopology(topology, len(sqlInstanceIDs)*8)

if !p.createdInitialPlan() {
p.initialTopology = topology
p.initialStreamAddresses = topology.StreamAddresses()
p.initialDestinationNodes = sqlInstanceIDs

}

destNodeLocalities, err := GetDestNodeLocalities(ctx, dsp, sqlInstanceIDs)
if err != nil {
return nil, nil, err
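
As an illustration of the chunking above (not part of this commit), the following is a minimal, self-contained Go sketch: the partition struct and string spans are stand-ins for streamclient.PartitionInfo and roachpb.Span, and the target of 8 partitions per destination node mirrors the len(sqlInstanceIDs)*8 call site.

package main

import "fmt"

// partition is an illustrative stand-in for streamclient.PartitionInfo.
type partition struct {
	srcInstanceID int
	spans         []string
}

// repartition mirrors the chunking in repartitionTopology: when the target
// count implies growth, each input partition is copied into several output
// partitions, each carrying a contiguous chunk of the original's spans.
// (Uses the built-in min, so it needs Go 1.21+.)
func repartition(in []partition, targetPartCount int) []partition {
	growth := targetPartCount / len(in)
	if growth <= 1 {
		return in
	}
	out := make([]partition, 0, targetPartCount)
	for _, p := range in {
		chunk := len(p.spans)/growth + 1
		for len(p.spans) > 0 {
			c := p
			c.spans = p.spans[:min(chunk, len(p.spans))]
			out = append(out, c)
			p.spans = p.spans[len(c.spans):]
		}
	}
	return out
}

func main() {
	// Three source partitions of four spans each, repartitioned for a
	// one-node destination at 8 partitions per node (target 8): growth is
	// 8/3 = 2 and chunk is 4/2+1 = 3, so each input partition splits into a
	// 3-span part and a 1-span part, giving 6 output partitions rather than
	// the 12 single-span partitions a three-node destination (target 24)
	// would produce.
	in := []partition{
		{srcInstanceID: 1, spans: []string{"a", "b", "c", "d"}},
		{srcInstanceID: 2, spans: []string{"e", "f", "g", "h"}},
		{srcInstanceID: 3, spans: []string{"i", "j", "k", "l"}},
	}
	for _, p := range repartition(in, 8) {
		fmt.Println(p.srcInstanceID, p.spans)
	}
}
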
38 changes: 38 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
@@ -522,3 +522,41 @@ func TestParallelInitialSplits(t *testing.T) {
require.Equal(t, sp.Key.String(), ts.mu.scatters[i].String())
}
}

func TestRepartition(t *testing.T) {
defer leaktest.AfterTest(t)()

p := func(node, parts, start int) streamclient.PartitionInfo {
spans := make([]roachpb.Span, parts)
for i := range spans {
spans[i].Key = roachpb.Key(fmt.Sprintf("n%d-%d-a", node, i+start))
spans[i].EndKey = roachpb.Key(fmt.Sprintf("n%d-%d-b", node, i+start))
}
return streamclient.PartitionInfo{SrcInstanceID: node, Spans: spans}
}
for _, parts := range []int{1, 4, 100} {
for _, input := range [][]streamclient.PartitionInfo{
{p(1, 43, 0), p(2, 44, 0), p(3, 41, 0)},
{p(1, 1, 0), p(2, 1, 0), p(3, 1, 0)},
{p(1, 43, 0), p(2, 44, 0), p(3, 38, 0)},
} {
got := repartitionTopology(streamclient.Topology{Partitions: input}, parts)

var expectedSpans, gotSpans roachpb.Spans
for _, part := range input {
expectedSpans = append(expectedSpans, part.Spans...)
}
for _, part := range got.Partitions {
gotSpans = append(gotSpans, part.Spans...)
}
require.LessOrEqual(t, min(parts, len(input)), len(got.Partitions))
require.GreaterOrEqual(t, max(parts, len(input)), len(got.Partitions))

// Regardless of how we partitioned, make sure we have all the spans.
sort.Sort(expectedSpans)
sort.Sort(gotSpans)
require.Equal(t, len(expectedSpans), len(gotSpans))
require.Equal(t, expectedSpans, gotSpans)
}
}
}
1 change: 0 additions & 1 deletion pkg/ccl/crosscluster/producer/BUILD.bazel
@@ -84,7 +84,6 @@ go_test(
"replication_manager_test.go",
"replication_stream_test.go",
"stream_event_batcher_test.go",
"stream_lifetime_test.go",
],
embed = [":producer"],
deps = [
10 changes: 1 addition & 9 deletions pkg/ccl/crosscluster/producer/replication_manager.go
@@ -190,13 +190,6 @@ var useStreaksInLDR = settings.RegisterBoolSetting(
false,
)

var ldrProcCount = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.producer.ingest_processor_parallelism",
"target number of stream partitions per source node",
1,
)

func (r *replicationStreamManagerImpl) PlanLogicalReplication(
ctx context.Context, req streampb.LogicalReplicationPlanRequest,
) (*streampb.ReplicationStreamSpec, error) {
@@ -225,8 +218,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
}
}

spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans,
int(ldrProcCount.Get(&r.evalCtx.Settings.SV)), useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
if err != nil {
return nil, err
}
45 changes: 1 addition & 44 deletions pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -44,14 +43,6 @@ import (
// expiration window will be 24 hours.
const defaultExpirationWindow = time.Hour * 24

var streamMaxProcsPerPartition = settings.RegisterIntSetting(
settings.ApplicationLevel,
"stream_replication.ingest_processor_parallelism",
"controls the maximum number of ingest processors to assign to each source-planned partition",
8,
settings.PositiveInt,
)

// notAReplicationJobError returns an error that is returned anytime
// the user passes a job ID not related to a replication stream job.
func notAReplicationJobError(id jobspb.JobID) error {
@@ -283,7 +274,7 @@ func getPhysicalReplicationStreamSpec(
if j.Status() != jobs.StatusRunning {
return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
}
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, 0, true)
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)

}

@@ -293,7 +284,6 @@ func buildReplicationStreamSpec(
tenantID roachpb.TenantID,
forSpanConfigs bool,
targetSpans roachpb.Spans,
targetPartitionCount int,
useStreaks bool,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
@@ -320,14 +310,6 @@
return nil, err
}

// If more partitions were requested, try to repartition the spans.
if targetPartitionCount > len(spanPartitions) {
spanPartitions = repartitionSpans(spanPartitions, max(1, targetPartitionCount/len(spanPartitions)))
} else if targetPartitionCount < 1 {
// No explicit target requested, so fallback to 8x controlled via setting.
spanPartitions = repartitionSpans(spanPartitions, int(streamMaxProcsPerPartition.Get(&evalCtx.Settings.SV)))
}

var spanConfigsStreamID streampb.StreamID
if forSpanConfigs {
spanConfigsStreamID = streampb.StreamID(builtins.GenerateUniqueInt(builtins.ProcessUniqueID(evalCtx.NodeID.SQLInstanceID())))
@@ -356,31 +338,6 @@ return res, nil
return res, nil
}

// repartitionSpans breaks up each partition in partitions into `parts` smaller
// partitions, round-robin assigning its spans among them. NB: we round-robin rather
// than assigning the first k to 1, next k to 2, etc since spans earlier in the
// key-space are generally smaller than those later in it due to the avoidance
// of skewed distributions during once-through streak-affinity planning (since
// early in the planning a small streak skews the overall distribution planned
// so far). It would be unfortunate to have round-robin assignment mean we do
// not colocate adjacent spans in the same processor, but we already expect gaps
// between these spans for spans assigned to other partitions, so we don't worry
// about that here.
func repartitionSpans(partitions []sql.SpanPartition, parts int) []sql.SpanPartition {
result := make([]sql.SpanPartition, 0, parts*len(partitions))
for part := range partitions {
repartitioned := make([]sql.SpanPartition, min(parts, len(partitions[part].Spans)))
for i := range repartitioned {
repartitioned[i].SQLInstanceID = partitions[part].SQLInstanceID
}
for x, sp := range partitions[part].Spans {
repartitioned[x%parts].Spans = append(repartitioned[x%parts].Spans, sp)
}
result = append(result, repartitioned...)
}
return result
}
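
For contrast, and purely as an illustration (not part of this commit): the removed helper above dealt spans out round-robin, while the new destination-side repartitionTopology hands each smaller partition a contiguous chunk, keeping adjacent spans together. A minimal Go sketch of the two assignment patterns on toy string spans:

package main

import "fmt"

// roundRobin mirrors the removed repartitionSpans assignment: span i is dealt
// to sub-partition i modulo the number of parts, so adjacent spans land in
// different sub-partitions.
func roundRobin(spans []string, parts int) [][]string {
	out := make([][]string, min(parts, len(spans)))
	for i, sp := range spans {
		out[i%len(out)] = append(out[i%len(out)], sp)
	}
	return out
}

// contiguousChunks mirrors the new destination-side repartitionTopology
// assignment: each sub-partition receives a contiguous run of spans.
func contiguousChunks(spans []string, growth int) [][]string {
	chunk := len(spans)/growth + 1
	var out [][]string
	for len(spans) > 0 {
		n := min(chunk, len(spans))
		out = append(out, spans[:n])
		spans = spans[n:]
	}
	return out
}

func main() {
	spans := []string{"a", "b", "c", "d", "e", "f"}
	fmt.Println(roundRobin(spans, 3))       // [[a d] [b e] [c f]]
	fmt.Println(contiguousChunks(spans, 3)) // [[a b c] [d e f]]
}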

func completeReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
57 changes: 0 additions & 57 deletions pkg/ccl/crosscluster/producer/stream_lifetime_test.go

This file was deleted.
