Skip to content

Commit

Permalink
crosscluster/physical: update tokens when altering topology
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.

This regressed in cockroachdb#135637 which was assigning all conusmer sub-partitions the whole partition due
to sharing the original token.
  • Loading branch information
dt committed Nov 25, 2024
1 parent 6a7cdb8 commit 1127f78
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions pkg/ccl/crosscluster/physical/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -491,10 +492,10 @@ func (p *replicationFlowPlanner) getSrcTenantID() (roachpb.TenantID, error) {
return p.srcTenantID, nil
}

func repartitionTopology(in streamclient.Topology, targetPartCount int) streamclient.Topology {
func repartitionTopology(in streamclient.Topology, targetPartCount int) (streamclient.Topology, error) {
growth := targetPartCount / len(in.Partitions)
if growth <= 1 {
return in
return in, nil
}

// Copy the topology and allocate a new partition slice.
Expand All @@ -507,12 +508,16 @@ func repartitionTopology(in streamclient.Topology, targetPartCount int) streamcl
for len(p.Spans) > 0 {
c := p
c.Spans = p.Spans[:min(chunk, len(p.Spans))]
tok, err := protoutil.Marshal(&streampb.SourcePartition{Spans: c.Spans})
if err != nil {
return out, err
}
c.SubscriptionToken = tok
out.Partitions = append(out.Partitions, c)
p.Spans = p.Spans[len(c.Spans):]
}
}

return out
return out, nil
}

func (p *replicationFlowPlanner) constructPlanGenerator(
Expand Down Expand Up @@ -542,7 +547,10 @@ func (p *replicationFlowPlanner) constructPlanGenerator(

// If we have fewer partitions than we have nodes, try to repartition the
// topology to have more partitions.
topology = repartitionTopology(topology, len(sqlInstanceIDs)*8)
topology, err = repartitionTopology(topology, len(sqlInstanceIDs)*8)
if err != nil {
return nil, nil, err
}

if !p.createdInitialPlan() {
p.initialTopology = topology
Expand Down

0 comments on commit 1127f78

Please sign in to comment.