From 1127f78edafb5fed129f8638621dcaab82b6a7e0 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 25 Nov 2024 19:01:16 +0000 Subject: [PATCH] crosscluster/physical: update tokens when altering topology Release note: none. Epic: none. This regressed in #135637 which was assigning all conusmer sub-partitions the whole partition due to sharing the original token. --- .../physical/stream_ingestion_dist.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go index 8b4a3c0c1872..87f78ffe357a 100644 --- a/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -491,10 +492,10 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) { return p.srcTenantID, nil } -func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology { +func repartitionTopology(in streamclient.Topology, targetPartCount int) (streamclient.Topology, error) { growth := targetPartCount / len(in.Partitions) if growth <= 1 { - return in + return in, nil } // Copy the topology and allocate a new partition slice. @@ -507,12 +508,16 @@ func repartitionTopology(in streamclient.Topology, targetPartCount int) streamcl for len(p.Spans) > 0 { c := p c.Spans = p.Spans[:min(chunk, len(p.Spans))] + tok, err := protoutil.Marshal(&streampb.SourcePartition{Spans: c.Spans}) + if err != nil { + return out, err + } + c.SubscriptionToken = tok out.Partitions = append(out.Partitions, c) p.Spans = p.Spans[len(c.Spans):] } } - - return out + return out, nil } func (p *replicationFlowPlanner) constructPlanGenerator( @@ -542,7 +547,10 @@ func (p *replicationFlowPlanner) constructPlanGenerator( // If we have fewer partitions than we have nodes, try to repartition the // topology to have more partitions. - topology = repartitionTopology(topology, len(sqlInstanceIDs)*8) + topology, err = repartitionTopology(topology, len(sqlInstanceIDs)*8) + if err != nil { + return nil, nil, err + } if !p.createdInitialPlan() { p.initialTopology = topology