Skip to content

Commit

Permalink
Merge #38828
Browse files Browse the repository at this point in the history
38828: exec: handle nulls in AVG aggregator r=rafiss a=rafiss

closes #37738 

The random aggregation test uses float64 now so that AVG can be included in this test, until #38823 can be addressed.

```
BenchmarkAggregator/AVG/hash/Decimal/groupSize=1/hasNulls=false/numInputBatches=64-12       	      20	  72817604 ns/op	   7.20 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=1/hasNulls=true/numInputBatches=64-12        	      20	  67807960 ns/op	   7.73 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=2/hasNulls=false/numInputBatches=64-12       	      30	  45907478 ns/op	  11.42 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=2/hasNulls=true/numInputBatches=64-12        	      30	  44468409 ns/op	  11.79 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=512/hasNulls=false/numInputBatches=64-12     	     200	   8712009 ns/op	  60.18 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=512/hasNulls=true/numInputBatches=64-12      	     200	   8852871 ns/op	  59.22 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=1024/hasNulls=false/numInputBatches=64-12    	     200	   9058353 ns/op	  57.88 MB/s
BenchmarkAggregator/AVG/hash/Decimal/groupSize=1024/hasNulls=true/numInputBatches=64-12     	     200	   9289322 ns/op	  56.44 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=1/hasNulls=false/numInputBatches=64-12    	      20	  67161015 ns/op	   7.81 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=1/hasNulls=true/numInputBatches=64-12     	      20	  60712202 ns/op	   8.64 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=2/hasNulls=false/numInputBatches=64-12    	      30	  40953924 ns/op	  12.80 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=2/hasNulls=true/numInputBatches=64-12     	      30	  39335420 ns/op	  13.33 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=512/hasNulls=false/numInputBatches=64-12  	     300	   4201872 ns/op	 124.77 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=512/hasNulls=true/numInputBatches=64-12   	     300	   4202353 ns/op	 124.76 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=1024/hasNulls=false/numInputBatches=64-12 	     500	   3982187 ns/op	 131.66 MB/s
BenchmarkAggregator/AVG/ordered/Decimal/groupSize=1024/hasNulls=true/numInputBatches=64-12  	     500	   3766494 ns/op	 139.20 MB/s
```

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Jul 11, 2019
2 parents 030ee0e + dbc93bd commit a8d548f
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 67 deletions.
63 changes: 38 additions & 25 deletions pkg/sql/exec/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package exec
import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/apd"
Expand Down Expand Up @@ -418,8 +419,9 @@ func TestAggregatorAllFunctions(t *testing.T) {
distsqlpb.AggregatorSpec_SUM_INT,
distsqlpb.AggregatorSpec_MIN,
distsqlpb.AggregatorSpec_MAX,
distsqlpb.AggregatorSpec_AVG,
},
aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}},
aggCols: [][]uint32{{0}, {1}, {}, {1}, {1}, {2}, {2}, {2}, {1}},
colTypes: []types.T{types.Int64, types.Decimal, types.Int64},
input: tuples{
{0, nil, nil},
Expand All @@ -428,8 +430,8 @@ func TestAggregatorAllFunctions(t *testing.T) {
{1, nil, nil},
},
expected: tuples{
{0, 3.1, 2, 1, 3.1, 5, 5, 5},
{1, nil, 2, 0, nil, nil, nil, nil},
{0, 3.1, 2, 1, 3.1, 5, 5, 5, 3.1},
{1, nil, 2, 0, nil, nil, nil, nil, nil},
},
convToDecimal: true,
},
Expand All @@ -446,7 +448,7 @@ func TestAggregatorAllFunctions(t *testing.T) {
[]tuples{tc.input},
tc.expected,
orderedVerifier,
[]int{0, 1, 2, 3, 4, 5, 6, 7}[:len(tc.expected[0])],
[]int{0, 1, 2, 3, 4, 5, 6, 7, 8}[:len(tc.expected[0])],
func(input []Operator) (Operator, error) {
return agg.new(input[0], tc.colTypes, tc.aggFns, tc.groupCols, tc.aggCols)
})
Expand All @@ -467,12 +469,13 @@ func TestAggregatorRandom(t *testing.T) {
t.Run(fmt.Sprintf("%s/groupSize=%d/numInputBatches=%d/hasNulls=%t", agg.name, groupSize, numInputBatches, hasNulls),
func(t *testing.T) {
nTuples := coldata.BatchSize * numInputBatches
typs := []types.T{types.Int64, types.Int64}
typs := []types.T{types.Int64, types.Float64}
cols := []coldata.Vec{
coldata.NewMemColumn(typs[0], nTuples),
coldata.NewMemColumn(typs[1], nTuples)}
groups, aggCol, aggColNulls := cols[0].Int64(), cols[1].Int64(), cols[1].Nulls()
var expRowCounts, expCounts, expSums, expMins, expMaxs []int64
groups, aggCol, aggColNulls := cols[0].Int64(), cols[1].Float64(), cols[1].Nulls()
var expRowCounts, expCounts []int64
var expSums, expMins, expMaxs []float64
// SUM, MIN, MAX, and AVG aggregators can output null.
var expNulls []bool
curGroup := -1
Expand All @@ -489,10 +492,10 @@ func TestAggregatorRandom(t *testing.T) {
if hasNulls && rng.Float64() < nullProbability {
aggColNulls.SetNull(uint16(i))
} else {
// Keep the inputs small so they are a realistic size. Using
// values in the range [0, 2^63) is not realistic and makes
// decimal operations slower.
aggCol[i] = rng.Int63()%2048 - 1024
// Keep the inputs small so they are a realistic size. Using a
// large range is not realistic and makes decimal operations
// slower.
aggCol[i] = 2048 * (rng.Float64() - 0.5)
expNulls[curGroup] = false
expCounts[curGroup]++
expSums[curGroup] += aggCol[i]
Expand All @@ -511,9 +514,10 @@ func TestAggregatorRandom(t *testing.T) {
distsqlpb.AggregatorSpec_COUNT,
distsqlpb.AggregatorSpec_SUM_INT,
distsqlpb.AggregatorSpec_MIN,
distsqlpb.AggregatorSpec_MAX},
distsqlpb.AggregatorSpec_MAX,
distsqlpb.AggregatorSpec_AVG},
[]uint32{0},
[][]uint32{{}, {1}, {1}, {1}, {1}},
[][]uint32{{}, {1}, {1}, {1}, {1}, {1}},
)
if err != nil {
t.Fatal(err)
Expand All @@ -529,12 +533,14 @@ func TestAggregatorRandom(t *testing.T) {
sumCol := b.ColVec(2)
minCol := b.ColVec(3)
maxCol := b.ColVec(4)
avgCol := b.ColVec(5)
for j := uint16(0); j < b.Length(); j++ {
rowCount := rowCountCol.Int64()[j]
count := countCol.Int64()[j]
sum := sumCol.Int64()[j]
min := minCol.Int64()[j]
max := maxCol.Int64()[j]
sum := sumCol.Float64()[j]
min := minCol.Float64()[j]
max := maxCol.Float64()[j]
avg := avgCol.Float64()[j]
expRowCount := expRowCounts[tupleIdx]
if rowCount != expRowCount {
t.Fatalf("Found rowCount %d, expected %d, idx %d of batch %d", rowCount, expRowCount, j, i)
Expand All @@ -547,26 +553,33 @@ func TestAggregatorRandom(t *testing.T) {
expNull := expNulls[tupleIdx]
if expNull {
if !sumCol.Nulls().NullAt(uint16(j)) {
t.Fatalf("Found non-null sum %d, expected null, idx %d of batch %d", sum, j, i)
t.Fatalf("Found non-null sum %f, expected null, idx %d of batch %d", sum, j, i)
}
if !minCol.Nulls().NullAt(uint16(j)) {
t.Fatalf("Found non-null min %d, expected null, idx %d of batch %d", sum, j, i)
t.Fatalf("Found non-null min %f, expected null, idx %d of batch %d", sum, j, i)
}
if !maxCol.Nulls().NullAt(uint16(j)) {
t.Fatalf("Found non-null max %d, expected null, idx %d of batch %d", sum, j, i)
t.Fatalf("Found non-null max %f, expected null, idx %d of batch %d", sum, j, i)
}
if !avgCol.Nulls().NullAt(uint16(j)) {
t.Fatalf("Found non-null avg %f, expected null, idx %d of batch %d", sum, j, i)
}
} else {
expSum := expSums[tupleIdx]
if sum != expSum {
t.Fatalf("Found sum %d, expected %d, idx %d of batch %d", sum, expSum, j, i)
if math.Abs(sum-expSum) > 1e-6 {
t.Fatalf("Found sum %f, expected %f, idx %d of batch %d", sum, expSum, j, i)
}
expMin := expMins[tupleIdx]
if min != expMin {
t.Fatalf("Found min %d, expected %d, idx %d of batch %d", min, expMin, j, i)
t.Fatalf("Found min %f, expected %f, idx %d of batch %d", min, expMin, j, i)
}
expMax := expMaxs[tupleIdx]
if max != expMax {
t.Fatalf("Found max %d, expected %d, idx %d of batch %d", max, expMax, j, i)
t.Fatalf("Found max %f, expected %f, idx %d of batch %d", max, expMax, j, i)
}
expAvg := expSum / float64(expCount)
if math.Abs(avg-expAvg) > 1e-6 {
t.Fatalf("Found avg %f, expected %f, idx %d of batch %d", avg, expAvg, j, i)
}
}
tupleIdx++
Expand Down Expand Up @@ -810,14 +823,14 @@ func TestHashAggregator(t *testing.T) {
}
}

func min64(a, b int64) int64 {
func min64(a, b float64) float64 {
if a < b {
return a
}
return b
}

func max64(a, b int64) int64 {
func max64(a, b float64) float64 {
if a > b {
return a
}
Expand Down
146 changes: 108 additions & 38 deletions pkg/sql/exec/avg_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func _ASSIGN_DIV_INT64(_, _, _ string) {
panic("")
}

// _ASSIGN_ADD is the template addition function for assigning the first input
// to the result of the second input + the third input.
func _ASSIGN_ADD(_, _, _ string) {
panic("")
}

// */}}

func newAvgAgg(t types.T) (aggregateFunc, error) {
Expand All @@ -64,14 +70,20 @@ type avg_TYPEAgg struct {
groups []bool
scratch struct {
curIdx int
// groupSums[i] keeps track of the sum of elements belonging to the ith
// group.
groupSums []_GOTYPE
// groupCounts[i] keeps track of the number of elements that we've seen
// belonging to the ith group.
groupCounts []int64
// curSum keeps track of the sum of elements belonging to the current group,
// so we can index into the slice once per group, instead of on each
// iteration.
curSum _GOTYPE
// curCount keeps track of the number of elements that we've seen
// belonging to the current group.
curCount int64
// vec points to the output vector.
vec []_GOTYPE
// nulls points to the output null vector that we are updating.
nulls *coldata.Nulls
// foundNonNullForCurrentGroup tracks if we have seen any non-null values
// for the group that is currently being aggregated.
foundNonNullForCurrentGroup bool
}
}

Expand All @@ -80,16 +92,17 @@ var _ aggregateFunc = &avg_TYPEAgg{}
func (a *avg_TYPEAgg) Init(groups []bool, v coldata.Vec) {
a.groups = groups
a.scratch.vec = v._TemplateType()
a.scratch.groupSums = make([]_GOTYPE, len(a.scratch.vec))
a.scratch.groupCounts = make([]int64, len(a.scratch.vec))
a.scratch.nulls = v.Nulls()
a.Reset()
}

func (a *avg_TYPEAgg) Reset() {
copy(a.scratch.groupSums, zero_TYPEColumn)
copy(a.scratch.groupCounts, zeroInt64Column)
copy(a.scratch.vec, zero_TYPEColumn)
a.scratch.curIdx = -1
a.scratch.curSum = zero_TYPEColumn[0]
a.scratch.curCount = 0
a.scratch.foundNonNullForCurrentGroup = false
a.scratch.nulls.UnsetNulls()
a.done = false
}

Expand All @@ -100,10 +113,7 @@ func (a *avg_TYPEAgg) CurrentOutputIndex() int {
func (a *avg_TYPEAgg) SetOutputIndex(idx int) {
if a.scratch.curIdx != -1 {
a.scratch.curIdx = idx
copy(a.scratch.groupSums[idx+1:], zero_TYPEColumn)
copy(a.scratch.groupCounts[idx+1:], zeroInt64Column)
// TODO(asubiotto): We might not have to zero a.scratch.vec since we
// overwrite with an independent value.
a.scratch.nulls.UnsetNullsAfter(uint16(idx + 1))
copy(a.scratch.vec[idx+1:], zero_TYPEColumn)
}
}
Expand All @@ -114,42 +124,102 @@ func (a *avg_TYPEAgg) Compute(b coldata.Batch, inputIdxs []uint32) {
}
inputLen := b.Length()
if inputLen == 0 {
// The aggregation is finished. Flush the last value.

// The aggregation is finished. Flush the last value. If we haven't found
// any non-nulls for this group so far, the output for this group should
// be null. If a.scratch.curIdx is negative, it means the input has zero rows, and
// there should be no output at all.
if a.scratch.curIdx >= 0 {
_ASSIGN_DIV_INT64("a.scratch.vec[a.scratch.curIdx]", "a.scratch.groupSums[a.scratch.curIdx]", "a.scratch.groupCounts[a.scratch.curIdx]")
if !a.scratch.foundNonNullForCurrentGroup {
a.scratch.nulls.SetNull(uint16(a.scratch.curIdx))
} else {
_ASSIGN_DIV_INT64("a.scratch.vec[a.scratch.curIdx]", "a.scratch.curSum", "a.scratch.curCount")
}
}
a.scratch.curIdx++
a.done = true
return
}
col, sel := b.ColVec(int(inputIdxs[0]))._TemplateType(), b.Selection()
if sel != nil {
sel = sel[:inputLen]
for _, i := range sel {
x := 0
if a.groups[i] {
x = 1
vec, sel := b.ColVec(int(inputIdxs[0])), b.Selection()
col, nulls := vec._TemplateType(), vec.Nulls()
if nulls.MaybeHasNulls() {
if sel != nil {
sel = sel[:inputLen]
for _, i := range sel {
_ACCUMULATE_AVG(a, nulls, i, true)
}
} else {
col = col[:inputLen]
for i := range col {
_ACCUMULATE_AVG(a, nulls, i, true)
}
a.scratch.curIdx += x
_ASSIGN_ADD("a.scratch.groupSums[a.scratch.curIdx]", "a.scratch.groupSums[a.scratch.curIdx]", "col[i]")
a.scratch.groupCounts[a.scratch.curIdx]++
}
} else {
col = col[:inputLen]
for i := range col {
x := 0
if a.groups[i] {
x = 1
if sel != nil {
sel = sel[:inputLen]
for _, i := range sel {
_ACCUMULATE_AVG(a, nulls, i, false)
}
} else {
col = col[:inputLen]
for i := range col {
_ACCUMULATE_AVG(a, nulls, i, false)
}
a.scratch.curIdx += x
_ASSIGN_ADD("a.scratch.groupSums[a.scratch.curIdx]", "a.scratch.groupSums[a.scratch.curIdx]", "col[i]")
a.scratch.groupCounts[a.scratch.curIdx]++
}
}

for i := 0; i < a.scratch.curIdx; i++ {
_ASSIGN_DIV_INT64("a.scratch.vec[i]", "a.scratch.groupSums[i]", "a.scratch.groupCounts[i]")
}
}

// {{end}}

// {{/*
// _ACCUMULATE_AVG updates the total sum/count for current group using the value
// of the ith row. If this is the first row of a new group, then the average is
// computed for the current group. If no non-nulls have been found for the
// current group, then the output for the current group is set to null.
func _ACCUMULATE_AVG(a *_AGG_TYPEAgg, nulls *coldata.Nulls, i int, _HAS_NULLS bool) { // */}}

// {{define "accumulateAvg"}}
if a.groups[i] {
// If we encounter a new group, and we haven't found any non-nulls for the
// current group, the output for this group should be null. If
// a.scratch.curIdx is negative, it means that this is the first group.
if a.scratch.curIdx >= 0 {
if !a.scratch.foundNonNullForCurrentGroup {
a.scratch.nulls.SetNull(uint16(a.scratch.curIdx))
} else {
// {{with .Global}}
_ASSIGN_DIV_INT64("a.scratch.vec[a.scratch.curIdx]", "a.scratch.curSum", "a.scratch.curCount")
// {{end}}
}
}
a.scratch.curIdx++

// {{/*
// We only need to reset this flag if there are nulls. If there are no
// nulls, this will be updated unconditionally below.
// */}}
// {{ if .HasNulls }}
a.scratch.foundNonNullForCurrentGroup = false
// {{ end }}

// The next element of vec is guaranteed to be initialized to the zero
// value. We can't use zero_TYPEColumn here because this is outside of
// the earlier template block.
a.scratch.curSum = a.scratch.vec[a.scratch.curIdx]
a.scratch.curCount = 0
}
var isNull bool
// {{ if .HasNulls }}
isNull = nulls.NullAt(uint16(i))
// {{ else }}
isNull = false
// {{ end }}
if !isNull {
_ASSIGN_ADD("a.scratch.curSum", "a.scratch.curSum", "col[i]")
a.scratch.curCount++
a.scratch.foundNonNullForCurrentGroup = true
}
// {{end}}

// {{/*
} // */}}
7 changes: 5 additions & 2 deletions pkg/sql/exec/execgen/cmd/execgen/avg_agg_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ func genAvgAgg(wr io.Writer) error {
assignDivRe := regexp.MustCompile(`_ASSIGN_DIV_INT64\((.*),(.*),(.*)\)`)
s = assignDivRe.ReplaceAllString(s, "{{.AssignDivInt64 $1 $2 $3}}")
assignAddRe := regexp.MustCompile(`_ASSIGN_ADD\((.*),(.*),(.*)\)`)
s = assignAddRe.ReplaceAllString(s, "{{.AssignAdd $1 $2 $3}}")
s = assignAddRe.ReplaceAllString(s, "{{.Global.AssignAdd $1 $2 $3}}")

tmpl, err := template.New("avg_agg").Parse(s)
accumulateAvg := makeFunctionRegex("_ACCUMULATE_AVG", 4)
s = accumulateAvg.ReplaceAllString(s, `{{template "accumulateAvg" buildDict "Global" . "HasNulls" $4}}`)

tmpl, err := template.New("avg_agg").Funcs(template.FuncMap{"buildDict": buildDict}).Parse(s)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/exec/execgen/cmd/execgen/min_max_agg_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func genMinMaxAgg(wr io.Writer) error {
assignCmpRe := regexp.MustCompile(`_ASSIGN_CMP\((.*),(.*),(.*)\)`)
s = assignCmpRe.ReplaceAllString(s, "{{.Global.Assign $1 $2 $3}}")

accumulateSum := makeFunctionRegex("_ACCUMULATE_MINMAX", 4)
s = accumulateSum.ReplaceAllString(s, `{{template "accumulateMinMax" buildDict "Global" . "HasNulls" $4}}`)
accumulateMinMax := makeFunctionRegex("_ACCUMULATE_MINMAX", 4)
s = accumulateMinMax.ReplaceAllString(s, `{{template "accumulateMinMax" buildDict "Global" . "HasNulls" $4}}`)

tmpl, err := template.New("min_max_agg").Funcs(template.FuncMap{"buildDict": buildDict}).Parse(s)
if err != nil {
Expand Down

0 comments on commit a8d548f

Please sign in to comment.