crosscluster/physical: split PCR procs dest side instead #135637

Merged 1 commit on Nov 19, 2024
30 changes: 30 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
@@ -491,6 +491,30 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
return p.srcTenantID, nil
}

func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology {
growth := targetPartCount / len(in.Partitions)
if growth <= 1 {
return in
}

// Copy the topology and allocate a new partition slice.
out := in
out.Partitions = make([]streamclient.PartitionInfo, 0, targetPartCount)
// For each partition in the input, put some number of copies of it into the
// output each containing some fraction of its spans.
for _, p := range in.Partitions {
Collaborator commented:

You removed the round-robin behavior from the previous repartition algorithm. Why? I'm referring to this code block from the old func:

	for x, sp := range partitions[part].Spans {
		repartitioned[x%parts].Spans = append(repartitioned[x%parts].Spans, sp)
	}

Member Author replied:

We like adjacent spans in the same proc (we added a whole new planning mode to get it!), and round-robin seems antithetical to that, so I think I was just misguided when I added it initially. If I'm wrong we can bring it back.
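
For illustration, here is a minimal standalone sketch (not part of this diff; the names and span keys are made up) contrasting the two strategies on ten adjacent spans from one source partition. Round-robin interleaves neighbors across processors, while a chunked split keeps contiguous runs together:

package main

import "fmt"

func main() {
	// Ten adjacent spans from one source partition, split four ways.
	spans := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	parts := 4

	// Round-robin (the old producer-side behavior): adjacent spans land on
	// different processors.
	rr := make([][]string, parts)
	for i, sp := range spans {
		rr[i%parts] = append(rr[i%parts], sp)
	}
	fmt.Println(rr) // [[a e i] [b f j] [c g] [d h]]

	// Chunked (the dest-side behavior in this PR): each processor gets a
	// contiguous run of spans.
	chunk := len(spans)/parts + 1
	var chunked [][]string
	for len(spans) > 0 {
		n := min(chunk, len(spans))
		chunked = append(chunked, spans[:n])
		spans = spans[n:]
	}
	fmt.Println(chunked) // [[a b c] [d e f] [g h i] [j]]
}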

chunk := len(p.Spans)/growth + 1
for len(p.Spans) > 0 {
c := p
c.Spans = p.Spans[:min(chunk, len(p.Spans))]
out.Partitions = append(out.Partitions, c)
p.Spans = p.Spans[len(c.Spans):]
}
}

return out
}

func (p *replicationFlowPlanner) constructPlanGenerator(
execCtx sql.JobExecContext,
ingestionJobID jobspb.JobID,
@@ -515,12 +539,18 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
if err != nil {
return nil, nil, err
}

// If we have fewer partitions than we have nodes, try to repartition the
// topology to have more partitions.
topology = repartitionTopology(topology, len(sqlInstanceIDs)*8)
Collaborator commented:

Any reason you wanted to remove this tunable cluster setting? Like, if we encounter another PCR OOM, it may be nice to have it.

Member Author (@dt) replied on Nov 19, 2024:

The OOM was in smaller dest clusters than src clusters where we'd create >8 procs per node, whereas now we are using the src cluster size directly here, so we no longer risk that specific behavior. The setting also wasn't settable in CC where the sys tenant is inaccessible so I think having the code tune itself, as we do now, rather than having a tuning knob, is what we need anyway.
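
For rough context, a back-of-the-envelope sketch (hypothetical cluster sizes, not taken from this PR) of why sizing the split to the destination avoids the old failure mode:

package main

import "fmt"

func main() {
	// Hypothetical sizes: a 10-node source plans 10 partitions, ingested by
	// a 3-node destination. The old producer-side 8x split yielded
	// 10*8 = 80 processors, roughly 27 per dest node, which is where the
	// OOM risk came from. The new dest-side target of len(sqlInstanceIDs)*8
	// keeps the plan at about 8 processors per dest node no matter how large
	// the source cluster is.
	srcPartitions, destNodes := 10, 3
	oldProcs := srcPartitions * 8 // 80 across 3 nodes (~27 each)
	newTarget := destNodes * 8    // 24 (~8 per dest node)
	fmt.Println(oldProcs, newTarget)
}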


if !p.createdInitialPlan() {
p.initialTopology = topology
p.initialStreamAddresses = topology.StreamAddresses()
p.initialDestinationNodes = sqlInstanceIDs

}

destNodeLocalities, err := GetDestNodeLocalities(ctx, dsp, sqlInstanceIDs)
if err != nil {
return nil, nil, err
38 changes: 38 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go
@@ -522,3 +522,41 @@ func TestParallelInitialSplits(t *testing.T) {
require.Equal(t, sp.Key.String(), ts.mu.scatters[i].String())
}
}

func TestRepartition(t *testing.T) {
defer leaktest.AfterTest(t)()

p := func(node, parts, start int) streamclient.PartitionInfo {
spans := make([]roachpb.Span, parts)
for i := range spans {
spans[i].Key = roachpb.Key(fmt.Sprintf("n%d-%d-a", node, i+start))
spans[i].EndKey = roachpb.Key(fmt.Sprintf("n%d-%d-b", node, i+start))
}
return streamclient.PartitionInfo{SrcInstanceID: node, Spans: spans}
}
for _, parts := range []int{1, 4, 100} {
for _, input := range [][]streamclient.PartitionInfo{
{p(1, 43, 0), p(2, 44, 0), p(3, 41, 0)},
{p(1, 1, 0), p(2, 1, 0), p(3, 1, 0)},
{p(1, 43, 0), p(2, 44, 0), p(3, 38, 0)},
} {
got := repartitionTopology(streamclient.Topology{Partitions: input}, parts)

var expectedSpans, gotSpans roachpb.Spans
for _, part := range input {
expectedSpans = append(expectedSpans, part.Spans...)
}
for _, part := range got.Partitions {
gotSpans = append(gotSpans, part.Spans...)
}
require.LessOrEqual(t, min(parts, len(input)), len(got.Partitions))
require.GreaterOrEqual(t, max(parts, len(input)), len(got.Partitions))

// Regardless of how we partitioned, make sure we have all the spans.
sort.Sort(expectedSpans)
sort.Sort(gotSpans)
require.Equal(t, len(expectedSpans), len(gotSpans))
require.Equal(t, expectedSpans, gotSpans)
}
}
}
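
As a quick sanity check on those bounds, here is a small standalone calculation (mirroring the chunking arithmetic in repartitionTopology; the span counts are from one of the cases above) for the 100-part case:

package main

import "fmt"

func main() {
	// Three source partitions of 43, 44, and 41 spans, repartitioned toward
	// 100 parts: growth = 100/3 = 33, so each partition is cut into chunks
	// of len/33+1 = 2 spans.
	spanCounts := []int{43, 44, 41}
	target := 100
	growth := target / len(spanCounts)
	total := 0
	for _, n := range spanCounts {
		chunk := n/growth + 1
		total += (n + chunk - 1) / chunk // ceil(n/chunk) output partitions
	}
	// 22 + 22 + 21 = 65 partitions: at least the 3 inputs and at most the
	// 100 requested, which is the bound the test asserts.
	fmt.Println(total)
}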
1 change: 0 additions & 1 deletion pkg/ccl/crosscluster/producer/BUILD.bazel
@@ -84,7 +84,6 @@ go_test(
"replication_manager_test.go",
"replication_stream_test.go",
"stream_event_batcher_test.go",
"stream_lifetime_test.go",
],
embed = [":producer"],
deps = [
10 changes: 1 addition & 9 deletions pkg/ccl/crosscluster/producer/replication_manager.go
@@ -190,13 +190,6 @@ var useStreaksInLDR = settings.RegisterBoolSetting(
false,
)

var ldrProcCount = settings.RegisterIntSetting(
settings.ApplicationLevel,
"logical_replication.producer.ingest_processor_parallelism",
"target number of stream partitions per source node",
1,
)

func (r *replicationStreamManagerImpl) PlanLogicalReplication(
ctx context.Context, req streampb.LogicalReplicationPlanRequest,
) (*streampb.ReplicationStreamSpec, error) {
@@ -225,8 +218,7 @@ func (r *replicationStreamManagerImpl) PlanLogicalReplication(
}
}

spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans,
int(ldrProcCount.Get(&r.evalCtx.Settings.SV)), useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
spec, err := buildReplicationStreamSpec(ctx, r.evalCtx, tenID, false, spans, useStreaksInLDR.Get(&r.evalCtx.Settings.SV))
if err != nil {
return nil, err
}
45 changes: 1 addition & 44 deletions pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
@@ -44,14 +43,6 @@ import (
// expiration window will be 24 hours.
const defaultExpirationWindow = time.Hour * 24

var streamMaxProcsPerPartition = settings.RegisterIntSetting(
settings.ApplicationLevel,
"stream_replication.ingest_processor_parallelism",
"controls the maximum number of ingest processors to assign to each source-planned partition",
8,
settings.PositiveInt,
)

// notAReplicationJobError returns an error that is returned anytime
// the user passes a job ID not related to a replication stream job.
func notAReplicationJobError(id jobspb.JobID) error {
@@ -283,7 +274,7 @@ func getPhysicalReplicationStreamSpec(
if j.Status() != jobs.StatusRunning {
return nil, jobIsNotRunningError(jobID, j.Status(), "create stream spec")
}
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, 0, true)
return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans, true)

}

@@ -293,7 +284,6 @@ func buildReplicationStreamSpec(
tenantID roachpb.TenantID,
forSpanConfigs bool,
targetSpans roachpb.Spans,
targetPartitionCount int,
useStreaks bool,
) (*streampb.ReplicationStreamSpec, error) {
jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext)
@@ -320,14 +310,6 @@ func buildReplicationStreamSpec(
return nil, err
}

// If more partitions were requested, try to repartition the spans.
Collaborator commented:

Glad to see this logic moved to the consumer.

if targetPartitionCount > len(spanPartitions) {
spanPartitions = repartitionSpans(spanPartitions, max(1, targetPartitionCount/len(spanPartitions)))
} else if targetPartitionCount < 1 {
// No explicit target requested, so fallback to 8x controlled via setting.
spanPartitions = repartitionSpans(spanPartitions, int(streamMaxProcsPerPartition.Get(&evalCtx.Settings.SV)))
}

var spanConfigsStreamID streampb.StreamID
if forSpanConfigs {
spanConfigsStreamID = streampb.StreamID(builtins.GenerateUniqueInt(builtins.ProcessUniqueID(evalCtx.NodeID.SQLInstanceID())))
@@ -356,31 +338,6 @@ func buildReplicationStreamSpec(
return res, nil
}

// repartitionSpans breaks up each of partition in partitions into parts smaller
// partitions that are round-robin assigned its spans. NB: we round-robin rather
// than assigning the first k to 1, next k to 2, etc since spans earlier in the
// key-space are generally smaller than those later in it due to the avoidance
// of skewed distributions during once-through streak-affinity planning (since
// early in the planning a small streak skews the overall distribution planned
// so far). It would be unfortunate to have round-robin assignment mean we do
// not colocate adjacent spans in the same processor, but we already expect gaps
// between these spans for spans assigned to other partitions, so we don't worry
// about that here.
func repartitionSpans(partitions []sql.SpanPartition, parts int) []sql.SpanPartition {
result := make([]sql.SpanPartition, 0, parts*len(partitions))
for part := range partitions {
repartitioned := make([]sql.SpanPartition, min(parts, len(partitions[part].Spans)))
for i := range repartitioned {
repartitioned[i].SQLInstanceID = partitions[part].SQLInstanceID
}
for x, sp := range partitions[part].Spans {
repartitioned[x%parts].Spans = append(repartitioned[x%parts].Spans, sp)
}
result = append(result, repartitioned...)
}
return result
}

func completeReplicationStream(
ctx context.Context,
evalCtx *eval.Context,
57 changes: 0 additions & 57 deletions pkg/ccl/crosscluster/producer/stream_lifetime_test.go

This file was deleted.