135637: crosscluster/physical: split PCR procs dest side instead r=dt a=dt

This allows the dest cluster to choose to split less when it has fewer nodes.

Release note: none.
Epic: none.

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Nov 19, 2024
2 parents 411026e + 4bac5aa commit 2b243f2
Showing 6 changed files with 70 additions and 111 deletions.
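A rough, standalone sketch of the sizing arithmetic introduced in the diff below (the source partition and destination node counts are hypothetical; the 8x multiplier and the integer growth check mirror constructPlanGenerator and repartitionTopology). It shows why a destination cluster with fewer nodes ends up splitting less:

```go
package main

import "fmt"

func main() {
	const procsPerNode = 8 // multiplier used when computing the target partition count
	srcPartitions := 12    // hypothetical number of source-planned partitions

	for _, destNodes := range []int{1, 2, 5} {
		target := destNodes * procsPerNode
		growth := target / srcPartitions // integer division, as in repartitionTopology
		if growth <= 1 {
			fmt.Printf("%d dest node(s): target %d, growth %d -> keep the %d partitions as-is\n",
				destNodes, target, growth, srcPartitions)
			continue
		}
		fmt.Printf("%d dest node(s): target %d, growth %d -> split each partition into up to %d chunks\n",
			destNodes, target, growth, growth)
	}
}
```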
30 changes: 30 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
@@ -491,6 +491,30 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
return p.srcTenantID, nil
}

func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology {
growth := targetPartCount / len(in.Partitions)
if growth <= 1 {
return in
}

// Copy the topology and allocate a new partition slice.
out := in
out.Partitions = make([]streamclient.PartitionInfo, 0, targetPartCount)
// For each partition in the input, put some number of copies of it into the
// output each containing some fraction of its spans.
for _, p := range in.Partitions {
chunk := len(p.Spans)/growth + 1
for len(p.Spans) > 0 {
c := p
c.Spans = p.Spans[:min(chunk, len(p.Spans))]
out.Partitions = append(out.Partitions, c)
p.Spans = p.Spans[len(c.Spans):]
}
}

return out
}

func (p *replicationFlowPlanner) constructPlanGenerator(
execCtx sql.JobExecContext,
ingestionJobID jobspb.JobID,
@@ -515,12 +539,18 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
if err != nil {
return nil, nil, err
}

// If we have fewer partitions than we have nodes, try to repartition the
// topology to have more partitions.
topology = repartitionTopology(topology, len(sqlInstanceIDs)*8)

if !p.createdInitialPlan() {
p.initialTopology = topology
p.initialStreamAddresses = topology.StreamAddresses()
p.initialDestinationNodes = sqlInstanceIDs

}

destNodeLocalities, err := GetDestNodeLocalities(ctx, dsp, sqlInstanceIDs)
if err != nil {
return nil, nil, err
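To make the chunking in repartitionTopology concrete, here is a minimal standalone sketch of the same contiguous-chunk split (it uses plain strings instead of the real streamclient.PartitionInfo spans, and the span names are made up):

```go
package main

import "fmt"

// splitContiguous mirrors the inner loop of repartitionTopology: it divides a
// partition's spans into contiguous chunks of roughly equal size.
func splitContiguous(spans []string, growth int) [][]string {
	chunk := len(spans)/growth + 1
	var out [][]string
	for len(spans) > 0 {
		n := min(chunk, len(spans))
		out = append(out, spans[:n])
		spans = spans[n:]
	}
	return out
}

func main() {
	// A hypothetical partition with 7 spans and a growth factor of 3.
	spans := []string{"a-b", "b-c", "c-d", "d-e", "e-f", "f-g", "g-h"}
	for i, c := range splitContiguous(spans, 3) {
		fmt.Println("chunk", i, c) // chunks of size 3, 3, 1
	}
}
```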
38 changes: 38 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
@@ -522,3 +522,41 @@ func TestParallelInitialSplits(t *testing.T) {
require.Equal(t, sp.Key.String(), ts.mu.scatters[i].String())
}
}

func TestRepartition(t *testing.T) {
defer leaktest.AfterTest(t)()

p := func(node, parts, start int) streamclient.PartitionInfo {
spans := make([]roachpb.Span, parts)
for i := range spans {
spans[i].Key = roachpb.Key(fmt.Sprintf("n%d-%d-a", node, i+start))
spans[i].EndKey = roachpb.Key(fmt.Sprintf("n%d-%d-b", node, i+start))
}
return streamclient.PartitionInfo{SrcInstanceID: node, Spans: spans}
}
for _, parts := range []int{1, 4, 100} {
for _, input := range [][]streamclient.PartitionInfo{
{p(1, 43, 0), p(2, 44, 0), p(3, 41, 0)},
{p(1, 1, 0), p(2, 1, 0), p(3, 1, 0)},
{p(1, 43, 0), p(2, 44, 0), p(3, 38, 0)},
} {
got := repartitionTopology(streamclient.Topology{Partitions: input}, parts)

var expectedSpans, gotSpans roachpb.Spans
for _, part := range input {
expectedSpans = append(expectedSpans, part.Spans...)
}
for _, part := range got.Partitions {
gotSpans = append(gotSpans, part.Spans...)
}
require.LessOrEqual(t, min(parts, len(input)), len(got.Partitions))
require.GreaterOrEqual(t, max(parts, len(input)), len(got.Partitions))

// Regardless of how we partitioned, make sure we have all the spans.
sort.Sort(expectedSpans)
sort.Sort(gotSpans)
require.Equal(t, len(expectedSpans), len(gotSpans))
require.Equal(t, expectedSpans, gotSpans)
}
}
}
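As a sanity check of the bounds asserted above, here is a worked walk-through of one case (the 43/44/41 span counts come from the first test input; the rest is arithmetic using the same chunking math as repartitionTopology):

```go
package main

import "fmt"

func main() {
	// parts=100 with three input partitions of 43, 44 and 41 spans.
	parts, inputs := 100, []int{43, 44, 41}
	growth := parts / len(inputs) // 33
	total := 0
	for _, n := range inputs {
		chunk := n/growth + 1            // 43/33+1 = 2, and likewise for 44 and 41
		total += (n + chunk - 1) / chunk // number of contiguous chunks emitted
	}
	// 22 + 22 + 21 = 65 output partitions, which lies between
	// min(parts, len(inputs)) = 3 and max(parts, len(inputs)) = 100.
	fmt.Println(growth, total)
}
```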
1 change: 0 additions & 1 deletion pkg/ccl/crosscluster/producer/BUILD.bazel
@@ -84,7 +84,6 @@ go_test(
"replication_manager_test.go",
"replication_stream_test.go",
"stream_event_batcher_test.go",
"stream_lifetime_test.go",
],
embed = [":producer"],
deps = [
10 changes: 1 addition & 9 deletions pkg/ccl/crosscluster/producer/replication_manager.go
@@ -190,13 +190,6 @@ var useStreaksInLDR = settings.RegisterBoolSetting(
false,
)

var ldrProcCount = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.producer.ingest_processor_parallelism",
"target number of stream partitions per source node",
1,
)

func (r *replicationStreamManagerImpl) PlanLogicalReplication(
ctx context.Context, req streampb.LogicalReplicationPlanRequest,
) (*streampb.ReplicationStreamSpec, error) {
@@ -225,8 +218,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
}
}

spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans,
int(ldrProcCount.Get(&r.evalCtx.Settings.SV)), useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
if err != nil {
return nil, err
}
45 changes: 1 addition & 44 deletions pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -44,14 +43,6 @@ import (
// expiration window will be 24 hours.
const defaultExpirationWindow = time.Hour * 24

var streamMaxProcsPerPartition = settings.RegisterIntSetting(
settings.ApplicationLevel,
"stream_replication.ingest_processor_parallelism",
"controls the maximum number of ingest processors to assign to each source-planned partition",
8,
settings.PositiveInt,
)

// notAReplicationJobError returns an error that is returned anytime
// the user passes a job ID not related to a replication stream job.
func notAReplicationJobError(id jobspb.JobID) error {
@@ -283,7 +274,7 @@ func getPhysicalReplicationStreamSpec(
if j.Status() != jobs.StatusRunning {
return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
}
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, 0, true)
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)

}

@@ -293,7 +284,6 @@ func buildReplicationStreamSpec(
tenantID roachpb.TenantID,
forSpanConfigs bool,
targetSpans roachpb.Spans,
targetPartitionCount int,
useStreaks bool,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
@@ -320,14 +310,6 @@ func buildReplicationStreamSpec(
return nil, err
}

// If more partitions were requested, try to repartition the spans.
if targetPartitionCount > len(spanPartitions) {
spanPartitions = repartitionSpans(spanPartitions, max(1, targetPartitionCount/len(spanPartitions)))
} else if targetPartitionCount < 1 {
// No explicit target requested, so fallback to 8x controlled via setting.
spanPartitions = repartitionSpans(spanPartitions, int(streamMaxProcsPerPartition.Get(&evalCtx.Settings.SV)))
}

var spanConfigsStreamID streampb.StreamID
if forSpanConfigs {
spanConfigsStreamID = streampb.StreamID(builtins.GenerateUniqueInt(builtins.ProcessUniqueID(evalCtx.NodeID.SQLInstanceID())))
@@ -356,31 +338,6 @@ func buildReplicationStreamSpec(
return res, nil
}

// repartitionSpans breaks up each of partition in partitions into parts smaller
// partitions that are round-robin assigned its spans. NB: we round-robin rather
// than assigning the first k to 1, next k to 2, etc since spans earlier in the
// key-space are generally smaller than those later in it due to the avoidance
// of skewed distributions during once-through streak-affinity planning (since
// early in the planning a small streak skews the overall distribution planned
// so far). It would be unfortunate to have round-robin assignment mean we do
// not colocate adjacent spans in the same processor, but we already expect gaps
// between these spans for spans assigned to other partitions, so we don't worry
// about that here.
func repartitionSpans(partitions []sql.SpanPartition, parts int) []sql.SpanPartition {
result := make([]sql.SpanPartition, 0, parts*len(partitions))
for part := range partitions {
repartitioned := make([]sql.SpanPartition, min(parts, len(partitions[part].Spans)))
for i := range repartitioned {
repartitioned[i].SQLInstanceID = partitions[part].SQLInstanceID
}
for x, sp := range partitions[part].Spans {
repartitioned[x%parts].Spans = append(repartitioned[x%parts].Spans, sp)
}
result = append(result, repartitioned...)
}
return result
}

func completeReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
57 changes: 0 additions & 57 deletions pkg/ccl/crosscluster/producer/stream_lifetime_test.go

This file was deleted.
