Skip to content

Commit

Permalink
Merge #44104
Browse files Browse the repository at this point in the history
44104: colexec: refactor CASE operator r=yuzefovich a=yuzefovich

This commit removes selectionBatch which was used by the CASE operator
and, instead, moves the logic that selectionBatch was providing into the
operator itself. I think it is clearer this way.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jan 21, 2020
2 parents 9cf883b + 61606f4 commit b96fa11
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 106 deletions.
54 changes: 6 additions & 48 deletions pkg/sql/colexec/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,85 +23,43 @@ type bufferOp struct {

// read is true if someone has read the current batch already.
read bool
batch *selectionBatch
batch coldata.Batch
}

var _ InternalMemoryOperator = &bufferOp{}
var _ Operator = &bufferOp{}

// NewBufferOp returns a new bufferOp, initialized to buffer batches from the
// supplied input.
func NewBufferOp(input Operator) Operator {
return &bufferOp{
OneInputNode: NewOneInputNode(input),
batch: &selectionBatch{
sel: make([]uint16, coldata.BatchSize()),
},
}
}

func (b *bufferOp) InternalMemoryUsage() int {
// We internally use a single selection vector within the selectionBatch.
return sizeOfBatchSizeSelVector
}

func (b *bufferOp) Init() {
b.input.Init()
}

// rewind resets this buffer to be readable again.
// NOTE: it is the caller responsibility to restore the batch into the desired
// state.
func (b *bufferOp) rewind() {
b.read = false
b.batch.SetLength(b.batch.Batch.Length())
b.batch.useSel = b.batch.Batch.Selection() != nil
if b.batch.useSel {
copy(b.batch.sel, b.batch.Batch.Selection())
}
}

// advance reads the next batch from the input into the buffer, preparing itself
// for reads.
func (b *bufferOp) advance(ctx context.Context) {
b.batch.Batch = b.input.Next(ctx)
b.batch = b.input.Next(ctx)
b.rewind()
}

func (b *bufferOp) Next(ctx context.Context) coldata.Batch {
if b.read {
// TODO(yuzefovich): use coldata.ZeroBatch.
b.batch.SetLength(0)
return b.batch
}
b.read = true
return b.batch
}

// selectionBatch is a smaller wrapper around coldata.Batch that adds another
// selection vector on top. This is useful for operators that might want to
// permute the selection vector for downstream operators without touching the
// original selection of the batch.
type selectionBatch struct {
coldata.Batch
sel []uint16
useSel bool
n uint16
}

var _ coldata.Batch = &selectionBatch{}

func (s *selectionBatch) SetSelection(b bool) {
s.useSel = b
}

func (s *selectionBatch) Selection() []uint16 {
if s.useSel {
return s.sel
}
return nil
}

func (s *selectionBatch) Length() uint16 {
return s.n
}

func (s *selectionBatch) SetLength(n uint16) {
s.n = n
}
19 changes: 0 additions & 19 deletions pkg/sql/colexec/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,4 @@ func TestBufferOp(t *testing.T) {
require.Nil(t, b.Selection())
require.Equal(t, uint16(0), b.Length())
})

t.Run("TestBufferRestoresOriginalBatch", func(t *testing.T) {
buffer.rewind()
b := buffer.Next(ctx)
b.SetSelection(true)
sel := b.Selection()
sel[0] = 1
b.SetLength(1)

// We have modified the selection batch, but rewinding the buffer should
// restore the returned batch to the original state.
buffer.rewind()
b = buffer.Next(ctx)
require.Nil(t, b.Selection())
require.Equal(t, uint16(len(inputTuples)), b.Length())
b = buffer.Next(ctx)
require.Nil(t, b.Selection())
require.Equal(t, uint16(0), b.Length())
})
}
104 changes: 66 additions & 38 deletions pkg/sql/colexec/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ type caseOp struct {
// the input batch. We need to do this because we're going to destructively
// modify the selection vector in order to do the work of the case statement.
origSel []uint16
// prevSel is a buffer used to keep track of the selection vector before
// running a case arm (i.e. "previous to the current case arm"). We need to
// keep track of it because case arm will modify the selection vector of the
// batch, and then we need to figure out which tuples have not been matched
// by the current case arm (those present in the "previous" sel and not
// present in the "current" sel).
prevSel []uint16
}

var _ InternalMemoryOperator = &caseOp{}
Expand All @@ -57,8 +64,8 @@ func (c *caseOp) Child(nth int, verbose bool) execinfra.OpNode {
}

func (c *caseOp) InternalMemoryUsage() int {
// We internally use a single selection vector, origSel.
return sizeOfBatchSizeSelVector
// We internally use two selection vectors, origSel and prevSel.
return 2 * sizeOfBatchSizeSelVector
}

// NewCaseOp returns an operator that runs a case statement.
Expand Down Expand Up @@ -89,6 +96,7 @@ func NewCaseOp(
outputIdx: outputIdx,
typ: typ,
origSel: make([]uint16, coldata.BatchSize()),
prevSel: make([]uint16, coldata.BatchSize()),
}
}

Expand All @@ -101,15 +109,15 @@ func (c *caseOp) Init() {

func (c *caseOp) Next(ctx context.Context) coldata.Batch {
c.buffer.advance(ctx)
origLen := c.buffer.batch.Batch.Length()
origLen := c.buffer.batch.Length()
if c.buffer.batch.Width() == c.outputIdx {
c.allocator.AppendColumn(c.buffer.batch, c.typ)
}
// NB: we don't short-circuit if the batch is length 0 here, because we have
// to make sure to run all of our case arms. This is unfortunate.
// TODO(jordan): add this back in once batches are right-sized by planning.
var origHasSel bool
if sel := c.buffer.batch.Batch.Selection(); sel != nil {
if sel := c.buffer.batch.Selection(); sel != nil {
origHasSel = true
copy(c.origSel, sel)
}
Expand All @@ -119,24 +127,30 @@ func (c *caseOp) Next(ctx context.Context) coldata.Batch {
// for non-zero length batches.
// TODO(yuzefovich): remove it once we can short-circuit.
var (
outputCol coldata.Vec
batch coldata.Batch
destVecs []coldata.Vec
outputCol coldata.Vec
destVecs []coldata.Vec
prevLen = origLen
prevHasSel bool
)
if origLen > 0 {
outputCol = c.buffer.batch.Batch.ColVec(c.outputIdx)
outputCol = c.buffer.batch.ColVec(c.outputIdx)
destVecs = []coldata.Vec{outputCol}
if sel := c.buffer.batch.Selection(); sel != nil {
prevHasSel = true
c.prevSel = c.prevSel[:origLen]
copy(c.prevSel[:origLen], sel[:origLen])
}
}
c.allocator.PerformOperation(destVecs, func() {
for i := range c.caseOps {
// Run the next case operator chain. It will project its THEN expression
// for all tuples that matched its WHEN expression and that were not
// already matched.
batch = c.caseOps[i].Next(ctx)
batch := c.caseOps[i].Next(ctx)
// The batch's projection column now additionally contains results for all
// of the tuples that passed the ith WHEN clause. The batch's selection
// vector is set to the same selection of tuples.
// Now, we must subtract this selection vector from the last buffered
// Now, we must subtract this selection vector from the previous
// selection vector, so that the next operator gets to operate on the
// remaining set of tuples in the input that haven't matched an arm of the
// case statement.
Expand All @@ -157,8 +171,8 @@ func (c *caseOp) Next(ctx context.Context) coldata.Batch {
// current case arm.
var subtractIdx int
var curIdx uint16
if origLen > 0 {
inputCol := c.buffer.batch.Batch.ColVec(c.thenIdxs[i])
if batch.Length() > 0 {
inputCol := batch.ColVec(c.thenIdxs[i])
// Copy the results into the output vector, using the toSubtract selection
// vector to copy only the elements that we actually wrote according to the
// current case arm.
Expand All @@ -173,49 +187,62 @@ func (c *caseOp) Next(ctx context.Context) coldata.Batch {
},
SelOnDest: true,
})
if oldSel := c.buffer.batch.Batch.Selection(); oldSel != nil {
// We have an old selection vector, which represents the tuples that
// haven't yet been matched. Remove the ones that just matched from the
// old selection vector.
for i := range oldSel[:c.buffer.batch.Batch.Length()] {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == oldSel[i] {
// The ith element of the old selection vector matched the current one
// in toSubtract. Skip writing this element, removing it from the old
// selection vector.
if prevHasSel {
// We have a previous selection vector, which represents the tuples
// that haven't yet been matched. Remove the ones that just matched
// from the previous selection vector.
for i := range c.prevSel {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == c.prevSel[i] {
// The ith element of the previous selection vector matched the
// current one in toSubtract. Skip writing this element, removing
// it from the previous selection vector.
subtractIdx++
continue
}
oldSel[curIdx] = oldSel[i]
c.prevSel[curIdx] = c.prevSel[i]
curIdx++
}
} else {
// No selection vector means there have been no matches yet, and we were
// considering the entire batch of tuples for this case arm. Make a new
// selection vector with all of the tuples but the ones that just matched.
c.buffer.batch.Batch.SetSelection(true)
oldSel = c.buffer.batch.Batch.Selection()
for i := uint16(0); i < c.buffer.batch.Batch.Length(); i++ {
c.prevSel = c.prevSel[:cap(c.prevSel)]
for i := uint16(0); i < origLen; i++ {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == i {
subtractIdx++
continue
}
oldSel[curIdx] = i
c.prevSel[curIdx] = i
curIdx++
}
}
c.buffer.batch.Batch.SetLength(curIdx)

// Now our selection vector is set to exclude all the things that have
// matched so far. Reset the buffer and run the next case arm.
c.buffer.rewind()
// Set the buffered batch into the desired state.
c.buffer.batch.SetLength(curIdx)
prevLen = curIdx
c.buffer.batch.SetSelection(true)
prevHasSel = true
copy(c.buffer.batch.Selection()[:curIdx], c.prevSel)
c.prevSel = c.prevSel[:curIdx]
} else {
// There were no matches with the current WHEN arm, so we simply need
// to restore the buffered batch into the previous state.
c.buffer.batch.SetLength(prevLen)
c.buffer.batch.SetSelection(prevHasSel)
if prevHasSel {
copy(c.buffer.batch.Selection()[:prevLen], c.prevSel)
c.prevSel = c.prevSel[:prevLen]
}
}
// Now our selection vector is set to exclude all the things that have
// matched so far. Reset the buffer and run the next case arm.
c.buffer.rewind()
}
// Finally, run the else operator, which will project into all tuples that
// are remaining in the selection vector (didn't match any case arms). Once
// that's done, restore the original selection vector and return the batch.
batch = c.elseOp.Next(ctx)
if origLen > 0 {
inputCol := c.buffer.batch.Batch.ColVec(c.thenIdxs[len(c.thenIdxs)-1])
batch := c.elseOp.Next(ctx)
if batch.Length() > 0 {
inputCol := batch.ColVec(c.thenIdxs[len(c.thenIdxs)-1])
outputCol.Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
Expand All @@ -229,10 +256,11 @@ func (c *caseOp) Next(ctx context.Context) coldata.Batch {
})
}
})
batch.SetLength(origLen)
batch.SetSelection(origHasSel)
// Restore the original state of the buffered batch.
c.buffer.batch.SetLength(origLen)
c.buffer.batch.SetSelection(origHasSel)
if origHasSel {
copy(batch.Selection()[:origLen], c.origSel[:origLen])
copy(c.buffer.batch.Selection()[:origLen], c.origSel[:origLen])
}
return batch
return c.buffer.batch
}
1 change: 0 additions & 1 deletion pkg/sql/colexec/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,6 @@ func planProjectionOperators(
}

buffer := NewBufferOp(input)
internalMemUsed += buffer.(InternalMemoryOperator).InternalMemoryUsage()
caseOps := make([]Operator, len(t.Whens))
caseOutputType := typeconv.FromColumnType(t.ResolvedType())
caseOutputIdx := len(columnTypes)
Expand Down

0 comments on commit b96fa11

Please sign in to comment.