release-24.3: sql: fix recently introduced data race #134248

Merged 1 commit on Nov 8, 2024
Changes from all commits
19 changes: 0 additions & 19 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -727,25 +727,6 @@ func NewColOperator(
	result := opResult{NewColOperatorResult: colexecargs.GetNewColOperatorResult()}
	r := result.NewColOperatorResult
	spec := args.Spec
	// Throughout this method we often use the type slice from the input spec to
	// create the type schema of an operator. However, it is possible that the
	// same type slice is shared by multiple stages of processors. If it just so
	// happens that there is free capacity in the slice, and we append to it
	// when planning operators for both stages, we might corrupt the type schema
	// captured by the operators for the earlier stage. In order to prevent such
	// type schema corruption we cap the slice to force creation of a fresh copy
	// on the first append.
	if flowCtx.Gateway {
		// Sharing of the same type slice is only possible on the gateway node
		// because we don't serialize the specs created during the physical
		// planning. On the remote nodes each stage of processors gets their own
		// allocation, so there is no aliasing that can lead to the type schema
		// corruption.
		for i := range spec.Input {
			inputSpec := &spec.Input[i]
			inputSpec.ColumnTypes = inputSpec.ColumnTypes[:len(inputSpec.ColumnTypes):len(inputSpec.ColumnTypes)]
		}
	}
	inputs := args.Inputs
	if args.Factory == nil {
		// This code path is only used in tests.
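The deleted block above guarded against Go's append-with-spare-capacity semantics. As a standalone sketch (illustrative names only, not CockroachDB code), the aliasing it protected against looks like this:

```go
package main

import "fmt"

func main() {
	// A column-type schema shared by two stages, with spare capacity.
	shared := make([]string, 2, 4)
	shared[0], shared[1] = "INT", "STRING"

	// Stage 1 appends a column; because cap > len, append writes into
	// the shared backing array instead of allocating a new one.
	stage1 := append(shared, "BOOL")

	// Stage 2 appends to the same slice header, clobbering the element
	// stage 1 just wrote at index 2.
	stage2 := append(shared, "FLOAT")

	fmt.Println(stage1[2], stage2[2]) // both print FLOAT: stage 1's schema is corrupted
}
```

The capping removed here (and reinstated during finalization in the second file) is what prevents the two appends from sharing a backing array.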
22 changes: 21 additions & 1 deletion pkg/sql/distsql_physical_planner.go
@@ -1087,6 +1087,8 @@ func (p *PlanningCtx) getCleanupFunc() func() {
// plan to a planNode subtree.
//
// These plans are built recursively on a planNode tree.
//
// PhysicalPlan is immutable after its finalization.
type PhysicalPlan struct {
	physicalplan.PhysicalPlan

@@ -5140,8 +5142,8 @@ func finalizePlanWithRowCount(
		Type: execinfrapb.StreamEndpointSpec_SYNC_RESPONSE,
	})

	for i, p := range plan.Processors {
		// Assign processor IDs.
		plan.Processors[i].Spec.ProcessorID = int32(i)
		// Double check that our reliance on ProcessorID == index is good.
		if _, ok := plan.LocalVectorSources[int32(i)]; ok {
@@ -5150,6 +5152,24 @@
				panic(errors.AssertionFailedf("expected processor to be Values"))
			}
		}
		// Prevent the type schema corruption as found in #130402.
		//
		// Namely, during the vectorized operator planning we often use the type
		// slice from the input spec to create the type schema of an operator.
		// However, it is possible that the same type slice is shared by
		// multiple stages of processors. If it just so happens that there is
		// free capacity in the slice, and we append to it when planning
		// operators for both stages, we might corrupt the type schema captured
		// by the operators for the earlier stage. In order to prevent such type
		// schema corruption we cap the slice to force creation of a fresh copy
		// on the first append.
		//
		// We can't do this capping later (during the vectorized planning)
		// because the physical plan is immutable once finalized.
		for j := range p.Spec.Input {
			inputSpec := &p.Spec.Input[j]
			inputSpec.ColumnTypes = inputSpec.ColumnTypes[:len(inputSpec.ColumnTypes):len(inputSpec.ColumnTypes)]
		}
	}
}

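The full slice expression used in the fix can be checked in isolation. A minimal sketch (illustrative names, not CockroachDB code) of how capping forces a copy on the first append:

```go
package main

import "fmt"

func main() {
	shared := make([]string, 2, 4)
	shared[0], shared[1] = "INT", "STRING"

	// The full slice expression a[low:high:max] sets cap == len, so any
	// subsequent append must allocate a fresh backing array rather than
	// writing into shared's spare capacity.
	capped := shared[:len(shared):len(shared)]

	stage1 := append(capped, "BOOL")
	stage2 := append(capped, "FLOAT")

	// Each append got its own backing array; neither overwrote the other.
	fmt.Println(stage1[2], stage2[2]) // BOOL FLOAT
}
```

Doing this once at plan finalization, rather than during vectorized planning, respects the invariant that the physical plan is immutable after finalization.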