diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 2a6a5130488d..3a56110cb469 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -415,10 +415,10 @@ func (r opResult) createDiskBackedSort( colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes, ordering.Columns, int(matchLen), ) - } else if post.Limit != 0 && post.Filter.Empty() && post.Limit < math.MaxUint64-post.Offset { - // There is a limit specified with no post-process filter, so we know - // exactly how many rows the sorter should output. The last part of the - // condition is making sure there is no overflow. + } else if post.Limit != 0 && post.Limit < math.MaxUint64-post.Offset { + // There is a limit specified, so we know exactly how many rows the sorter + // should output. The last part of the condition is making sure there is no + // overflow. // // Choose a top K sorter, which uses a heap to avoid storing more rows // than necessary. @@ -1244,8 +1244,7 @@ func NewColOperator( } // planAndMaybeWrapFilter plans a filter. If the filter is unsupported, it is -// planned as a wrapped noop processor with the filter as a post-processing -// stage. +// planned as a wrapped filterer processor. func (r opResult) planAndMaybeWrapFilter( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -1268,8 +1267,18 @@ func (r opResult) planAndMaybeWrapFilter( ) } - post := &execinfrapb.PostProcessSpec{Filter: filter} - return r.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err) + filtererSpec := &execinfrapb.ProcessorSpec{ + Core: execinfrapb.ProcessorCoreUnion{ + Filterer: &execinfrapb.FiltererSpec{ + Filter: filter, + }, + }, + ResultTypes: args.Spec.ResultTypes, + } + return r.createAndWrapRowSource( + ctx, flowCtx, args, []colexecbase.Operator{r.Op}, [][]*types.T{r.ColumnTypes}, + filtererSpec, factory, err, + ) } r.Op = op return nil @@ -1312,16 +1321,6 @@ func (r *postProcessResult) planPostProcessSpec( post *execinfrapb.PostProcessSpec, factory coldata.ColumnFactory, ) error { - if !post.Filter.Empty() { - op, err := planFilterExpr( - ctx, flowCtx, evalCtx, r.Op, r.ColumnTypes, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, - ) - if err != nil { - return err - } - r.Op = op - } - if post.Projection { r.Op, r.ColumnTypes = addProjection(r.Op, r.ColumnTypes, post.OutputColumns) } else if post.RenderExprs != nil { diff --git a/pkg/sql/colexec/is_null_ops_test.go b/pkg/sql/colexec/is_null_ops_test.go index 2fd6e58296ba..a788a0864fb0 100644 --- a/pkg/sql/colexec/is_null_ops_test.go +++ b/pkg/sql/colexec/is_null_ops_test.go @@ -232,10 +232,9 @@ func TestIsNullSelOp(t *testing.T) { spec := &execinfrapb.ProcessorSpec{ Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}}, Core: execinfrapb.ProcessorCoreUnion{ - Noop: &execinfrapb.NoopCoreSpec{}, - }, - Post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: fmt.Sprintf("@1 %s", c.selExpr)}, + Filterer: &execinfrapb.FiltererSpec{ + Filter: execinfrapb.Expression{Expr: fmt.Sprintf("@1 %s", c.selExpr)}, + }, }, ResultTypes: typs, } diff --git a/pkg/sql/distsql/columnar_operators_test.go b/pkg/sql/distsql/columnar_operators_test.go index 7226f5bd83b5..1b7ce3ad0153 100644 --- a/pkg/sql/distsql/columnar_operators_test.go +++ b/pkg/sql/distsql/columnar_operators_test.go @@ -618,127 +618,117 @@ func TestHashJoinerAgainstProcessor(t *testing.T) { for _, testSpec := range testSpecs { for nCols := 1; nCols <= maxCols; nCols++ { for nEqCols := 1; nEqCols <= nCols; nEqCols++ { - for _, addFilter := range getAddFilterOptions(testSpec.joinType, nEqCols < nCols) { - triedWithoutOnExpr, triedWithOnExpr := false, false - if !testSpec.onExprSupported { - triedWithOnExpr = true + triedWithoutOnExpr, triedWithOnExpr := false, false + if !testSpec.onExprSupported { + triedWithOnExpr = true + } + for !triedWithoutOnExpr || !triedWithOnExpr { + var ( + lRows, rRows rowenc.EncDatumRows + lEqCols, rEqCols []uint32 + lInputTypes, rInputTypes []*types.T + usingRandomTypes bool + ) + if rng.Float64() < randTypesProbability { + lInputTypes = generateRandomSupportedTypes(rng, nCols) + lEqCols = generateEqualityColumns(rng, nCols, nEqCols) + rInputTypes = append(rInputTypes[:0], lInputTypes...) + rEqCols = append(rEqCols[:0], lEqCols...) + rng.Shuffle(nEqCols, func(i, j int) { + iColIdx, jColIdx := rEqCols[i], rEqCols[j] + rInputTypes[iColIdx], rInputTypes[jColIdx] = rInputTypes[jColIdx], rInputTypes[iColIdx] + rEqCols[i], rEqCols[j] = rEqCols[j], rEqCols[i] + }) + rInputTypes = randomizeJoinRightTypes(rng, rInputTypes) + lRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, lInputTypes) + rRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, rInputTypes) + usingRandomTypes = true + } else { + lInputTypes = intTyps[:nCols] + rInputTypes = lInputTypes + lRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) + rRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) + lEqCols = generateEqualityColumns(rng, nCols, nEqCols) + rEqCols = generateEqualityColumns(rng, nCols, nEqCols) } - for !triedWithoutOnExpr || !triedWithOnExpr { - var ( - lRows, rRows rowenc.EncDatumRows - lEqCols, rEqCols []uint32 - lInputTypes, rInputTypes []*types.T - usingRandomTypes bool - ) - if rng.Float64() < randTypesProbability { - lInputTypes = generateRandomSupportedTypes(rng, nCols) - lEqCols = generateEqualityColumns(rng, nCols, nEqCols) - rInputTypes = append(rInputTypes[:0], lInputTypes...) - rEqCols = append(rEqCols[:0], lEqCols...) - rng.Shuffle(nEqCols, func(i, j int) { - iColIdx, jColIdx := rEqCols[i], rEqCols[j] - rInputTypes[iColIdx], rInputTypes[jColIdx] = rInputTypes[jColIdx], rInputTypes[iColIdx] - rEqCols[i], rEqCols[j] = rEqCols[j], rEqCols[i] - }) - rInputTypes = randomizeJoinRightTypes(rng, rInputTypes) - lRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, lInputTypes) - rRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, rInputTypes) - usingRandomTypes = true - } else { - lInputTypes = intTyps[:nCols] - rInputTypes = lInputTypes - lRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) - rRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) - lEqCols = generateEqualityColumns(rng, nCols, nEqCols) - rEqCols = generateEqualityColumns(rng, nCols, nEqCols) - } - var outputTypes []*types.T - if testSpec.joinType.ShouldIncludeLeftColsInOutput() { - outputTypes = append(outputTypes, lInputTypes...) - } - if testSpec.joinType.ShouldIncludeRightColsInOutput() { - outputTypes = append(outputTypes, rInputTypes...) - } - outputColumns := make([]uint32, len(outputTypes)) - for i := range outputColumns { - outputColumns[i] = uint32(i) - } + var outputTypes []*types.T + if testSpec.joinType.ShouldIncludeLeftColsInOutput() { + outputTypes = append(outputTypes, lInputTypes...) + } + if testSpec.joinType.ShouldIncludeRightColsInOutput() { + outputTypes = append(outputTypes, rInputTypes...) + } + outputColumns := make([]uint32, len(outputTypes)) + for i := range outputColumns { + outputColumns[i] = uint32(i) + } - var filter, onExpr execinfrapb.Expression - if addFilter { - forceSingleSide := !testSpec.joinType.ShouldIncludeLeftColsInOutput() || - !testSpec.joinType.ShouldIncludeRightColsInOutput() - filter = generateFilterExpr( - rng, nCols, nEqCols, outputTypes, usingRandomTypes, forceSingleSide, - ) - } - if triedWithoutOnExpr { - colTypes := append(lInputTypes, rInputTypes...) - onExpr = generateFilterExpr( - rng, nCols, nEqCols, colTypes, usingRandomTypes, false, /* forceSingleSide */ - ) - } - hjSpec := &execinfrapb.HashJoinerSpec{ - LeftEqColumns: lEqCols, - RightEqColumns: rEqCols, - OnExpr: onExpr, - Type: testSpec.joinType, - } - pspec := &execinfrapb.ProcessorSpec{ - Input: []execinfrapb.InputSyncSpec{ - {ColumnTypes: lInputTypes}, - {ColumnTypes: rInputTypes}, - }, - Core: execinfrapb.ProcessorCoreUnion{HashJoiner: hjSpec}, - Post: execinfrapb.PostProcessSpec{ - Projection: true, - OutputColumns: outputColumns, - Filter: filter, - }, - ResultTypes: outputTypes, - } - args := verifyColOperatorArgs{ - anyOrder: true, - inputTypes: [][]*types.T{lInputTypes, rInputTypes}, - inputs: []rowenc.EncDatumRows{lRows, rRows}, - pspec: pspec, - forceDiskSpill: spillForced, - // It is possible that we have a filter that is always false, and this - // will allow us to plan a zero operator which always returns a zero - // batch. In such case, the spilling might not occur and that's ok. - forcedDiskSpillMightNotOccur: !filter.Empty() || !onExpr.Empty(), - numForcedRepartitions: 2, - rng: rng, - } - if testSpec.joinType.IsSetOpJoin() && nEqCols < nCols { - // The output of set operation joins is not fully - // deterministic when there are non-equality - // columns, however, the rows must match on the - // equality columns between vectorized and row - // executions. - args.colIdxsToCheckForEquality = make([]int, nEqCols) - for i := range args.colIdxsToCheckForEquality { - args.colIdxsToCheckForEquality[i] = int(lEqCols[i]) - } + var onExpr execinfrapb.Expression + if triedWithoutOnExpr { + colTypes := append(lInputTypes, rInputTypes...) + onExpr = generateFilterExpr( + rng, nCols, nEqCols, colTypes, usingRandomTypes, false, /* forceSingleSide */ + ) + } + hjSpec := &execinfrapb.HashJoinerSpec{ + LeftEqColumns: lEqCols, + RightEqColumns: rEqCols, + OnExpr: onExpr, + Type: testSpec.joinType, + } + pspec := &execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{ + {ColumnTypes: lInputTypes}, + {ColumnTypes: rInputTypes}, + }, + Core: execinfrapb.ProcessorCoreUnion{HashJoiner: hjSpec}, + Post: execinfrapb.PostProcessSpec{ + Projection: true, + OutputColumns: outputColumns, + }, + ResultTypes: outputTypes, + } + args := verifyColOperatorArgs{ + anyOrder: true, + inputTypes: [][]*types.T{lInputTypes, rInputTypes}, + inputs: []rowenc.EncDatumRows{lRows, rRows}, + pspec: pspec, + forceDiskSpill: spillForced, + // It is possible that we have a filter that is always false, and this + // will allow us to plan a zero operator which always returns a zero + // batch. In such case, the spilling might not occur and that's ok. + forcedDiskSpillMightNotOccur: !onExpr.Empty(), + numForcedRepartitions: 2, + rng: rng, + } + if testSpec.joinType.IsSetOpJoin() && nEqCols < nCols { + // The output of set operation joins is not fully + // deterministic when there are non-equality + // columns, however, the rows must match on the + // equality columns between vectorized and row + // executions. + args.colIdxsToCheckForEquality = make([]int, nEqCols) + for i := range args.colIdxsToCheckForEquality { + args.colIdxsToCheckForEquality[i] = int(lEqCols[i]) } + } - if err := verifyColOperator(t, args); err != nil { - fmt.Printf("--- spillForced = %t join type = %s onExpr = %q"+ - " filter = %q seed = %d run = %d ---\n", - spillForced, testSpec.joinType.String(), onExpr.Expr, filter.Expr, seed, run) - fmt.Printf("--- lEqCols = %v rEqCols = %v ---\n", lEqCols, rEqCols) - prettyPrintTypes(lInputTypes, "left_table" /* tableName */) - prettyPrintTypes(rInputTypes, "right_table" /* tableName */) - prettyPrintInput(lRows, lInputTypes, "left_table" /* tableName */) - prettyPrintInput(rRows, rInputTypes, "right_table" /* tableName */) - t.Fatal(err) - } - if onExpr.Expr == "" { - triedWithoutOnExpr = true - } else { - triedWithOnExpr = true - } + if err := verifyColOperator(t, args); err != nil { + fmt.Printf("--- spillForced = %t join type = %s onExpr = %q"+ + " q seed = %d run = %d ---\n", + spillForced, testSpec.joinType.String(), onExpr.Expr, seed, run) + fmt.Printf("--- lEqCols = %v rEqCols = %v ---\n", lEqCols, rEqCols) + prettyPrintTypes(lInputTypes, "left_table" /* tableName */) + prettyPrintTypes(rInputTypes, "right_table" /* tableName */) + prettyPrintInput(lRows, lInputTypes, "left_table" /* tableName */) + prettyPrintInput(rRows, rInputTypes, "right_table" /* tableName */) + t.Fatal(err) + } + if onExpr.Expr == "" { + triedWithoutOnExpr = true + } else { + triedWithOnExpr = true } } } @@ -824,134 +814,125 @@ func TestMergeJoinerAgainstProcessor(t *testing.T) { for _, testSpec := range testSpecs { for nCols := 1; nCols <= maxCols; nCols++ { for nOrderingCols := 1; nOrderingCols <= nCols; nOrderingCols++ { - for _, addFilter := range getAddFilterOptions(testSpec.joinType, nOrderingCols < nCols) { - triedWithoutOnExpr, triedWithOnExpr := false, false - if !testSpec.onExprSupported { - triedWithOnExpr = true - } - for !triedWithoutOnExpr || !triedWithOnExpr { - var ( - lRows, rRows rowenc.EncDatumRows - lInputTypes, rInputTypes []*types.T - lOrderingCols, rOrderingCols []execinfrapb.Ordering_Column - usingRandomTypes bool - ) - if rng.Float64() < randTypesProbability { - lInputTypes = generateRandomSupportedTypes(rng, nCols) - lOrderingCols = generateColumnOrdering(rng, nCols, nOrderingCols) - rInputTypes = append(rInputTypes[:0], lInputTypes...) - rOrderingCols = append(rOrderingCols[:0], lOrderingCols...) - rng.Shuffle(nOrderingCols, func(i, j int) { - iColIdx, jColIdx := rOrderingCols[i].ColIdx, rOrderingCols[j].ColIdx - rInputTypes[iColIdx], rInputTypes[jColIdx] = rInputTypes[jColIdx], rInputTypes[iColIdx] - rOrderingCols[i], rOrderingCols[j] = rOrderingCols[j], rOrderingCols[i] - }) - rInputTypes = randomizeJoinRightTypes(rng, rInputTypes) - lRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, lInputTypes) - rRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, rInputTypes) - usingRandomTypes = true - } else { - lInputTypes = intTyps[:nCols] - rInputTypes = lInputTypes - lRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) - rRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) - lOrderingCols = generateColumnOrdering(rng, nCols, nOrderingCols) - rOrderingCols = generateColumnOrdering(rng, nCols, nOrderingCols) - } - // Set the directions of both columns to be the same. - for i, lCol := range lOrderingCols { - rOrderingCols[i].Direction = lCol.Direction - } - - lMatchedCols := execinfrapb.ConvertToColumnOrdering(execinfrapb.Ordering{Columns: lOrderingCols}) - rMatchedCols := execinfrapb.ConvertToColumnOrdering(execinfrapb.Ordering{Columns: rOrderingCols}) - sort.Slice(lRows, func(i, j int) bool { - cmp, err := lRows[i].Compare(lInputTypes, &da, lMatchedCols, &evalCtx, lRows[j]) - if err != nil { - t.Fatal(err) - } - return cmp < 0 - }) - sort.Slice(rRows, func(i, j int) bool { - cmp, err := rRows[i].Compare(rInputTypes, &da, rMatchedCols, &evalCtx, rRows[j]) - if err != nil { - t.Fatal(err) - } - return cmp < 0 + triedWithoutOnExpr, triedWithOnExpr := false, false + if !testSpec.onExprSupported { + triedWithOnExpr = true + } + for !triedWithoutOnExpr || !triedWithOnExpr { + var ( + lRows, rRows rowenc.EncDatumRows + lInputTypes, rInputTypes []*types.T + lOrderingCols, rOrderingCols []execinfrapb.Ordering_Column + usingRandomTypes bool + ) + if rng.Float64() < randTypesProbability { + lInputTypes = generateRandomSupportedTypes(rng, nCols) + lOrderingCols = generateColumnOrdering(rng, nCols, nOrderingCols) + rInputTypes = append(rInputTypes[:0], lInputTypes...) + rOrderingCols = append(rOrderingCols[:0], lOrderingCols...) + rng.Shuffle(nOrderingCols, func(i, j int) { + iColIdx, jColIdx := rOrderingCols[i].ColIdx, rOrderingCols[j].ColIdx + rInputTypes[iColIdx], rInputTypes[jColIdx] = rInputTypes[jColIdx], rInputTypes[iColIdx] + rOrderingCols[i], rOrderingCols[j] = rOrderingCols[j], rOrderingCols[i] }) - var outputTypes []*types.T - if testSpec.joinType.ShouldIncludeLeftColsInOutput() { - outputTypes = append(outputTypes, lInputTypes...) - } - if testSpec.joinType.ShouldIncludeRightColsInOutput() { - outputTypes = append(outputTypes, rInputTypes...) - } - outputColumns := make([]uint32, len(outputTypes)) - for i := range outputColumns { - outputColumns[i] = uint32(i) - } + rInputTypes = randomizeJoinRightTypes(rng, rInputTypes) + lRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, lInputTypes) + rRows = rowenc.RandEncDatumRowsOfTypes(rng, nRows, rInputTypes) + usingRandomTypes = true + } else { + lInputTypes = intTyps[:nCols] + rInputTypes = lInputTypes + lRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) + rRows = rowenc.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum, nullProbability) + lOrderingCols = generateColumnOrdering(rng, nCols, nOrderingCols) + rOrderingCols = generateColumnOrdering(rng, nCols, nOrderingCols) + } + // Set the directions of both columns to be the same. + for i, lCol := range lOrderingCols { + rOrderingCols[i].Direction = lCol.Direction + } - var filter, onExpr execinfrapb.Expression - if addFilter { - forceSingleSide := !testSpec.joinType.ShouldIncludeLeftColsInOutput() || - !testSpec.joinType.ShouldIncludeRightColsInOutput() - filter = generateFilterExpr( - rng, nCols, nOrderingCols, outputTypes, usingRandomTypes, forceSingleSide, - ) - } - if triedWithoutOnExpr { - colTypes := append(lInputTypes, rInputTypes...) - onExpr = generateFilterExpr( - rng, nCols, nOrderingCols, colTypes, usingRandomTypes, false, /* forceSingleSide */ - ) - } - mjSpec := &execinfrapb.MergeJoinerSpec{ - OnExpr: onExpr, - LeftOrdering: execinfrapb.Ordering{Columns: lOrderingCols}, - RightOrdering: execinfrapb.Ordering{Columns: rOrderingCols}, - Type: testSpec.joinType, - NullEquality: testSpec.joinType.IsSetOpJoin(), - } - pspec := &execinfrapb.ProcessorSpec{ - Input: []execinfrapb.InputSyncSpec{{ColumnTypes: lInputTypes}, {ColumnTypes: rInputTypes}}, - Core: execinfrapb.ProcessorCoreUnion{MergeJoiner: mjSpec}, - Post: execinfrapb.PostProcessSpec{Projection: true, OutputColumns: outputColumns, Filter: filter}, - ResultTypes: outputTypes, - } - args := verifyColOperatorArgs{ - anyOrder: testSpec.anyOrder, - inputTypes: [][]*types.T{lInputTypes, rInputTypes}, - inputs: []rowenc.EncDatumRows{lRows, rRows}, - pspec: pspec, - rng: rng, - } - if testSpec.joinType.IsSetOpJoin() && nOrderingCols < nCols { - // The output of set operation joins is not fully - // deterministic when there are non-equality - // columns, however, the rows must match on the - // equality columns between vectorized and row - // executions. - args.colIdxsToCheckForEquality = make([]int, nOrderingCols) - for i := range args.colIdxsToCheckForEquality { - args.colIdxsToCheckForEquality[i] = int(lOrderingCols[i].ColIdx) - } + lMatchedCols := execinfrapb.ConvertToColumnOrdering(execinfrapb.Ordering{Columns: lOrderingCols}) + rMatchedCols := execinfrapb.ConvertToColumnOrdering(execinfrapb.Ordering{Columns: rOrderingCols}) + sort.Slice(lRows, func(i, j int) bool { + cmp, err := lRows[i].Compare(lInputTypes, &da, lMatchedCols, &evalCtx, lRows[j]) + if err != nil { + t.Fatal(err) } - if err := verifyColOperator(t, args); err != nil { - fmt.Printf("--- join type = %s onExpr = %q filter = %q seed = %d run = %d ---\n", - testSpec.joinType.String(), onExpr.Expr, filter.Expr, seed, run) - fmt.Printf("--- left ordering = %v right ordering = %v ---\n", lOrderingCols, rOrderingCols) - prettyPrintTypes(lInputTypes, "left_table" /* tableName */) - prettyPrintTypes(rInputTypes, "right_table" /* tableName */) - prettyPrintInput(lRows, lInputTypes, "left_table" /* tableName */) - prettyPrintInput(rRows, rInputTypes, "right_table" /* tableName */) + return cmp < 0 + }) + sort.Slice(rRows, func(i, j int) bool { + cmp, err := rRows[i].Compare(rInputTypes, &da, rMatchedCols, &evalCtx, rRows[j]) + if err != nil { t.Fatal(err) } - if onExpr.Expr == "" { - triedWithoutOnExpr = true - } else { - triedWithOnExpr = true + return cmp < 0 + }) + var outputTypes []*types.T + if testSpec.joinType.ShouldIncludeLeftColsInOutput() { + outputTypes = append(outputTypes, lInputTypes...) + } + if testSpec.joinType.ShouldIncludeRightColsInOutput() { + outputTypes = append(outputTypes, rInputTypes...) + } + outputColumns := make([]uint32, len(outputTypes)) + for i := range outputColumns { + outputColumns[i] = uint32(i) + } + + var onExpr execinfrapb.Expression + if triedWithoutOnExpr { + colTypes := append(lInputTypes, rInputTypes...) + onExpr = generateFilterExpr( + rng, nCols, nOrderingCols, colTypes, usingRandomTypes, false, /* forceSingleSide */ + ) + } + mjSpec := &execinfrapb.MergeJoinerSpec{ + OnExpr: onExpr, + LeftOrdering: execinfrapb.Ordering{Columns: lOrderingCols}, + RightOrdering: execinfrapb.Ordering{Columns: rOrderingCols}, + Type: testSpec.joinType, + NullEquality: testSpec.joinType.IsSetOpJoin(), + } + pspec := &execinfrapb.ProcessorSpec{ + Input: []execinfrapb.InputSyncSpec{{ColumnTypes: lInputTypes}, {ColumnTypes: rInputTypes}}, + Core: execinfrapb.ProcessorCoreUnion{MergeJoiner: mjSpec}, + Post: execinfrapb.PostProcessSpec{Projection: true, OutputColumns: outputColumns}, + ResultTypes: outputTypes, + } + args := verifyColOperatorArgs{ + anyOrder: testSpec.anyOrder, + inputTypes: [][]*types.T{lInputTypes, rInputTypes}, + inputs: []rowenc.EncDatumRows{lRows, rRows}, + pspec: pspec, + rng: rng, + } + if testSpec.joinType.IsSetOpJoin() && nOrderingCols < nCols { + // The output of set operation joins is not fully + // deterministic when there are non-equality + // columns, however, the rows must match on the + // equality columns between vectorized and row + // executions. + args.colIdxsToCheckForEquality = make([]int, nOrderingCols) + for i := range args.colIdxsToCheckForEquality { + args.colIdxsToCheckForEquality[i] = int(lOrderingCols[i].ColIdx) } } + if err := verifyColOperator(t, args); err != nil { + fmt.Printf("--- join type = %s onExpr = %q seed = %d run = %d ---\n", + testSpec.joinType.String(), onExpr.Expr, seed, run) + fmt.Printf("--- left ordering = %v right ordering = %v ---\n", lOrderingCols, rOrderingCols) + prettyPrintTypes(lInputTypes, "left_table" /* tableName */) + prettyPrintTypes(rInputTypes, "right_table" /* tableName */) + prettyPrintInput(lRows, lInputTypes, "left_table" /* tableName */) + prettyPrintInput(rRows, rInputTypes, "right_table" /* tableName */) + t.Fatal(err) + } + if onExpr.Expr == "" { + triedWithoutOnExpr = true + } else { + triedWithOnExpr = true + } } } } @@ -979,16 +960,6 @@ func generateColumnOrdering( return orderingCols } -func getAddFilterOptions(joinType descpb.JoinType, nonEqualityColsPresent bool) []bool { - if joinType.IsSetOpJoin() && nonEqualityColsPresent { - // Output of set operation join when rows have non equality columns is - // not deterministic, so applying a filter on top of it can produce - // arbitrary results, and we skip such configuration. - return []bool{false} - } - return []bool{false, true} -} - // generateFilterExpr populates an execinfrapb.Expression that contains a // single comparison which can be either comparing a column from the left // against a column from the right or comparing a column from either side diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 0ec8762e6574..8f3c016fab91 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -60,9 +60,6 @@ type ProcOutputHelper struct { // post-processed row directly. output RowReceiver RowAlloc rowenc.EncDatumRowAlloc - // filter is an optional filter that determines whether a single row is - // output or not. - filter *execinfrapb.ExprHelper // renderExprs has length > 0 if we have a rendering. Only one of renderExprs // and outputCols can be set. renderExprs []execinfrapb.ExprHelper @@ -120,12 +117,6 @@ func (h *ProcOutputHelper) Init( } h.output = output h.numInternalCols = len(coreOutputTypes) - if post.Filter != (execinfrapb.Expression{}) { - h.filter = &execinfrapb.ExprHelper{} - if err := h.filter.Init(post.Filter, coreOutputTypes, semaCtx, evalCtx); err != nil { - return err - } - } if post.Projection { for _, col := range post.OutputColumns { if int(col) >= h.numInternalCols { @@ -203,12 +194,6 @@ func (h *ProcOutputHelper) NeededColumns() (colIdxs util.FastIntSet) { } for i := 0; i < h.numInternalCols; i++ { - // See if filter requires this column. - if h.filter != nil && h.filter.Vars.IndexedVarUsed(i) { - colIdxs.Add(i) - continue - } - // See if render expressions require this column. for j := range h.renderExprs { if h.renderExprs[j].Vars.IndexedVarUsed(i) { @@ -285,19 +270,6 @@ func (h *ProcOutputHelper) ProcessRow( return nil, false, nil } - if h.filter != nil { - // Filtering. - passes, err := h.filter.EvalFilter(row) - if err != nil { - return nil, false, err - } - if !passes { - if log.V(4) { - log.Infof(ctx, "filtered out row %s", row.String(h.filter.Types)) - } - return nil, true, nil - } - } h.rowIdx++ if h.rowIdx <= h.offset { // Suppress row. diff --git a/pkg/sql/execinfra/readerbase.go b/pkg/sql/execinfra/readerbase.go index b3e7999b571d..06c99f776a6a 100644 --- a/pkg/sql/execinfra/readerbase.go +++ b/pkg/sql/execinfra/readerbase.go @@ -53,11 +53,6 @@ func LimitHint(specLimitHint int64, post *execinfrapb.PostProcessSpec) (limitHin limitHint = specLimitHint + RowChannelBufSize + 1 } - if !post.Filter.Empty() { - // We have a filter so we will likely need to read more rows. - limitHint *= 2 - } - return limitHint } diff --git a/pkg/sql/execinfra/version.go b/pkg/sql/execinfra/version.go index 7b67c3a17b93..db428cfbf876 100644 --- a/pkg/sql/execinfra/version.go +++ b/pkg/sql/execinfra/version.go @@ -39,11 +39,11 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" // // ATTENTION: When updating these fields, add a brief description of what // changed to the version history below. -const Version execinfrapb.DistSQLVersion = 42 +const Version execinfrapb.DistSQLVersion = 43 // MinAcceptedVersion is the oldest version that the server is compatible with. // A server will not accept flows with older versions. -const MinAcceptedVersion execinfrapb.DistSQLVersion = 42 +const MinAcceptedVersion execinfrapb.DistSQLVersion = 43 /* @@ -51,6 +51,10 @@ const MinAcceptedVersion execinfrapb.DistSQLVersion = 42 Please add new entries at the top. +- Version: 43 (MinAcceptedVersion: 43) + - Filter was removed from PostProcessSpec and a new Filterer processor was + added. + - Version: 42 (MinAcceptedVersion: 42) - A new field NeededColumns is added to TableReaderSpec which is now required by the vectorized ColBatchScans to be set up. diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 21ae42368f66..814d4c9ac473 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -430,9 +430,6 @@ func (post *PostProcessSpec) summary() []string { // (namely InterleavedReaderJoiner) that have multiple PostProcessors. func (post *PostProcessSpec) summaryWithPrefix(prefix string) []string { var res []string - if !post.Filter.Empty() { - res = append(res, fmt.Sprintf("%sFilter: %s", prefix, post.Filter)) - } if post.Projection { outputColumns := "None" if len(post.OutputColumns) > 0 { diff --git a/pkg/sql/execinfrapb/flow_diagram_test.go b/pkg/sql/execinfrapb/flow_diagram_test.go index 5a03be98cea1..394daf7827fd 100644 --- a/pkg/sql/execinfrapb/flow_diagram_test.go +++ b/pkg/sql/execinfrapb/flow_diagram_test.go @@ -122,7 +122,6 @@ func TestPlanDiagramIndexJoin(t *testing.T) { }}, Core: ProcessorCoreUnion{JoinReader: &JoinReaderSpec{Table: *desc}}, Post: PostProcessSpec{ - Filter: Expression{Expr: "@1+@2<@3"}, Projection: true, OutputColumns: []uint32{2}, }, @@ -149,7 +148,7 @@ func TestPlanDiagramIndexJoin(t *testing.T) { {"nodeIdx":0,"inputs":[],"core":{"title":"TableReader/0","details":["Table@SomeIndex","Out: @1,@2"]},"outputs":[],"stage":1}, {"nodeIdx":1,"inputs":[],"core":{"title":"TableReader/1","details":["Table@SomeIndex","Out: @1,@2"]},"outputs":[],"stage":1}, {"nodeIdx":2,"inputs":[],"core":{"title":"TableReader/2","details":["Table@SomeIndex","Out: @1,@2"]},"outputs":[],"stage":1}, - {"nodeIdx":2,"inputs":[{"title":"ordered","details":["@2+"]}],"core":{"title":"JoinReader/3","details":["Table@primary","Filter: @1+@2\u003c@3","Out: @3"]},"outputs":[],"stage":2}, + {"nodeIdx":2,"inputs":[{"title":"ordered","details":["@2+"]}],"core":{"title":"JoinReader/3","details":["Table@primary","Out: @3"]},"outputs":[],"stage":2}, {"nodeIdx":2,"inputs":[],"core":{"title":"Response","details":[]},"outputs":[]} ], "edges":[ @@ -163,7 +162,7 @@ func TestPlanDiagramIndexJoin(t *testing.T) { compareDiagrams(t, json, expected) - expectedURL := "https://cockroachdb.github.io/distsqlplan/decode.html#eJy0ksFLwzAUxu_-FeO77oFt4imnXiZO1OnmTXuozWMUuqQmKUxG_3dpKm6DTSbTY_L1e79fH9nAv9dQWMzuJ6PF093oZjKfgGCs5odixR7qBSkIAgSJnNA4W7L31vXRJn441WuohFCZpg39dU4orWOoDUIVaobCc_FW85wLze4yAUFzKKo6jo9RtrArnhrNaxBmbVCjLKVMIO8Itg3bwT4US4ZKO9qBp6fD0z-Hi9Ph4l_hW6Z1mh3rfVomxsi7A4a3tjJfgvKQYOOqVeE-QLiu6sCuNxxn4rVNEllm8ltbHnUWv1nYnH1jjec9lWOTk_6HWC95WIC3rSv50dkyvsjhOIu9eKHZhyGVw2FqYhSXultOzymLc8ryx_LVXjnp8u7iMwAA__-WuTh_" + expectedURL := "https://cockroachdb.github.io/distsqlplan/decode.html#eJy0ksFq8zAQhO__U4S5_oJack86-RJoStu0cW_FB9VagsGWXEmGFON3L5ZLE0NSUtIed8cz33hRD_9WQyJf3y8X-dPd4ma5WYLBWE0PqiEP-QIOBgGGFAVD62xJ3ls3Sn38cKV3kAlDZdoujOuCobSOIHuEKtQEiWf1WtOGlCZ3lYBBU1BVHeOjlOW2oZXRtAPDugtykXGWCRQDg-3CPtgHtSVIPrADOD8fzn8dLs6Hiz-F75nWaXKk57RM_EcxHGl4ayvzWTA9VrB1VaPc-1e99GQ38ZPDbMi31niaIU8lJ2Nx0luaftTbzpX06GwZX940rqMvLjT5MKnpNKxMlOLxDs38ErO4xJx-a76emZOhGP59BAAA___j2TIH" if url.String() != expectedURL { t.Errorf("expected `%s` got `%s`", expectedURL, url.String()) } diff --git a/pkg/sql/execinfrapb/processors_base.pb.go b/pkg/sql/execinfrapb/processors_base.pb.go index fa88bade0a13..f53b880aa7c2 100644 --- a/pkg/sql/execinfrapb/processors_base.pb.go +++ b/pkg/sql/execinfrapb/processors_base.pb.go @@ -27,13 +27,10 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package -// PostProcessSpec describes the processing required to obtain the output -// (filtering, projection). It operates on the internal schema of the processor -// (see ProcessorSpec). +// PostProcessSpec describes the processing required to obtain the output (e.g. +// projection). It operates on the internal schema of the processor (see +// ProcessorSpec). type PostProcessSpec struct { - // A filtering expression which references the internal columns of the - // processor via ordinal references (@1, @2, etc). - Filter Expression `protobuf:"bytes,1,opt,name=filter" json:"filter"` // If true, output_columns describes a projection. Used to differentiate // between an empty projection and no projection. // @@ -61,7 +58,7 @@ func (m *PostProcessSpec) Reset() { *m = PostProcessSpec{} } func (m *PostProcessSpec) String() string { return proto.CompactTextString(m) } func (*PostProcessSpec) ProtoMessage() {} func (*PostProcessSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_base_77125fa0b0ed50f0, []int{0} + return fileDescriptor_processors_base_2f9dd97fffc5def6, []int{0} } func (m *PostProcessSpec) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -94,7 +91,7 @@ func (m *Columns) Reset() { *m = Columns{} } func (m *Columns) String() string { return proto.CompactTextString(m) } func (*Columns) ProtoMessage() {} func (*Columns) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_base_77125fa0b0ed50f0, []int{1} + return fileDescriptor_processors_base_2f9dd97fffc5def6, []int{1} } func (m *Columns) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -130,7 +127,7 @@ func (m *TableReaderSpan) Reset() { *m = TableReaderSpan{} } func (m *TableReaderSpan) String() string { return proto.CompactTextString(m) } func (*TableReaderSpan) ProtoMessage() {} func (*TableReaderSpan) Descriptor() ([]byte, []int) { - return fileDescriptor_processors_base_77125fa0b0ed50f0, []int{2} + return fileDescriptor_processors_base_2f9dd97fffc5def6, []int{2} } func (m *TableReaderSpan) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -175,14 +172,6 @@ func (m *PostProcessSpec) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - dAtA[i] = 0xa - i++ - i = encodeVarintProcessorsBase(dAtA, i, uint64(m.Filter.Size())) - n1, err := m.Filter.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n1 dAtA[i] = 0x10 i++ if m.Projection { @@ -192,21 +181,21 @@ func (m *PostProcessSpec) MarshalTo(dAtA []byte) (int, error) { } i++ if len(m.OutputColumns) > 0 { - dAtA3 := make([]byte, len(m.OutputColumns)*10) - var j2 int + dAtA2 := make([]byte, len(m.OutputColumns)*10) + var j1 int for _, num := range m.OutputColumns { for num >= 1<<7 { - dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80) + dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j2++ + j1++ } - dAtA3[j2] = uint8(num) - j2++ + dAtA2[j1] = uint8(num) + j1++ } dAtA[i] = 0x1a i++ - i = encodeVarintProcessorsBase(dAtA, i, uint64(j2)) - i += copy(dAtA[i:], dAtA3[:j2]) + i = encodeVarintProcessorsBase(dAtA, i, uint64(j1)) + i += copy(dAtA[i:], dAtA2[:j1]) } if len(m.RenderExprs) > 0 { for _, msg := range m.RenderExprs { @@ -245,21 +234,21 @@ func (m *Columns) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if len(m.Columns) > 0 { - dAtA5 := make([]byte, len(m.Columns)*10) - var j4 int + dAtA4 := make([]byte, len(m.Columns)*10) + var j3 int for _, num := range m.Columns { for num >= 1<<7 { - dAtA5[j4] = uint8(uint64(num)&0x7f | 0x80) + dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j4++ + j3++ } - dAtA5[j4] = uint8(num) - j4++ + dAtA4[j3] = uint8(num) + j3++ } dAtA[i] = 0xa i++ - i = encodeVarintProcessorsBase(dAtA, i, uint64(j4)) - i += copy(dAtA[i:], dAtA5[:j4]) + i = encodeVarintProcessorsBase(dAtA, i, uint64(j3)) + i += copy(dAtA[i:], dAtA4[:j3]) } return i, nil } @@ -282,11 +271,11 @@ func (m *TableReaderSpan) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintProcessorsBase(dAtA, i, uint64(m.Span.Size())) - n6, err := m.Span.MarshalTo(dAtA[i:]) + n5, err := m.Span.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n5 return i, nil } @@ -305,8 +294,6 @@ func (m *PostProcessSpec) Size() (n int) { } var l int _ = l - l = m.Filter.Size() - n += 1 + l + sovProcessorsBase(uint64(l)) n += 2 if len(m.OutputColumns) > 0 { l = 0 @@ -395,36 +382,6 @@ func (m *PostProcessSpec) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: PostProcessSpec: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Filter", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowProcessorsBase - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthProcessorsBase - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if err := m.Filter.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Projection", wireType) @@ -917,33 +874,33 @@ var ( ) func init() { - proto.RegisterFile("sql/execinfrapb/processors_base.proto", fileDescriptor_processors_base_77125fa0b0ed50f0) + proto.RegisterFile("sql/execinfrapb/processors_base.proto", fileDescriptor_processors_base_2f9dd97fffc5def6) } -var fileDescriptor_processors_base_77125fa0b0ed50f0 = []byte{ - // 376 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x92, 0x41, 0xae, 0xd3, 0x30, - 0x10, 0x86, 0xe3, 0x36, 0xaf, 0x0f, 0xb9, 0x3c, 0x8a, 0x2c, 0x24, 0xa2, 0xa8, 0x32, 0x51, 0x55, - 0x44, 0x58, 0x90, 0x0a, 0x8e, 0x10, 0x60, 0x89, 0x54, 0xb5, 0xac, 0xd8, 0x54, 0xae, 0xe3, 0x94, - 0x40, 0x6a, 0xbb, 0x1e, 0x47, 0xea, 0x31, 0xb8, 0x09, 0xd7, 0xe8, 0xb2, 0xcb, 0xae, 0x10, 0xa4, - 0x17, 0x41, 0xad, 0x13, 0x14, 0x90, 0x58, 0xbc, 0xdd, 0xe8, 0x9b, 0xf9, 0x3d, 0x9f, 0x6c, 0xe3, - 0xe7, 0xb0, 0x2b, 0x67, 0x62, 0x2f, 0x78, 0x21, 0x73, 0xc3, 0xf4, 0x7a, 0xa6, 0x8d, 0xe2, 0x02, - 0x40, 0x19, 0x58, 0xad, 0x19, 0x88, 0x44, 0x1b, 0x65, 0x15, 0x09, 0xb8, 0xe2, 0x5f, 0x8d, 0x62, - 0xfc, 0x73, 0x02, 0xbb, 0x32, 0xc9, 0x0a, 0xb0, 0xb0, 0x2b, 0x4d, 0x25, 0xc3, 0xf0, 0xdf, 0x03, - 0x32, 0x66, 0x99, 0x4b, 0x85, 0xe4, 0x9a, 0xf8, 0x9b, 0x3d, 0xd9, 0xa8, 0x8d, 0xba, 0x96, 0xb3, - 0x4b, 0xe5, 0xe8, 0xe4, 0x7b, 0x0f, 0x8f, 0xe6, 0x0a, 0xec, 0xdc, 0x6d, 0x5f, 0x6a, 0xc1, 0x49, - 0x8a, 0x07, 0x79, 0x51, 0x5a, 0x61, 0x02, 0x14, 0xa1, 0x78, 0xf8, 0x66, 0x9a, 0xfc, 0x4f, 0x22, - 0x79, 0xbf, 0xd7, 0x46, 0x00, 0x14, 0x4a, 0xa6, 0xfe, 0xe1, 0xc7, 0x33, 0x6f, 0xd1, 0x24, 0xc9, - 0x14, 0x63, 0x6d, 0xd4, 0x17, 0xc1, 0x6d, 0xa1, 0x64, 0xd0, 0x8b, 0x50, 0xfc, 0xa0, 0x99, 0xe8, - 0x70, 0xf2, 0x12, 0x3f, 0x52, 0x95, 0xd5, 0x95, 0x5d, 0x71, 0x55, 0x56, 0x5b, 0x09, 0x41, 0x3f, - 0xea, 0xc7, 0x77, 0x69, 0xef, 0x31, 0x5a, 0xdc, 0xb9, 0xce, 0x5b, 0xd7, 0x20, 0x1f, 0xf0, 0x43, - 0x23, 0x64, 0x26, 0xcc, 0x4a, 0xec, 0xb5, 0x81, 0xc0, 0x8f, 0xfa, 0xf7, 0x54, 0x1b, 0xba, 0xfc, - 0x85, 0x03, 0x19, 0xe3, 0x81, 0xca, 0x73, 0x10, 0x36, 0xb8, 0x89, 0x50, 0xec, 0xb7, 0xf6, 0x8e, - 0x91, 0x10, 0xdf, 0x94, 0xc5, 0xb6, 0xb0, 0xc1, 0xa0, 0xd3, 0x74, 0x68, 0xf2, 0x02, 0xdf, 0xb6, - 0x4e, 0x63, 0x7c, 0xdb, 0x7a, 0xa3, 0x3f, 0xde, 0x2d, 0x9a, 0xbc, 0xc3, 0xa3, 0x8f, 0x6c, 0x5d, - 0x8a, 0x85, 0x60, 0x99, 0x30, 0x4b, 0xcd, 0x24, 0x79, 0x8d, 0x7d, 0xd0, 0x4c, 0x36, 0xf7, 0xfa, - 0xb4, 0x23, 0xdf, 0x3c, 0x58, 0x72, 0x19, 0x6b, 0xf6, 0x5d, 0x47, 0xd3, 0x57, 0x87, 0x5f, 0xd4, - 0x3b, 0xd4, 0x14, 0x1d, 0x6b, 0x8a, 0x4e, 0x35, 0x45, 0x3f, 0x6b, 0x8a, 0xbe, 0x9d, 0xa9, 0x77, - 0x3c, 0x53, 0xef, 0x74, 0xa6, 0xde, 0xa7, 0x61, 0xe7, 0x13, 0xfc, 0x0e, 0x00, 0x00, 0xff, 0xff, - 0x6d, 0xe2, 0x8a, 0x1e, 0x57, 0x02, 0x00, 0x00, +var fileDescriptor_processors_base_2f9dd97fffc5def6 = []byte{ + // 370 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x91, 0xcf, 0xce, 0xd2, 0x40, + 0x14, 0xc5, 0x3b, 0x50, 0xfe, 0x64, 0x2a, 0x42, 0x26, 0x26, 0x36, 0x0d, 0x19, 0x1b, 0x82, 0xb1, + 0x2e, 0x2c, 0x91, 0x47, 0x40, 0xdd, 0x98, 0x98, 0x10, 0x70, 0xe5, 0x86, 0x0c, 0xd3, 0x01, 0xab, + 0x65, 0x66, 0x98, 0x3b, 0x4d, 0x78, 0x0c, 0x1f, 0x8b, 0x25, 0x4b, 0x56, 0x46, 0xcb, 0x13, 0xf8, + 0x06, 0x06, 0x5a, 0x4c, 0xfd, 0x92, 0x6f, 0x77, 0xf3, 0xbb, 0xe7, 0xf4, 0x9c, 0xdb, 0xc1, 0x2f, + 0x61, 0x9f, 0x4d, 0xc4, 0x41, 0xf0, 0x54, 0x6e, 0x0c, 0xd3, 0xeb, 0x89, 0x36, 0x8a, 0x0b, 0x00, + 0x65, 0x60, 0xb5, 0x66, 0x20, 0x62, 0x6d, 0x94, 0x55, 0xc4, 0xe7, 0x8a, 0x7f, 0x37, 0x8a, 0xf1, + 0xaf, 0x31, 0xec, 0xb3, 0x38, 0x49, 0xc1, 0xc2, 0x3e, 0x33, 0xb9, 0x0c, 0x82, 0x87, 0x1f, 0x48, + 0x98, 0x65, 0xa5, 0x2b, 0x20, 0x37, 0xc7, 0xff, 0xec, 0xd9, 0x56, 0x6d, 0xd5, 0x6d, 0x9c, 0x5c, + 0xa7, 0x92, 0x8e, 0xfe, 0x20, 0xdc, 0x9f, 0x2b, 0xb0, 0xf3, 0x32, 0x7d, 0xa9, 0x05, 0x27, 0x63, + 0x8c, 0xb5, 0x51, 0xdf, 0x04, 0xb7, 0xa9, 0x92, 0x7e, 0x23, 0x44, 0x51, 0x77, 0xe6, 0x1e, 0x7f, + 0xbe, 0x70, 0x16, 0x35, 0x4e, 0x5e, 0xe3, 0xa7, 0x2a, 0xb7, 0x3a, 0xb7, 0x2b, 0xae, 0xb2, 0x7c, + 0x27, 0xc1, 0x6f, 0x86, 0xcd, 0xa8, 0x37, 0x6b, 0x0c, 0xd0, 0xa2, 0x57, 0x6e, 0xde, 0x95, 0x0b, + 0xf2, 0x09, 0x3f, 0x31, 0x42, 0x26, 0xc2, 0xac, 0xc4, 0x41, 0x1b, 0xf0, 0xdd, 0xb0, 0x19, 0x79, + 0xd3, 0x71, 0xfc, 0xd8, 0x6d, 0xf1, 0x87, 0x83, 0x36, 0x02, 0x20, 0x55, 0xb2, 0x0a, 0xf6, 0x4a, + 0xff, 0x95, 0x03, 0x19, 0xe2, 0xb6, 0xda, 0x6c, 0x40, 0x58, 0xbf, 0x15, 0xa2, 0xc8, 0xad, 0x24, + 0x15, 0x23, 0x01, 0x6e, 0x65, 0xe9, 0x2e, 0xb5, 0x7e, 0xbb, 0xb6, 0x2c, 0xd1, 0x47, 0xb7, 0x8b, + 0x06, 0x8d, 0xd1, 0x2b, 0xdc, 0xb9, 0x37, 0x1b, 0xe2, 0xce, 0xbd, 0x3d, 0xfa, 0xd7, 0xfe, 0x8e, + 0x46, 0xef, 0x71, 0xff, 0x33, 0x5b, 0x67, 0x62, 0x21, 0x58, 0x22, 0xcc, 0x52, 0x33, 0x49, 0xde, + 0x62, 0x17, 0x34, 0x93, 0x3e, 0x0a, 0x51, 0xe4, 0x4d, 0x9f, 0xd7, 0x4e, 0xa8, 0x7e, 0x79, 0x7c, + 0x95, 0x55, 0xa9, 0x37, 0xe9, 0xec, 0xcd, 0xf1, 0x37, 0x75, 0x8e, 0x05, 0x45, 0xa7, 0x82, 0xa2, + 0x73, 0x41, 0xd1, 0xaf, 0x82, 0xa2, 0x1f, 0x17, 0xea, 0x9c, 0x2e, 0xd4, 0x39, 0x5f, 0xa8, 0xf3, + 0xc5, 0xab, 0x3d, 0xe3, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x51, 0xd4, 0xc6, 0xca, 0x19, 0x02, + 0x00, 0x00, } diff --git a/pkg/sql/execinfrapb/processors_base.proto b/pkg/sql/execinfrapb/processors_base.proto index 092e06bf6066..a11766ad5606 100644 --- a/pkg/sql/execinfrapb/processors_base.proto +++ b/pkg/sql/execinfrapb/processors_base.proto @@ -23,14 +23,10 @@ import "sql/execinfrapb/data.proto"; import "roachpb/data.proto"; import "gogoproto/gogo.proto"; -// PostProcessSpec describes the processing required to obtain the output -// (filtering, projection). It operates on the internal schema of the processor -// (see ProcessorSpec). +// PostProcessSpec describes the processing required to obtain the output (e.g. +// projection). It operates on the internal schema of the processor (see +// ProcessorSpec). message PostProcessSpec { - // A filtering expression which references the internal columns of the - // processor via ordinal references (@1, @2, etc). - optional Expression filter = 1 [(gogoproto.nullable) = false]; - // If true, output_columns describes a projection. Used to differentiate // between an empty projection and no projection. // @@ -56,6 +52,8 @@ message PostProcessSpec { // If nonzero, the processor will stop after emitting this many rows. The rows // suppressed by , if any, do not count towards this limit. optional uint64 limit = 6 [(gogoproto.nullable) = false]; + + reserved 1; } message Columns { diff --git a/pkg/sql/flowinfra/server_test.go b/pkg/sql/flowinfra/server_test.go index 23aad9f6ad5a..9d9f909ac0ce 100644 --- a/pkg/sql/flowinfra/server_test.go +++ b/pkg/sql/flowinfra/server_test.go @@ -58,7 +58,6 @@ func TestServer(t *testing.T) { Spans: []execinfrapb.TableReaderSpan{{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)}}, } post := execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@1 != 2"}, // a != 2 Projection: true, OutputColumns: []uint32{0, 1}, // a } @@ -113,7 +112,7 @@ func TestServer(t *testing.T) { t.Errorf("unexpected metadata: %v", metas) } str := rows.String(rowenc.TwoIntCols) - expected := "[[1 10] [3 30]]" + expected := "[[1 10] [2 20] [3 30]]" if str != expected { t.Errorf("invalid results: %s, expected %s'", str, expected) } diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index 4dbc29b225ee..5c3f6eab4bdb 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -440,8 +440,7 @@ func (p *PhysicalPlan) CheckLastStagePost() error { // verify this assumption. for i := 1; i < len(p.ResultRouters); i++ { pi := &p.Processors[p.ResultRouters[i]].Spec.Post - if pi.Filter != post.Filter || - pi.Projection != post.Projection || + if pi.Projection != post.Projection || len(pi.OutputColumns) != len(post.OutputColumns) || len(pi.RenderExprs) != len(post.RenderExprs) { return errors.Errorf("inconsistent post-processing: %v vs %v", post, pi) diff --git a/pkg/sql/physicalplan/physical_plan_test.go b/pkg/sql/physicalplan/physical_plan_test.go index 4c5b98ef7723..873666418085 100644 --- a/pkg/sql/physicalplan/physical_plan_test.go +++ b/pkg/sql/physicalplan/physical_plan_test.go @@ -351,7 +351,6 @@ func TestProjectionAndRendering(t *testing.T) { // expressions, however, we don't do that for the expected results. In // order to be able to use the deep comparison below we manually unset // that unserialized field. - post.Filter.LocalExpr = nil for i := range post.RenderExprs { post.RenderExprs[i].LocalExpr = nil } diff --git a/pkg/sql/rowenc/testutils.go b/pkg/sql/rowenc/testutils.go index c42522f585ca..e3af5131db9a 100644 --- a/pkg/sql/rowenc/testutils.go +++ b/pkg/sql/rowenc/testutils.go @@ -1619,11 +1619,6 @@ func IntEncDatum(i int) EncDatum { return EncDatum{Datum: tree.NewDInt(tree.DInt(i))} } -// StrEncDatum returns an EncDatum representation of DString(s). -func StrEncDatum(s string) EncDatum { - return EncDatum{Datum: tree.NewDString(s)} -} - // NullEncDatum returns and EncDatum representation of tree.DNull. func NullEncDatum() EncDatum { return EncDatum{Datum: tree.DNull} diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index 483ababf6fad..ca2fd90082c0 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -1085,33 +1085,6 @@ func TestIndexJoiner(t *testing.T) { {v[1], v[5], v[6]}, }, }, - { - description: "Test a filter in the post process spec and using a secondary index", - desc: td.TableDesc(), - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@3 <= 5"}, // sum <= 5 - Projection: true, - OutputColumns: []uint32{3}, - }, - input: rowenc.EncDatumRows{ - {v[0], v[1]}, - {v[2], v[5]}, - {v[0], v[5]}, - {v[2], v[1]}, - {v[3], v[4]}, - {v[1], v[3]}, - {v[5], v[1]}, - {v[5], v[0]}, - }, - outputTypes: []*types.T{types.String}, - expected: rowenc.EncDatumRows{ - {rowenc.StrEncDatum("one")}, - {rowenc.StrEncDatum("five")}, - {rowenc.StrEncDatum("two-one")}, - {rowenc.StrEncDatum("one-three")}, - {rowenc.StrEncDatum("five-zero")}, - }, - }, { description: "Test selecting rows using the primary index with multiple family spans", desc: tdf.TableDesc(), diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index 9fef1d073f54..fad5ef45db8b 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -82,16 +82,6 @@ func TestPostProcess(t *testing.T) { expected: "[[0 1 2] [0 1 3] [0 1 4] [0 2 3] [0 2 4] [0 3 4] [1 2 3] [1 2 4] [1 3 4] [2 3 4]]", }, - // Filter. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@1 = 1"}, - }, - outputTypes: rowenc.ThreeIntCols, - expNeededCols: []int{0, 1, 2}, - expected: "[[1 2 3] [1 2 4] [1 3 4]]", - }, - // Projection. { post: execinfrapb.PostProcessSpec{ @@ -103,30 +93,6 @@ func TestPostProcess(t *testing.T) { expected: "[[0 2] [0 3] [0 4] [0 3] [0 4] [0 4] [1 3] [1 4] [1 4] [2 4]]", }, - // Filter and projection; filter only refers to projected column. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@1 = 1"}, - Projection: true, - OutputColumns: []uint32{0, 2}, - }, - outputTypes: rowenc.TwoIntCols, - expNeededCols: []int{0, 2}, - expected: "[[1 3] [1 4] [1 4]]", - }, - - // Filter and projection; filter refers to non-projected column. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@2 = 2"}, - Projection: true, - OutputColumns: []uint32{0, 2}, - }, - outputTypes: rowenc.TwoIntCols, - expNeededCols: []int{0, 1, 2}, - expected: "[[0 3] [0 4] [1 3] [1 4]]", - }, - // Rendering. { post: execinfrapb.PostProcessSpec{ @@ -137,28 +103,6 @@ func TestPostProcess(t *testing.T) { expected: "[[0 1 1] [0 1 1] [0 1 1] [0 2 2] [0 2 2] [0 3 3] [1 2 3] [1 2 3] [1 3 4] [2 3 5]]", }, - // Rendering and filtering; filter refers to column used in rendering. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@2 = 2"}, - RenderExprs: []execinfrapb.Expression{{Expr: "@1"}, {Expr: "@2"}, {Expr: "@1 + @2"}}, - }, - outputTypes: rowenc.ThreeIntCols, - expNeededCols: []int{0, 1}, - expected: "[[0 2 2] [0 2 2] [1 2 3] [1 2 3]]", - }, - - // Rendering and filtering; filter refers to column not used in rendering. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@3 = 4"}, - RenderExprs: []execinfrapb.Expression{{Expr: "@1"}, {Expr: "@2"}, {Expr: "@1 + @2"}}, - }, - outputTypes: rowenc.ThreeIntCols, - expNeededCols: []int{0, 1, 2}, - expected: "[[0 1 1] [0 2 2] [0 3 3] [1 2 3] [1 3 4] [2 3 5]]", - }, - // More complex rendering expressions. { post: execinfrapb.PostProcessSpec{ @@ -246,28 +190,6 @@ func TestPostProcess(t *testing.T) { expNeededCols: []int{0, 1, 2}, expected: "[[0 2 3] [0 2 4] [0 3 4] [1 2 3] [1 2 4] [1 3 4] [2 3 4]]", }, - - // Filter + offset. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@1 = 1"}, - Offset: 1, - }, - outputTypes: rowenc.ThreeIntCols, - expNeededCols: []int{0, 1, 2}, - expected: "[[1 2 4] [1 3 4]]", - }, - - // Filter + limit. - { - post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@1 = 1"}, - Limit: 2, - }, - outputTypes: rowenc.ThreeIntCols, - expNeededCols: []int{0, 1, 2}, - expected: "[[1 2 3] [1 2 4]]", - }, } for tcIdx, tc := range testCases { diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 17d9e8a00b5b..4411b5c0972a 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -152,7 +152,7 @@ func newSorter( output execinfra.RowReceiver, ) (execinfra.Processor, error) { count := uint64(0) - if post.Limit != 0 && post.Filter.Empty() { + if post.Limit != 0 { // The sorter needs to produce Offset + Limit rows. The ProcOutputHelper // will discard the first Offset ones. if post.Limit <= math.MaxUint64-post.Offset { diff --git a/pkg/sql/rowexec/sorter_test.go b/pkg/sql/rowexec/sorter_test.go index ff3cb74e500a..3d23fa14fe34 100644 --- a/pkg/sql/rowexec/sorter_test.go +++ b/pkg/sql/rowexec/sorter_test.go @@ -137,34 +137,6 @@ func TestSorter(t *testing.T) { {v[3], v[2], v[0]}, {v[3], v[3], v[0]}, }, - }, { - name: "SortFilterExpr", - // No specified input ordering but specified postprocess filter expression. - spec: execinfrapb.SorterSpec{ - OutputOrdering: execinfrapb.ConvertToSpecOrdering( - colinfo.ColumnOrdering{ - {ColIdx: 0, Direction: asc}, - {ColIdx: 1, Direction: asc}, - {ColIdx: 2, Direction: asc}, - }), - }, - post: execinfrapb.PostProcessSpec{Filter: execinfrapb.Expression{Expr: "@1 + @2 < 7"}}, - types: rowenc.ThreeIntCols, - input: rowenc.EncDatumRows{ - {v[3], v[3], v[0]}, - {v[3], v[4], v[1]}, - {v[1], v[0], v[4]}, - {v[0], v[0], v[0]}, - {v[4], v[4], v[4]}, - {v[4], v[4], v[5]}, - {v[3], v[2], v[0]}, - }, - expected: rowenc.EncDatumRows{ - {v[0], v[0], v[0]}, - {v[1], v[0], v[4]}, - {v[3], v[2], v[0]}, - {v[3], v[3], v[0]}, - }, }, { name: "SortMatchOrderingNoLimit", // Specified match ordering length but no specified limit. diff --git a/pkg/sql/rowexec/tablereader_test.go b/pkg/sql/rowexec/tablereader_test.go index bc8efef32249..3e8abc08b0ae 100644 --- a/pkg/sql/rowexec/tablereader_test.go +++ b/pkg/sql/rowexec/tablereader_test.go @@ -64,7 +64,7 @@ func TestTableReader(t *testing.T) { sqlutils.CreateTable(t, sqlDB, "t", "a INT, b INT, sum INT, s STRING, PRIMARY KEY (a,b), INDEX bs (b,s)", - 99, + 19, sqlutils.ToRowFn(aFn, bFn, sumFn, sqlutils.RowEnglishFn)) td := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t") @@ -88,23 +88,21 @@ func TestTableReader(t *testing.T) { Spans: []execinfrapb.TableReaderSpan{{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)}}, }, post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@3 < 5 AND @2 != 3"}, // sum < 5 && b != 3 Projection: true, OutputColumns: []uint32{0, 1}, }, - expected: "[[0 1] [0 2] [0 4] [1 0] [1 1] [1 2] [2 0] [2 1] [2 2] [3 0] [3 1] [4 0]]", + expected: "[[0 1] [0 2] [0 3] [0 4] [0 5] [0 6] [0 7] [0 8] [0 9] [1 0] [1 1] [1 2] [1 3] [1 4] [1 5] [1 6] [1 7] [1 8] [1 9]]", }, { spec: execinfrapb.TableReaderSpec{ Spans: []execinfrapb.TableReaderSpan{{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)}}, }, post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@3 < 5 AND @2 != 3"}, Projection: true, OutputColumns: []uint32{3}, // s Limit: 4, }, - expected: "[['one'] ['two'] ['four'] ['one-zero']]", + expected: "[['one'] ['two'] ['three'] ['four']]", }, { spec: execinfrapb.TableReaderSpec{ @@ -114,11 +112,10 @@ func TestTableReader(t *testing.T) { LimitHint: 1, }, post: execinfrapb.PostProcessSpec{ - Filter: execinfrapb.Expression{Expr: "@1 < 3"}, // sum < 8 Projection: true, OutputColumns: []uint32{0, 1}, }, - expected: "[[2 5] [1 5] [0 5] [2 4] [1 4] [0 4]]", + expected: "[[1 5] [0 5] [1 4] [0 4]]", }, }