Skip to content

Commit

Permalink
sql: remove Filter from PostProcessSpec
Browse files Browse the repository at this point in the history
Release notes: None
  • Loading branch information
RaduBerinde committed Dec 1, 2020
1 parent 4209026 commit ff832dd
Show file tree
Hide file tree
Showing 19 changed files with 309 additions and 562 deletions.
35 changes: 17 additions & 18 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,10 @@ func (r opResult) createDiskBackedSort(
colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
ordering.Columns, int(matchLen),
)
} else if post.Limit != 0 && post.Filter.Empty() && post.Limit < math.MaxUint64-post.Offset {
// There is a limit specified with no post-process filter, so we know
// exactly how many rows the sorter should output. The last part of the
// condition is making sure there is no overflow.
} else if post.Limit != 0 && post.Limit < math.MaxUint64-post.Offset {
// There is a limit specified, so we know exactly how many rows the sorter
// should output. The last part of the condition is making sure there is no
// overflow.
//
// Choose a top K sorter, which uses a heap to avoid storing more rows
// than necessary.
Expand Down Expand Up @@ -1244,8 +1244,7 @@ func NewColOperator(
}

// planAndMaybeWrapFilter plans a filter. If the filter is unsupported, it is
// planned as a wrapped noop processor with the filter as a post-processing
// stage.
// planned as a wrapped filterer processor.
func (r opResult) planAndMaybeWrapFilter(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -1268,8 +1267,18 @@ func (r opResult) planAndMaybeWrapFilter(
)
}

post := &execinfrapb.PostProcessSpec{Filter: filter}
return r.wrapPostProcessSpec(ctx, flowCtx, args, post, args.Spec.ResultTypes, factory, err)
filtererSpec := &execinfrapb.ProcessorSpec{
Core: execinfrapb.ProcessorCoreUnion{
Filterer: &execinfrapb.FiltererSpec{
Filter: filter,
},
},
ResultTypes: args.Spec.ResultTypes,
}
return r.createAndWrapRowSource(
ctx, flowCtx, args, []colexecbase.Operator{r.Op}, [][]*types.T{r.ColumnTypes},
filtererSpec, factory, err,
)
}
r.Op = op
return nil
Expand Down Expand Up @@ -1312,16 +1321,6 @@ func (r *postProcessResult) planPostProcessSpec(
post *execinfrapb.PostProcessSpec,
factory coldata.ColumnFactory,
) error {
if !post.Filter.Empty() {
op, err := planFilterExpr(
ctx, flowCtx, evalCtx, r.Op, r.ColumnTypes, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper,
)
if err != nil {
return err
}
r.Op = op
}

if post.Projection {
r.Op, r.ColumnTypes = addProjection(r.Op, r.ColumnTypes, post.OutputColumns)
} else if post.RenderExprs != nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/sql/colexec/is_null_ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,9 @@ func TestIsNullSelOp(t *testing.T) {
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}},
Core: execinfrapb.ProcessorCoreUnion{
Noop: &execinfrapb.NoopCoreSpec{},
},
Post: execinfrapb.PostProcessSpec{
Filter: execinfrapb.Expression{Expr: fmt.Sprintf("@1 %s", c.selExpr)},
Filterer: &execinfrapb.FiltererSpec{
Filter: execinfrapb.Expression{Expr: fmt.Sprintf("@1 %s", c.selExpr)},
},
},
ResultTypes: typs,
}
Expand Down
Loading

0 comments on commit ff832dd

Please sign in to comment.