Skip to content

Commit

Permalink
Fix the MergeGroup operator never sync the chunksize between groupby …
Browse files Browse the repository at this point in the history
…and aggresult. (#20794)

1. MergeGroup算子没有同步group by列,和Agg Result的分块size,这会最终结果不符合要求。
2. 移除了MergeGroup算子直接拿走第一个Agg结果的优化,避免出现该Agg行数大于sync chunksize的情况。

Approved by: @aunjgr
  • Loading branch information
m-schen authored Dec 17, 2024
1 parent e791f93 commit 8b4f1dc
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 25 deletions.
14 changes: 14 additions & 0 deletions pkg/sql/colexec/aggexec/aggFrame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,17 @@ func TestFixedToFixedFrameWork_withExecContext(t *testing.T) {
[]int64{1, 2, 3, 4, 5}, nil, int64(15/5), false,
[]int64{2, 3, 4, 5, 6}, []int{3}, int64(15/4), false)
}

func TestMakeInitialAggListFromList(t *testing.T) {
mp := mpool.MustNewZero()

RegisterGroupConcatAgg(123, ",")
mg := NewSimpleAggMemoryManager(mp)
agg0 := MakeAgg(mg, 123, true, []types.Type{types.T_varchar.ToType()}...)

res := MakeInitialAggListFromList(mg, []AggFuncExec{agg0})

require.Equal(t, 1, len(res))
require.Equal(t, int64(123), res[0].AggID())
require.Equal(t, true, res[0].IsDistinct())
}
22 changes: 11 additions & 11 deletions pkg/sql/colexec/aggexec/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,23 @@ func getChunkSizeOfAggregator(a AggFuncExec) int {
return math.MaxInt64
}

func SyncAggregatorsChunkSize(outer []*vector.Vector, as []AggFuncExec, doSync bool) (syncLimit int) {
m := math.MaxInt64
func GetMinAggregatorsChunkSize(outer []*vector.Vector, as []AggFuncExec) (minLimit int) {
minLimit = math.MaxInt64
for _, o := range outer {
if s := GetChunkSizeFromType(*o.GetType()); s < m {
m = s
if s := GetChunkSizeFromType(*o.GetType()); s < minLimit {
minLimit = s
}
}
for _, a := range as {
if s := getChunkSizeOfAggregator(a); s < m {
m = s
if s := getChunkSizeOfAggregator(a); s < minLimit {
minLimit = s
}
}
return minLimit
}

if doSync {
for i := range as {
modifyChunkSizeOfAggregator(as[i], m)
}
func SyncAggregatorsToChunkSize(as []AggFuncExec, syncLimit int) {
for _, a := range as {
modifyChunkSizeOfAggregator(a, syncLimit)
}
return m
}
9 changes: 9 additions & 0 deletions pkg/sql/colexec/aggexec/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ func MakeAgg(
panic(fmt.Sprintf("unexpected aggID %d and param types %v.", aggID, param))
}

func MakeInitialAggListFromList(mg AggMemoryManager, list []AggFuncExec) []AggFuncExec {
result := make([]AggFuncExec, 0, len(list))
for _, v := range list {
param, _ := v.TypesInfo()
result = append(result, MakeAgg(mg, v.AggID(), v.IsDistinct(), param...))
}
return result
}

// makeSingleAgg supports to create an aggregation function executor for single column.
func makeSingleAgg(
mg AggMemoryManager,
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/colexec/group/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,8 @@ func (group *Group) generateInitialResult1WithoutGroupBy(proc *process.Process)
if err != nil {
return err
}
limit := aggexec.SyncAggregatorsChunkSize(nil, aggs, true)

group.ctr.result1.InitOnlyAgg(limit, aggs)
group.ctr.result1.InitOnlyAgg(aggexec.GetMinAggregatorsChunkSize(nil, aggs), aggs)
for i := range group.ctr.result1.AggList {
if err = group.ctr.result1.AggList[i].GroupGrow(1); err != nil {
return err
Expand Down Expand Up @@ -266,8 +265,7 @@ func (group *Group) consumeBatchToGetFinalResult(
if err != nil {
return err
}
limit := aggexec.SyncAggregatorsChunkSize(group.ctr.groupByEvaluate.Vec, aggs, true)
group.ctr.result1.InitWithGroupBy(limit, aggs, group.ctr.groupByEvaluate.Vec)
group.ctr.result1.InitWithGroupBy(aggexec.GetMinAggregatorsChunkSize(group.ctr.groupByEvaluate.Vec, aggs), aggs, group.ctr.groupByEvaluate.Vec)
if err = preExtendAggExecs(aggs, group.PreAllocSize); err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/group/execctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,26 @@ func (buf *GroupResultBuffer) IsEmpty() bool {
}

func (buf *GroupResultBuffer) InitOnlyAgg(chunkSize int, aggList []aggexec.AggFuncExec) {
aggexec.SyncAggregatorsToChunkSize(aggList, chunkSize)

buf.ChunkSize = chunkSize
buf.AggList = aggList
buf.ToPopped = make([]*batch.Batch, 0, 1)
buf.ToPopped = append(buf.ToPopped, batch.NewOffHeapEmpty())
}

func (buf *GroupResultBuffer) InitWithGroupBy(chunkSize int, aggList []aggexec.AggFuncExec, groupByVec []*vector.Vector) {
aggexec.SyncAggregatorsToChunkSize(aggList, chunkSize)

buf.ChunkSize = chunkSize
buf.AggList = aggList
buf.ToPopped = make([]*batch.Batch, 0, 1)
buf.ToPopped = append(buf.ToPopped, getInitialBatchWithSameTypeVecs(groupByVec))
}

func (buf *GroupResultBuffer) InitWithBatch(chunkSize int, aggList []aggexec.AggFuncExec, vecExampleBatch *batch.Batch) {
aggexec.SyncAggregatorsToChunkSize(aggList, chunkSize)

buf.ChunkSize = chunkSize
buf.AggList = aggList
buf.ToPopped = make([]*batch.Batch, 0, 1)
Expand Down
13 changes: 9 additions & 4 deletions pkg/sql/colexec/mergegroup/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"math"
)

var makeInitialAggListFromList = aggexec.MakeInitialAggListFromList

func (mergeGroup *MergeGroup) String(buf *bytes.Buffer) {
buf.WriteString(thisOperatorName)
}
Expand Down Expand Up @@ -122,9 +124,13 @@ func (mergeGroup *MergeGroup) consumeBatch(proc *process.Process, b *batch.Batch
if len(b.Vecs) == 0 {

if mergeGroup.ctr.result.IsEmpty() {
mergeGroup.ctr.result.InitOnlyAgg(math.MaxInt32, b.Aggs)
b.Aggs = nil
mergeGroup.ctr.result.InitOnlyAgg(math.MaxInt32, makeInitialAggListFromList(proc, b.Aggs))
mergeGroup.ctr.result.ToPopped[0].SetRowCount(1)
for i := range mergeGroup.ctr.result.AggList {
if err := mergeGroup.ctr.result.AggList[i].GroupGrow(1); err != nil {
return err
}
}
}

for i, input := range b.Aggs {
Expand Down Expand Up @@ -153,8 +159,7 @@ func (mergeGroup *MergeGroup) consumeBatch(proc *process.Process, b *batch.Batch
}

if mergeGroup.ctr.result.IsEmpty() {
mergeGroup.ctr.result.InitWithBatch(aggexec.SyncAggregatorsChunkSize(b.Vecs, b.Aggs, false), b.Aggs, b)
b.Aggs = nil
mergeGroup.ctr.result.InitWithBatch(aggexec.GetMinAggregatorsChunkSize(b.Vecs, b.Aggs), makeInitialAggListFromList(proc, b.Aggs), b)
}

for i, count := 0, b.RowCount(); i < count; i += hashmap.UnitLimit {
Expand Down
25 changes: 19 additions & 6 deletions pkg/sql/colexec/mergegroup/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
// 主要用于测试在该算子中,每个接口被调用的次数,以及传入的值。
type hackAggExecToTestMerge struct {
toFlush int
dst *hackAggExecToTestMerge

aggexec.AggFuncExec
groupNumber int
Expand Down Expand Up @@ -76,6 +77,18 @@ func (h *hackAggExecToTestMerge) Free() {
var hackVecResult = vector.NewVec(types.T_int64.ToType())

func hackMakeAggExecToTestMerge(r int) *hackAggExecToTestMerge {
makeInitialAggListFromList = func(mg aggexec.AggMemoryManager, list []aggexec.AggFuncExec) []aggexec.AggFuncExec {
res := make([]aggexec.AggFuncExec, len(list))
for i := range res {
res[i] = &hackAggExecToTestMerge{
toFlush: r,
isFree: false,
}
list[i].(*hackAggExecToTestMerge).dst = res[i].(*hackAggExecToTestMerge)
}
return res
}

return &hackAggExecToTestMerge{
toFlush: r,
isFree: false,
Expand Down Expand Up @@ -108,9 +121,9 @@ func TestMergeGroup_WithoutGroupBy(t *testing.T) {
if b := r.Batch; b != nil {
require.Equal(t, 1, len(b.Vecs))
require.Equal(t, hackVecResult, b.Vecs[0])
require.Equal(t, 1, exec1.groupNumber)
require.Equal(t, 1, exec1.doMergeTime)
require.Equal(t, 1, exec1.doFlushTime)
require.Equal(t, 1, exec1.dst.groupNumber)
require.Equal(t, 2, exec1.dst.doMergeTime)
require.Equal(t, 1, exec1.dst.doFlushTime)
}

r, err = g.Call(proc)
Expand Down Expand Up @@ -159,9 +172,9 @@ func TestMergeGroup_WithGroupBy(t *testing.T) {
require.Equal(t, int64(2), vs[1])
require.Equal(t, int64(3), vs[2])
require.Equal(t, int64(4), vs[3])
require.Equal(t, 4, exec1.groupNumber) // 1, 2, 3, 4
require.Equal(t, 2, exec1.doBatchMerge)
require.Equal(t, 1, exec1.doFlushTime)
require.Equal(t, 4, exec1.dst.groupNumber) // 1, 2, 3, 4
require.Equal(t, 3, exec1.dst.doBatchMerge)
require.Equal(t, 1, exec1.dst.doFlushTime)
}

r, err = g.Call(proc)
Expand Down

0 comments on commit 8b4f1dc

Please sign in to comment.