Skip to content

Commit

Permalink
Merge pull request #134248 from cockroachdb/blathers/backport-release-24.3-134217
Browse files Browse the repository at this point in the history

release-24.3: sql: fix recently introduced data race
  • Loading branch information
yuzefovich authored Nov 8, 2024
2 parents 8c0dabf + a09bea1 commit 045ef0a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
19 changes: 0 additions & 19 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,25 +727,6 @@ func NewColOperator(
result := opResult{NewColOperatorResult: colexecargs.GetNewColOperatorResult()}
r := result.NewColOperatorResult
spec := args.Spec
// Throughout this method we often use the type slice from the input spec to
// create the type schema of an operator. However, it is possible that the
// same type slice is shared by multiple stages of processors. If it just so
// happens that there is free capacity in the slice, and we append to it
// when planning operators for both stages, we might corrupt the type schema
// captured by the operators for the earlier stage. In order to prevent such
// type schema corruption we cap the slice to force creation of a fresh copy
// on the first append.
if flowCtx.Gateway {
// Sharing of the same type slice is only possible on the gateway node
// because we don't serialize the specs created during the physical
// planning. On the remote nodes each stage of processors gets their own
// allocation, so there is no aliasing that can lead to the type schema
// corruption.
for i := range spec.Input {
inputSpec := &spec.Input[i]
inputSpec.ColumnTypes = inputSpec.ColumnTypes[:len(inputSpec.ColumnTypes):len(inputSpec.ColumnTypes)]
}
}
inputs := args.Inputs
if args.Factory == nil {
// This code path is only used in tests.
Expand Down
22 changes: 21 additions & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,8 @@ func (p *PlanningCtx) getCleanupFunc() func() {
// plan to a planNode subtree.
//
// These plans are built recursively on a planNode tree.
//
// PhysicalPlan is immutable after its finalization.
type PhysicalPlan struct {
physicalplan.PhysicalPlan

Expand Down Expand Up @@ -5140,8 +5142,8 @@ func finalizePlanWithRowCount(
Type: execinfrapb.StreamEndpointSpec_SYNC_RESPONSE,
})

// Assign processor IDs.
for i, p := range plan.Processors {
// Assign processor IDs.
plan.Processors[i].Spec.ProcessorID = int32(i)
// Double check that our reliance on ProcessorID == index is good.
if _, ok := plan.LocalVectorSources[int32(i)]; ok {
Expand All @@ -5150,6 +5152,24 @@ func finalizePlanWithRowCount(
panic(errors.AssertionFailedf("expected processor to be Values"))
}
}
// Prevent the type schema corruption as found in #130402.
//
// Namely, during the vectorized operator planning we often use the type
// slice from the input spec to create the type schema of an operator.
// However, it is possible that the same type slice is shared by
// multiple stages of processors. If it just so happens that there is
// free capacity in the slice, and we append to it when planning
// operators for both stages, we might corrupt the type schema captured
// by the operators for the earlier stage. In order to prevent such type
// schema corruption we cap the slice to force creation of a fresh copy
// on the first append.
//
// We can't do this capping later (during the vectorized planning)
// because the physical plan is immutable once finalized.
for j := range p.Spec.Input {
inputSpec := &p.Spec.Input[j]
inputSpec.ColumnTypes = inputSpec.ColumnTypes[:len(inputSpec.ColumnTypes):len(inputSpec.ColumnTypes)]
}
}
}

Expand Down

0 comments on commit 045ef0a

Please sign in to comment.