diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 4869c12cfc47..34dc32a3c921 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -727,25 +727,6 @@ func NewColOperator( result := opResult{NewColOperatorResult: colexecargs.GetNewColOperatorResult()} r := result.NewColOperatorResult spec := args.Spec - // Throughout this method we often use the type slice from the input spec to - // create the type schema of an operator. However, it is possible that the - // same type slice is shared by multiple stages of processors. If it just so - // happens that there is free capacity in the slice, and we append to it - // when planning operators for both stages, we might corrupt the type schema - // captured by the operators for the earlier stage. In order to prevent such - // type schema corruption we cap the slice to force creation of a fresh copy - // on the first append. - if flowCtx.Gateway { - // Sharing of the same type slice is only possible on the gateway node - // because we don't serialize the specs created during the physical - // planning. On the remote nodes each stage of processors gets their own - // allocation, so there is no aliasing that can lead to the type schema - // corruption. - for i := range spec.Input { - inputSpec := &spec.Input[i] - inputSpec.ColumnTypes = inputSpec.ColumnTypes[:len(inputSpec.ColumnTypes):len(inputSpec.ColumnTypes)] - } - } inputs := args.Inputs if args.Factory == nil { // This code path is only used in tests. diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index b1e19109b97b..7f0ccf565c3e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1087,6 +1087,8 @@ func (p *PlanningCtx) getCleanupFunc() func() { // plan to a planNode subtree. // // These plans are built recursively on a planNode tree. +// +// PhysicalPlan is immutable after its finalization. type PhysicalPlan struct { physicalplan.PhysicalPlan @@ -5140,8 +5142,8 @@ func finalizePlanWithRowCount( Type: execinfrapb.StreamEndpointSpec_SYNC_RESPONSE, }) - // Assign processor IDs. for i, p := range plan.Processors { + // Assign processor IDs. plan.Processors[i].Spec.ProcessorID = int32(i) // Double check that our reliance on ProcessorID == index is good. if _, ok := plan.LocalVectorSources[int32(i)]; ok { @@ -5150,6 +5152,24 @@ func finalizePlanWithRowCount( panic(errors.AssertionFailedf("expected processor to be Values")) } } + // Prevent the type schema corruption as found in #130402. + // + // Namely, during the vectorized operator planning we often use the type + // slice from the input spec to create the type schema of an operator. + // However, it is possible that the same type slice is shared by + // multiple stages of processors. If it just so happens that there is + // free capacity in the slice, and we append to it when planning + // operators for both stages, we might corrupt the type schema captured + // by the operators for the earlier stage. In order to prevent such type + // schema corruption we cap the slice to force creation of a fresh copy + // on the first append. + // + // We can't do this capping later (during the vectorized planning) + // because the physical plan is immutable once finalized. + for j := range p.Spec.Input { + inputSpec := &p.Spec.Input[j] + inputSpec.ColumnTypes = inputSpec.ColumnTypes[:len(inputSpec.ColumnTypes):len(inputSpec.ColumnTypes)] + } } }