Skip to content

Commit

Permalink
Merge #74437
Browse files Browse the repository at this point in the history
74437: colexecagg: reduce the size of hash aggregates r=yuzefovich a=yuzefovich

**colexec: fix a recent bug with aggTypes**

When we introduced the hash aggregation with partial order support, we
mistakenly removed the ordered aggregation from `aggTypes` slice that is
used in some tests as well as in the benchmarks. This is now fixed.

Release note: None

**colexecagg: replace concrete slices with native type aliases**

This commit replaces concrete slices (like `[]int64`) with the
corresponding native type aliases (like `coldata.Int64s`). This allows
us to use inlined `Set` methods.

Release note: None

**colexecagg: remove some redundant COPYVAL calls**

This commit removes several `execgen.COPYVAL` calls that were redundant
because the first and the second argument are the same. These calls are
redundant because we already performed the same call right after
calling `Get` from the original vector and we will perform a deep copy
when calling `Set` next.

Release note: None

**colexecagg: reduce the size of hash aggregates**

This commit reduces the size of the hash aggregates by removing the
reference to the well-typed column (i.e. a concrete unwrapped
`coldata.Vec`, something like `[]int64`). This is possible because the
hash aggregates only access the concrete column once, in `Flush`, so
there is no point in storing the concrete column as we do for the
ordered aggregates. We still perform the interface dispatch call only
once - previously it was in `SetOutput`, now it is in `Flush`. This
should be a non-trivial improvement since the hash aggregation uses
a separate aggregation function object for each bucket.

This change also allows us to remove the overriding of `SetOutput`
method implementation provided by the base struct from the hash and
window aggregates.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jan 5, 2022
2 parents ced0dd4 + 2f50c4c commit 00cb5d1
Show file tree
Hide file tree
Showing 38 changed files with 420 additions and 1,120 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/aggregators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var aggTypesWithPartial = []aggType{
},
}

var aggTypes = aggTypesWithPartial[:1]
var aggTypes = aggTypesWithPartial[:2]

func (tc *aggregatorTestCase) init() error {
if tc.convToDecimal {
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colexec/colexecagg/any_not_null_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,24 @@ func newAnyNotNull_AGGKINDAggAlloc(
type anyNotNull_TYPE_AGGKINDAgg struct {
// {{if eq "_AGGKIND" "Ordered"}}
orderedAggregateFuncBase
col _GOTYPESLICE
// {{else}}
unorderedAggregateFuncBase
// {{end}}
col _GOTYPESLICE
curAgg _GOTYPE
foundNonNullForCurrentGroup bool
}

var _ AggregateFunc = &anyNotNull_TYPE_AGGKINDAgg{}

// {{if eq "_AGGKIND" "Ordered"}}
func (a *anyNotNull_TYPE_AGGKINDAgg) SetOutput(vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.SetOutput(vec)
// {{else}}
a.unorderedAggregateFuncBase.SetOutput(vec)
// {{end}}
a.col = vec.TemplateType()
}

// {{end}}

func (a *anyNotNull_TYPE_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, sel []int,
) {
Expand Down Expand Up @@ -175,11 +174,14 @@ func (a *anyNotNull_TYPE_AGGKINDAgg) Flush(outputIdx int) {
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
col := a.col
// {{else}}
col := a.vec.TemplateType()
// {{end}}
if !a.foundNonNullForCurrentGroup {
a.nulls.SetNull(outputIdx)
} else {
a.col.Set(outputIdx, a.curAgg)
col.Set(outputIdx, a.curAgg)
}
// {{if or (.IsBytesLike) (eq .VecMethod "Datum")}}
// Release the reference to curAgg eagerly.
Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/colexec/colexecagg/avg_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func newAvg_AGGKINDAggAlloc(
type avg_TYPE_AGGKINDAgg struct {
// {{if eq "_AGGKIND" "Ordered"}}
orderedAggregateFuncBase
// col points to the statically-typed output vector.
col _RET_GOTYPESLICE
// {{else}}
unorderedAggregateFuncBase
// {{end}}
Expand All @@ -104,8 +106,6 @@ type avg_TYPE_AGGKINDAgg struct {
// curCount keeps track of the number of non-null elements that we've seen
// belonging to the current group.
curCount int64
// col points to the statically-typed output vector.
col []_RET_GOTYPE
// {{if .NeedsHelper}}
// {{/*
// overloadHelper is used only when we perform the summation of integers
Expand All @@ -118,15 +118,14 @@ type avg_TYPE_AGGKINDAgg struct {

var _ AggregateFunc = &avg_TYPE_AGGKINDAgg{}

// {{if eq "_AGGKIND" "Ordered"}}
func (a *avg_TYPE_AGGKINDAgg) SetOutput(vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.SetOutput(vec)
// {{else}}
a.unorderedAggregateFuncBase.SetOutput(vec)
// {{end}}
a.col = vec._RET_TYPE()
}

// {{end}}

func (a *avg_TYPE_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, sel []int,
) {
Expand Down Expand Up @@ -213,11 +212,14 @@ func (a *avg_TYPE_AGGKINDAgg) Flush(outputIdx int) {
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
col := a.col
// {{else}}
col := a.vec._RET_TYPE()
// {{end}}
if a.curCount == 0 {
a.nulls.SetNull(outputIdx)
} else {
_ASSIGN_DIV_INT64(a.col[outputIdx], a.curSum, a.curCount, a.col, _, _)
_ASSIGN_DIV_INT64(col[outputIdx], a.curSum, a.curCount, a.col, _, _)
}
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colexec/colexecagg/bool_and_or_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func newBool_OP_TYPE_AGGKINDAggAlloc(
type bool_OP_TYPE_AGGKINDAgg struct {
// {{if eq "_AGGKIND" "Ordered"}}
orderedAggregateFuncBase
col coldata.Bools
// {{else}}
unorderedAggregateFuncBase
// {{end}}
col []bool
curAgg bool
// foundNonNullForCurrentGroup tracks if we have seen any non-null values
// for the group that is currently being aggregated.
Expand All @@ -71,15 +71,14 @@ type bool_OP_TYPE_AGGKINDAgg struct {

var _ AggregateFunc = &bool_OP_TYPE_AGGKINDAgg{}

// {{if eq "_AGGKIND" "Ordered"}}
func (a *bool_OP_TYPE_AGGKINDAgg) SetOutput(vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.SetOutput(vec)
// {{else}}
a.unorderedAggregateFuncBase.SetOutput(vec)
// {{end}}
a.col = vec.Bool()
}

// {{end}}

func (a *bool_OP_TYPE_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, sel []int,
) {
Expand Down Expand Up @@ -153,11 +152,14 @@ func (a *bool_OP_TYPE_AGGKINDAgg) Flush(outputIdx int) {
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
col := a.col
// {{else}}
col := a.vec.Bool()
// {{end}}
if !a.foundNonNullForCurrentGroup {
a.nulls.SetNull(outputIdx)
} else {
a.col[outputIdx] = a.curAgg
col[outputIdx] = a.curAgg
}
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/colexec/colexecagg/concat_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,26 @@ func newConcat_AGGKINDAggAlloc(allocator *colmem.Allocator, allocSize int64) agg
type concat_AGGKINDAgg struct {
// {{if eq "_AGGKIND" "Ordered"}}
orderedAggregateFuncBase
// col points to the output vector we are updating.
col *coldata.Bytes
// {{else}}
unorderedAggregateFuncBase
// {{end}}
// curAgg holds the running total.
curAgg []byte
// col points to the output vector we are updating.
col *coldata.Bytes
// foundNonNullForCurrentGroup tracks if we have seen any non-null values
// for the group that is currently being aggregated.
foundNonNullForCurrentGroup bool
}

// {{if eq "_AGGKIND" "Ordered"}}
func (a *concat_AGGKINDAgg) SetOutput(vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.SetOutput(vec)
// {{else}}
a.unorderedAggregateFuncBase.SetOutput(vec)
// {{end}}
a.col = vec.Bytes()
}

// {{end}}

func (a *concat_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, sel []int,
) {
Expand Down Expand Up @@ -130,11 +129,14 @@ func (a *concat_AGGKINDAgg) Flush(outputIdx int) {
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
col := a.col
// {{else}}
col := a.vec.Bytes()
// {{end}}
if !a.foundNonNullForCurrentGroup {
a.nulls.SetNull(outputIdx)
} else {
a.col.Set(outputIdx, a.curAgg)
col.Set(outputIdx, a.curAgg)
}
// Release the reference to curAgg eagerly.
a.allocator.AdjustMemoryUsage(-int64(len(a.curAgg)))
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colexec/colexecagg/count_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,23 @@ func newCount_COUNTKIND_AGGKINDAggAlloc(
type count_COUNTKIND_AGGKINDAgg struct {
// {{if eq "_AGGKIND" "Ordered"}}
orderedAggregateFuncBase
col coldata.Int64s
// {{else}}
unorderedAggregateFuncBase
// {{end}}
col []int64
curAgg int64
}

var _ AggregateFunc = &count_COUNTKIND_AGGKINDAgg{}

// {{if eq "_AGGKIND" "Ordered"}}
func (a *count_COUNTKIND_AGGKINDAgg) SetOutput(vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.SetOutput(vec)
// {{else}}
a.unorderedAggregateFuncBase.SetOutput(vec)
// {{end}}
a.col = vec.Int64()
}

// {{end}}

func (a *count_COUNTKIND_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, sel []int,
) {
Expand Down Expand Up @@ -146,8 +145,11 @@ func (a *count_COUNTKIND_AGGKINDAgg) Flush(outputIdx int) {
_ = outputIdx
outputIdx = a.curIdx
a.curIdx++
col := a.col
// {{else}}
col := a.vec.Int64()
// {{end}}
a.col[outputIdx] = a.curAgg
col[outputIdx] = a.curAgg
}

// {{if eq "_AGGKIND" "Ordered"}}
Expand Down
8 changes: 0 additions & 8 deletions pkg/sql/colexec/colexecagg/default_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ type default_AGGKINDAgg struct {

var _ AggregateFunc = &default_AGGKINDAgg{}

func (a *default_AGGKINDAgg) SetOutput(vec coldata.Vec) {
// {{if eq "_AGGKIND" "Ordered"}}
a.orderedAggregateFuncBase.SetOutput(vec)
// {{else}}
a.unorderedAggregateFuncBase.SetOutput(vec)
// {{end}}
}

func (a *default_AGGKINDAgg) Compute(
vecs []coldata.Vec, inputIdxs []uint32, startIdx, endIdx int, sel []int,
) {
Expand Down
Loading

0 comments on commit 00cb5d1

Please sign in to comment.