Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: fix CASE operator a bit #67757

Merged
merged 1 commit into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
type Batch interface {
// Length returns the number of values in the columns in the batch.
Length() int
// SetLength sets the number of values in the columns in the batch.
// SetLength sets the number of values in the columns in the batch. Note
// that if the selection vector will be set or updated on the batch, it must
// be set **before** setting the length.
SetLength(int)
// Capacity returns the maximum number of values that can be stored in the
// columns in the batch. Note that it could be a lower bound meaning some
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/colexec/aggregators_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,7 @@ func (o *singleBatchOperator) reset(vecs []coldata.Vec, inputLen int, sel []int)
for i, vec := range vecs {
o.batch.ReplaceCol(vec, i)
}
o.batch.SetLength(inputLen)
o.batch.SetSelection(sel != nil)
if sel != nil {
copy(o.batch.Selection(), sel[:inputLen])
}
colexecutils.UpdateBatchState(o.batch, inputLen, sel != nil, sel)
}

// aggBucket stores the aggregation functions for the corresponding aggregation
Expand Down
48 changes: 6 additions & 42 deletions pkg/sql/colexec/and_or_projection.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions pkg/sql/colexec/and_or_projection_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,7 @@ func (o *_OP_LOWERProjOp) Next() coldata.Batch {
// {{end}}

// Now we need to restore the original selection vector and length.
if usesSel {
sel := batch.Selection()
copy(sel[:origLen], o.origSel[:origLen])
} else {
batch.SetSelection(false)
}
batch.SetLength(origLen)
colexecutils.UpdateBatchState(batch, origLen, usesSel, o.origSel)

outputCol := batch.ColVec(o.outputIdx)
outputVals := outputCol.Bool()
Expand Down
17 changes: 3 additions & 14 deletions pkg/sql/colexec/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,21 +296,14 @@ func (c *caseOp) Next() coldata.Batch {
}
}
// Set the buffered batch into the desired state.
c.buffer.batch.SetLength(curIdx)
c.buffer.batch.SetSelection(true)
colexecutils.UpdateBatchState(c.buffer.batch, curIdx, true /* usesSel */, c.prevSel)
prevHasSel = true
copy(c.buffer.batch.Selection()[:curIdx], c.prevSel)
c.prevSel = c.prevSel[:curIdx]
} else {
// There were no matches with the current WHEN arm, so we simply need
// to restore the buffered batch into the previous state.
prevLen := origLen - numAlreadyMatched
c.buffer.batch.SetLength(prevLen)
c.buffer.batch.SetSelection(prevHasSel)
if prevHasSel {
copy(c.buffer.batch.Selection()[:prevLen], c.prevSel)
c.prevSel = c.prevSel[:prevLen]
}
colexecutils.UpdateBatchState(c.buffer.batch, prevLen, prevHasSel, c.prevSel)
}
// Now our selection vector is set to exclude all the things that have
// matched so far. Reset the buffer and run the next case arm.
Expand Down Expand Up @@ -363,10 +356,6 @@ func (c *caseOp) Next() coldata.Batch {
}

// Restore the original state of the buffered batch.
c.buffer.batch.SetLength(origLen)
c.buffer.batch.SetSelection(origHasSel)
if origHasSel {
copy(c.buffer.batch.Selection()[:origLen], c.origSel[:origLen])
}
colexecutils.UpdateBatchState(c.buffer.batch, origLen, origHasSel, c.origSel)
return c.buffer.batch
}
14 changes: 14 additions & 0 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,17 @@ func EnsureSelectionVectorLength(old []int, length int) []int {
}
return make([]int, length)
}

// UpdateBatchState updates batch to have the specified length and the selection
// vector. If usesSel is true, then sel must be non-nil; otherwise, sel is
// ignored.
func UpdateBatchState(batch coldata.Batch, length int, usesSel bool, sel []int) {
batch.SetSelection(usesSel)
if usesSel {
copy(batch.Selection()[:length], sel[:length])
}
// Note: when usesSel is true, we have to set the length on the batch
// **after** setting the selection vector because we might use the values
// in the selection vector to maintain invariants (like for flat bytes).
batch.SetLength(length)
}
4 changes: 1 addition & 3 deletions pkg/sql/colflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,7 @@ func (r *HashRouter) processNextBatch(ctx context.Context) bool {
selections := r.tupleDistributor.Distribute(b, r.hashCols)
for i, o := range r.outputs {
if len(selections[i]) > 0 {
b.SetSelection(true)
copy(b.Selection(), selections[i])
b.SetLength(len(selections[i]))
colexecutils.UpdateBatchState(b, len(selections[i]), true /* usesSel */, selections[i])
if o.addBatch(ctx, b) {
// This batch blocked the output.
r.numBlockedOutputs++
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -1210,3 +1210,24 @@ statement ok
CREATE TYPE greeting AS ENUM ('hello');
CREATE TABLE greeting_table (x greeting);
EXPLAIN (VEC) SELECT * FROM greeting_table;

# Regression test for incorrectly updating the batch in the CASE operator.
statement ok
SET vectorize=on;
CREATE TABLE t64793 AS
SELECT
g % 2 = 1 AS _bool, g::STRING AS _string
FROM
ROWS FROM (generate_series(1, 5)) AS g;
SET vectorize=experimental_always;
INSERT INTO t64793 DEFAULT VALUES;

query T rowsort
SELECT CASE WHEN _bool THEN _string ELSE _string END FROM t64793;
----
1
2
3
4
5
NULL