Skip to content

Commit

Permalink
Merge #67757
Browse files Browse the repository at this point in the history
67757: colexec: fix CASE operator a bit r=yuzefovich a=yuzefovich

Whenever we're updating the length on the batch containing bytes-like
vectors, we are updating those vectors to have non-decreasing offsets.
In case the batch has a selection vector set, we're using the largest
index in the selection to update the offsets. This logic relies on the
assumption that the selection vector is set on the batch **before**
setting the length which wasn't the case in a couple of places.

In particular, the recent refactor of the case operator added the
support of bytes-like types as the output of the case operator, but the
existing code was setting the length on the batch first. This is now
fixed.

Addresses: #64793 (comment).
Fixes: #67744.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 19, 2021
2 parents 5e1c2c6 + 928c959 commit 5da96b4
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 72 deletions.
4 changes: 3 additions & 1 deletion pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
type Batch interface {
// Length returns the number of values in the columns in the batch.
Length() int
// SetLength sets the number of values in the columns in the batch.
// SetLength sets the number of values in the columns in the batch. Note
// that if the selection vector will be set or updated on the batch, it must
// be set **before** setting the length.
SetLength(int)
// Capacity returns the maximum number of values that can be stored in the
// columns in the batch. Note that it could be a lower bound meaning some
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/colexec/aggregators_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,7 @@ func (o *singleBatchOperator) reset(vecs []coldata.Vec, inputLen int, sel []int)
for i, vec := range vecs {
o.batch.ReplaceCol(vec, i)
}
o.batch.SetLength(inputLen)
o.batch.SetSelection(sel != nil)
if sel != nil {
copy(o.batch.Selection(), sel[:inputLen])
}
colexecutils.UpdateBatchState(o.batch, inputLen, sel != nil, sel)
}

// aggBucket stores the aggregation functions for the corresponding aggregation
Expand Down
48 changes: 6 additions & 42 deletions pkg/sql/colexec/and_or_projection.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions pkg/sql/colexec/and_or_projection_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,7 @@ func (o *_OP_LOWERProjOp) Next() coldata.Batch {
// {{end}}

// Now we need to restore the original selection vector and length.
if usesSel {
sel := batch.Selection()
copy(sel[:origLen], o.origSel[:origLen])
} else {
batch.SetSelection(false)
}
batch.SetLength(origLen)
colexecutils.UpdateBatchState(batch, origLen, usesSel, o.origSel)

outputCol := batch.ColVec(o.outputIdx)
outputVals := outputCol.Bool()
Expand Down
17 changes: 3 additions & 14 deletions pkg/sql/colexec/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,21 +296,14 @@ func (c *caseOp) Next() coldata.Batch {
}
}
// Set the buffered batch into the desired state.
c.buffer.batch.SetLength(curIdx)
c.buffer.batch.SetSelection(true)
colexecutils.UpdateBatchState(c.buffer.batch, curIdx, true /* usesSel */, c.prevSel)
prevHasSel = true
copy(c.buffer.batch.Selection()[:curIdx], c.prevSel)
c.prevSel = c.prevSel[:curIdx]
} else {
// There were no matches with the current WHEN arm, so we simply need
// to restore the buffered batch into the previous state.
prevLen := origLen - numAlreadyMatched
c.buffer.batch.SetLength(prevLen)
c.buffer.batch.SetSelection(prevHasSel)
if prevHasSel {
copy(c.buffer.batch.Selection()[:prevLen], c.prevSel)
c.prevSel = c.prevSel[:prevLen]
}
colexecutils.UpdateBatchState(c.buffer.batch, prevLen, prevHasSel, c.prevSel)
}
// Now our selection vector is set to exclude all the things that have
// matched so far. Reset the buffer and run the next case arm.
Expand Down Expand Up @@ -363,10 +356,6 @@ func (c *caseOp) Next() coldata.Batch {
}

// Restore the original state of the buffered batch.
c.buffer.batch.SetLength(origLen)
c.buffer.batch.SetSelection(origHasSel)
if origHasSel {
copy(c.buffer.batch.Selection()[:origLen], c.origSel[:origLen])
}
colexecutils.UpdateBatchState(c.buffer.batch, origLen, origHasSel, c.origSel)
return c.buffer.batch
}
14 changes: 14 additions & 0 deletions pkg/sql/colexec/colexecutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,17 @@ func EnsureSelectionVectorLength(old []int, length int) []int {
}
return make([]int, length)
}

// UpdateBatchState updates batch to have the specified length and the selection
// vector. If usesSel is true, then sel must be non-nil; otherwise, sel is
// ignored.
func UpdateBatchState(batch coldata.Batch, length int, usesSel bool, sel []int) {
batch.SetSelection(usesSel)
if usesSel {
copy(batch.Selection()[:length], sel[:length])
}
// Note: when usesSel is true, we have to set the length on the batch
// **after** setting the selection vector because we might use the values
// in the selection vector to maintain invariants (like for flat bytes).
batch.SetLength(length)
}
4 changes: 1 addition & 3 deletions pkg/sql/colflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,7 @@ func (r *HashRouter) processNextBatch(ctx context.Context) bool {
selections := r.tupleDistributor.Distribute(b, r.hashCols)
for i, o := range r.outputs {
if len(selections[i]) > 0 {
b.SetSelection(true)
copy(b.Selection(), selections[i])
b.SetLength(len(selections[i]))
colexecutils.UpdateBatchState(b, len(selections[i]), true /* usesSel */, selections[i])
if o.addBatch(ctx, b) {
// This batch blocked the output.
r.numBlockedOutputs++
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -1210,3 +1210,24 @@ statement ok
CREATE TYPE greeting AS ENUM ('hello');
CREATE TABLE greeting_table (x greeting);
EXPLAIN (VEC) SELECT * FROM greeting_table;

# Regression test for incorrectly updating the batch in the CASE operator.
statement ok
SET vectorize=on;
CREATE TABLE t64793 AS
SELECT
g % 2 = 1 AS _bool, g::STRING AS _string
FROM
ROWS FROM (generate_series(1, 5)) AS g;
SET vectorize=experimental_always;
INSERT INTO t64793 DEFAULT VALUES;

query T rowsort
SELECT CASE WHEN _bool THEN _string ELSE _string END FROM t64793;
----
1
2
3
4
5
NULL

0 comments on commit 5da96b4

Please sign in to comment.