Skip to content

Commit

Permalink
Merge #40593
Browse files Browse the repository at this point in the history
40593: exec: add UnsetNulls to ResetInternalBatch r=yuzefovich,rafiss a=asubiotto

The first two commits are fixes related to unsetting nulls in ResetInternalBatch in the aggregator and hashjoiner, while the last commit is the actual switch flip as well as the testing addition.

Co-authored-by: Alfonso Subiotto Marqués <[email protected]>
  • Loading branch information
craig[bot] and asubiotto committed Sep 10, 2019
2 parents 1b75c93 + ae1e1e5 commit fb6a397
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 37 deletions.
1 change: 1 addition & 0 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (m *MemBatch) Reset(types []coltypes.T, length int) {
func (m *MemBatch) ResetInternalBatch() {
m.SetSelection(false)
for _, v := range m.b {
v.Nulls().UnsetNulls()
if v.Type() == coltypes.Bytes {
v.Bytes().Reset()
}
Expand Down
51 changes: 48 additions & 3 deletions pkg/sql/exec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,23 @@ type orderedAggregator struct {
// function operators write directly to this output batch.
scratch struct {
coldata.Batch
// shouldResetInternalBatch keeps track of whether the scratch.Batch should
// be reset. It is false in cases where we have overflow results still to
// return and therefore do not want to modify the batch.
shouldResetInternalBatch bool
// resumeIdx is the index at which the aggregation functions should start
// writing on the next call to Next().
resumeIdx int
// outputSize is col.BatchSize by default.
outputSize int
}

// unsafeBatch is a coldata.Batch returned when only a subset of the
// scratch.Batch results is returned (i.e. work needs to be resumed on the
// next Next call). The values to return are copied into this batch to protect
// against downstream modification of the internal batch.
unsafeBatch coldata.Batch

// groupCol is the slice that aggregateFuncs use to determine whether a value
// is part of the current aggregation group. See aggregateFunc.Init for more
// information.
Expand Down Expand Up @@ -246,6 +256,7 @@ func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
vec := a.scratch.ColVec(i)
a.aggregateFuncs[i].Init(a.groupCol, vec)
}
a.unsafeBatch = coldata.NewMemBatchWithSize(a.outputTypes, outputSize)
a.scratch.outputSize = outputSize
}

Expand All @@ -254,7 +265,10 @@ func (a *orderedAggregator) Init() {
}

func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
a.scratch.ResetInternalBatch()
if a.scratch.shouldResetInternalBatch {
a.scratch.ResetInternalBatch()
a.scratch.shouldResetInternalBatch = false
}
if a.done {
a.scratch.SetLength(0)
return a.scratch
Expand Down Expand Up @@ -282,7 +296,21 @@ func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
if a.scratch.resumeIdx >= a.scratch.outputSize {
// We still have overflow output values.
a.scratch.SetLength(uint16(a.scratch.outputSize))
return a.scratch
for i := 0; i < len(a.outputTypes); i++ {
a.unsafeBatch.ColVec(i).Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
Src: a.scratch.ColVec(i),
ColType: a.outputTypes[i],
SrcStartIdx: 0,
SrcEndIdx: uint64(a.scratch.Length()),
},
},
)
}
a.unsafeBatch.SetLength(a.scratch.Length())
a.scratch.shouldResetInternalBatch = false
return a.unsafeBatch
}
}

Expand Down Expand Up @@ -318,13 +346,30 @@ func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
copy(a.groupCol, zeroBoolColumn)
}

batchToReturn := a.scratch.Batch
if a.scratch.resumeIdx > a.scratch.outputSize {
a.scratch.SetLength(uint16(a.scratch.outputSize))
for i := 0; i < len(a.outputTypes); i++ {
a.unsafeBatch.ColVec(i).Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
Src: a.scratch.ColVec(i),
ColType: a.outputTypes[i],
SrcStartIdx: 0,
SrcEndIdx: uint64(a.scratch.Length()),
},
},
)
}
a.unsafeBatch.SetLength(a.scratch.Length())
batchToReturn = a.unsafeBatch
a.scratch.shouldResetInternalBatch = false
} else {
a.scratch.SetLength(uint16(a.scratch.resumeIdx))
a.scratch.shouldResetInternalBatch = true
}

return a.scratch
return batchToReturn
}

// reset resets the orderedAggregator for another run. Primarily used for
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/exec/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,20 @@ func (hj *hashJoinEqOp) Init() {

// Next resets the prober's batch and then delegates to nextInternal to produce
// the next output batch. The state machine in nextInternal re-enters itself
// (not Next) on state transitions, so the reset below runs once per call to
// Next.
func (hj *hashJoinEqOp) Next(ctx context.Context) coldata.Batch {
// Reset the batch that the probing state returns before it is populated
// again.
hj.prober.batch.ResetInternalBatch()
return hj.nextInternal(ctx)
}

func (hj *hashJoinEqOp) nextInternal(ctx context.Context) coldata.Batch {
switch hj.runningState {
case hjBuilding:
hj.build(ctx)
return hj.Next(ctx)
return hj.nextInternal(ctx)
case hjProbing:
hj.prober.exec(ctx)

if hj.prober.batch.Length() == 0 && hj.builder.spec.outer {
hj.initEmitting()
return hj.Next(ctx)
return hj.nextInternal(ctx)
}
return hj.prober.batch
case hjEmittingUnmatched:
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/exec/mergejoiner_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,11 +1244,6 @@ func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) Next(ctx context.Conte
if o.needToResetOutput {
o.needToResetOutput = false
o.output.ResetInternalBatch()
for _, vec := range o.output.ColVecs() {
// We only need to explicitly reset nulls since the values will be
// copied over and the correct length will be set.
vec.Nulls().UnsetNulls()
}
}
o.initProberState(ctx)

Expand Down
91 changes: 64 additions & 27 deletions pkg/sql/exec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ var orderedVerifier verifier = (*opTestOutput).Verify
// error if they aren't equal by set comparison (irrespective of order).
var unorderedVerifier verifier = (*opTestOutput).VerifyAnyOrder

// maybeHasNulls reports whether at least one of the column vectors of b
// indicates that it may contain nulls.
func maybeHasNulls(b coldata.Batch) bool {
	for colIdx := 0; colIdx < b.Width(); colIdx++ {
		vec := b.ColVec(colIdx)
		if !vec.MaybeHasNulls() {
			// This column is known to be null-free; keep looking.
			continue
		}
		return true
	}
	// No column reported possible nulls.
	return false
}

// runTests is a helper that automatically runs your tests with varied batch
// sizes and with and without a random selection vector.
// tups is the set of input tuples.
Expand All @@ -89,34 +100,60 @@ func runTests(
}
})

t.Run("verifySelResets", func(t *testing.T) {
// Verify that all operators have an unset selection vector even if an
// operator later in the chain sets one. This test ensures that operators
// that "own their own batches", such as any operator that has to reshape
// its output, always reset their selection vectors before returning a fresh
// batch.
inputSources := make([]Operator, len(tups))
for i, tup := range tups {
inputSources[i] = newOpTestInput(1 /* batchSize */, tup)
}
op, err := constructor(inputSources)
if err != nil {
t.Fatal(err)
t.Run("verifySelAndNullResets", func(t *testing.T) {
// This test ensures that operators that "own their own batches", such as
// any operator that has to reshape its output, are not affected by
// downstream modification of batches.
// We run the main loop twice: once to determine what the operator would
// output on its second Next call (we need the first call to Next to get a
// reference to a batch to modify), and a second time to modify the batch
// and verify that this does not change the operator output.
var secondBatchHasSelection, secondBatchHasNulls bool
for round := 0; round < 2; round++ {
inputSources := make([]Operator, len(tups))
for i, tup := range tups {
inputSources[i] = newOpTestInput(1 /* batchSize */, tup)
}
op, err := constructor(inputSources)
if err != nil {
t.Fatal(err)
}
op.Init()
ctx := context.Background()
b := op.Next(ctx)
if round == 1 {
if secondBatchHasSelection {
b.SetSelection(false)
} else {
b.SetSelection(true)
}
if secondBatchHasNulls {
// ResetInternalBatch will throw away the null information.
b.ResetInternalBatch()
} else {
for i := 0; i < b.Width(); i++ {
b.ColVec(i).Nulls().SetNulls()
}
}
}
b = op.Next(ctx)
if round == 0 {
secondBatchHasSelection = b.Selection() != nil
secondBatchHasNulls = maybeHasNulls(b)
}
if round == 1 {
if secondBatchHasSelection {
assert.NotNil(t, b.Selection())
} else {
assert.Nil(t, b.Selection())
}
if secondBatchHasNulls {
assert.True(t, maybeHasNulls(b))
} else {
assert.False(t, maybeHasNulls(b))
}
}
}
op.Init()
ctx := context.Background()
b := op.Next(ctx)
if b.Selection() != nil {
// We're testing an operator that needs to set a selection vector for some
// reason already, so we can't test the condition we're looking for.
return
}
// Set the selection vector by hand.
b.SetSelection(true)
b = op.Next(ctx)
// Make sure that the next time we call the operator, it has an empty
// selection vector.
assert.Nil(t, b.Selection())
})
}

Expand Down

0 comments on commit fb6a397

Please sign in to comment.