-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
aggfuncs: implement Count with new aggregation framework #7009
Conversation
Hi contributor, thanks for your PR. This patch needs to be approved by someone of admins. They should reply with "/ok-to-test" to accept this PR for running test automatically. |
executor/aggfuncs/func_count.go
Outdated
|
||
func (e *countOriginal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) error { | ||
p := (*partialResult4Count)(pr) | ||
p.count += int64(len(rowsInGroup)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should ignore the NULL
values in a group, only count the non-NULL values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I Know: while in countOriginal
, the e.args
's len is 0, and I can't do the e.args[0].EvalInt(sctx, row)
in range for rowInGroup
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
COUNT
is special, it can have many parameters- You can only call
EvalInt
when the type of input parameter is integer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused by the expression.Expression
.
- The comment said that:
args stores the input arguments for an aggregate function
. And where theinput arguments
from? From the user's sql? - In the
AVG
aggfuncs, I found the use ofe.args[0]
ande.args[1]
. It looks likee.args[0]
representscount
ande.args[1]
representssum
. When and where these value determined?
I tried to trace the calling tree but failed to answer these qusetions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We execute the aggregate function inside an aggregate operator, the input arguments
are from the child of that aggregate operator. For example:
TiDB(localhost:4000) > explain select count(distinct a, b) from t;
+----------------------+------+----------------------------------------------+-------+
| id | task | operator info | count |
+----------------------+------+----------------------------------------------+-------+
| StreamAgg_8 | root | funcs:count(distinct test.t.a, test.t.b) | 1.00 |
| └─TableReader_15 | root | data:TableScan_14 | 3.00 |
| └─TableScan_14 | cop | table:t, range:[-inf,+inf], keep order:false | 3.00 |
+----------------------+------+----------------------------------------------+-------+
3 rows in set (0.00 sec)
In this case, we execute the count(distinct a, b)
in the aggregate operator StreamAgg_8
, and its input arguments come from the child operator, namely TableReader_15
.
"In the AVG aggfuncs, I found the use of e.args[0] and e.args[1]. It looks like e.args[0] represents count and e.args[1]represents sum", this kind of AVG
function handles the partial result of another aggregate operator, for example:
TiDB(localhost:4000) > explain select avg(a) from t;
+------------------------+------+----------------------------------------------+-------+
| id | task | operator info | count |
+------------------------+------+----------------------------------------------+-------+
| StreamAgg_16 | root | funcs:avg(col_0, col_1) | 1.00 |
| └─TableReader_17 | root | data:StreamAgg_8 | 1.00 |
| └─StreamAgg_8 | cop | funcs:avg(test.t.a) | 1.00 |
| └─TableScan_15 | cop | table:t, range:[-inf,+inf], keep order:false | 3.00 |
+------------------------+------+----------------------------------------------+-------+
4 rows in set (0.00 sec)
Here we have two aggregate operators, StreamAgg_8
belongs to a coprocessor task and is executed on the TiKV server, StreamAgg_16
belongs to a root task and is executed on the TiDB server.
StreamAgg_8
handles the original data of table t
produces partial result, which is further read by TableReader_17
and StreamAgg_16
. In the partial result of StreamAgg_8
, count
comes first and sum
comes second. This is determined by the planner, see this for more detail.
StreamAgg_16
handles the partial result of StreamAgg_8
, reads count
in the first parameter and sum
in the second parameter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the wonderful explain!
executor/aggfuncs/func_count.go
Outdated
baseAggFunc | ||
} | ||
|
||
type partialResult4Count struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type partialResult4Count = int64
is better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
executor/aggfuncs/builder.go
Outdated
// Build count functions which consume the original data and update their | ||
// partial results. | ||
case aggregation.CompleteMode, aggregation.Partial1Mode: | ||
return &countOriginal{baseCount{base}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This implementation has not properly handled the distinct
property. We should implement other AggFunc
s for COUNT
to count the distinct and non-NULL values in a data group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like countOriginalWithDistinct
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xuanwo Maybe what you need is HashChunkRow()
defined in util/codec/codec.go?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks cool, let me check that.
2d2fc4e
to
e07c870
Compare
@zz-jason Please take a look again, I'm not sure am I in the right way. I failed this test for now:
I believe the problem is e07c870#diff-0558a5726173797efd5bac024b5a3cadR114 , how can I get the right col index for by an Expression ? |
After read https://www.pingcap.com/blog-cn/tidb-source-code-reading-10/ , I solved this problem, please review again. |
PTAL @zz-jason |
executor/aggfuncs/builder.go
Outdated
@@ -70,8 +93,8 @@ func buildAvg(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { | |||
case aggregation.DedupMode: | |||
return nil // not implemented yet. | |||
|
|||
// Build avg functions which consume the original data and update their | |||
// partial results. | |||
// Build avg functions which consume the original data and update their |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this tab. so as line 97, 112, 113
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a mistake introduced be Goland, I will fix it.
executor/aggfuncs/func_count.go
Outdated
p := (*partialResult4Count)(pr) | ||
|
||
for _, row := range rowsInGroup { | ||
_, isNull, err := e.args[0].EvalInt(sctx, row) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should implement countOriginal4Int/ countOriginal4Real/ countOriginal4String/ countOriginal4Decimal/ countOriginal4Datetime/ countOriginal4Duration/ countOriginal4Json.
Since it will cause an error if we call EvalInt on other types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
executor/aggfuncs/func_count.go
Outdated
p := (*partialResult4Count)(pr) | ||
|
||
for _, row := range rowsInGroup { | ||
buf, hasNull, err := encodeRow(sctx, e.args, row) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- This is not correct, we should not encode the whole row. We only check whether the args of the aggregation function is distinct.
- encodeRow is expensive, this implementation can be referred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed at #7009 (comment) , count
may have many parameters, like select count(distinct a, b) from t
, we need to eval the whole row to judge whether current row should be counted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Such as
select count(distinct a, b), c from t;
Therow
will be [a, b, c]. So encode the whole row is still not correct. - There is already a distinctChecker, we can implement a
checkChunkRow
function for it. - You can implement a function like
EncodeValue4ChunkRow
in codec.go, and invokeencodeChunkRow
in it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
encodeRow
will get the value fromrow
bye.args
.- I think the
checkChunkRow
is whatencodeRow
did here. - The problem of
encodeChunkRow
is I can't get the corrent colIdx by expressions. And it looks like I can't importexpression
inchunk
package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- The
allTypes
is the fieldTypes of count.Args.
ThecolIdx
is the ordinal of count.Args, e.g. for count(a, b, c), the colIdx is [0,1,2]. - I think we'd better change
encodeRow
toevalOneRow
which only eval the result of the expressions and invoke EncodeValue4ChunkRow outside. If you take a look atEncodeValue
, you can see that there are 2 parameterscomparable
andhash
, butencodeRow
does not consider them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- I used to use this logic (invoke
HashChunkRow
in codec.go instead ifencodeRow
) but failed testselect count(distinct c), count(distinct a, b) from t
. For thedistinct c
, the ordinal is0
, but the correct ordinal is2
. - I will have a try.
executor/aggfuncs/utils.go
Outdated
@@ -0,0 +1,131 @@ | |||
package aggfuncs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not need this file.
|
executor/aggfuncs/func_count.go
Outdated
continue | ||
} | ||
|
||
buf, err := encodeRow(sctx, e.args, row) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
count(distinct)
is special, we can do it this way for convenience, although it sacrifices some performance:
- extract a function to evaluate an argument and encode it into a provided
[]byte
, as for theseappendXXX
functions, for exampleappendInt64
, you can refer to https://github.com/pingcap/tidb/blob/master/util/chunk/column.go#L105, which is much faster thanEncodeXXX
functions, and thus we can remove the implementation ofdistinctChecker
, which is inefficient
func (e *countOriginalWithDistinct) evalAndEncode(arg expression.Expression, encodedBytes []byte) ([]byte, bool, error) {
switch e.args[i].GetType().EvalType() {
case types.ETInt:
val, isNull, err := e.args[i].EvalInt(ctx, row)
if err != nil || isNull {
return encodedBytes, isNull, errors.Trace(err)
}
encodedBytes = appendInt64(encodedBytes, val)
case types.ETReal:
...
}
return encodedBytes, false, nil
}
- evaluate each argument and encode them into a
[]byte
, use that[]byte
to check where the input parameters are duplicated:
encodedBytes := make([]byte, 0, 8)
for _, row := range rowsInGroup {
for i := 0; i < len(e.args) && !hasNull; i++ {
encodedBytes, isNull, err = e. evalAndEncode(e.args[i], encodedBytes)
if err != nil { return ... }
if isNull { hasNull = true; break... }
}
if hasNull || p.exists[encodedBytes] {
continue
}
p.exists[encodedBytes] = true
p.count++
}
executor/aggfuncs/func_count.go
Outdated
p := (*partialResult4Count)(pr) | ||
|
||
for _, row := range rowsInGroup { | ||
isNull, err := e.args[0].IsNull(sctx, row) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to implement a IsNull
api on the Expression
interface, and we'd better not to do this:
- this API is currently only used in
COUNT
function - in the future, we are planning to change the aggregate argument from
args []expression.Expression
toargs []*expression.Column
There are two ways to solve the NULL
input issue:
- use a single
countOriginal
to handle all the input types, and:
switch e.args[i].GetType().EvalType() {
case types.ETInt:
val, isNull, err := e.args[0].EvalInt(ctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull {
continue
}
case types.ETReal:
...
}
- implement a
countOriginal4XX
for each input type:
countOriginal4Decimal
countOriginal4Int64
- ...
I suggest the second way: It reduces a lot of CPU branch predications, which are introduced by the switch
statement, utilizes the CPU pipelines thus is more inefficient than the first way.
@zz-jason PTAL I use a |
executor/aggfuncs/func_count.go
Outdated
"unsafe" | ||
|
||
"github.com/juju/errors" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this empty line
executor/aggfuncs/func_count.go
Outdated
return encodedBytes, false, nil | ||
} | ||
|
||
func appendInt64(encodedBytes []byte, val int64) (_ []byte) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about changing the return value definition from (_ []byte)
to []byte
?
executor/aggfuncs/func_count.go
Outdated
} | ||
|
||
func appendInt64(encodedBytes []byte, val int64) (_ []byte) { | ||
buf := make([]byte, 8) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can pass buf
as a parameter of this function and reuse it if possible to avoid frequently object allocation.
executor/aggfuncs/func_count.go
Outdated
buf := []byte{} | ||
|
||
for _, row := range rowsInGroup { | ||
encodedBytes := make([]byte, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can encodedBytes
be reused for every row
?
@zz-jason There is not |
executor/aggfuncs/func_count.go
Outdated
import ( | ||
"encoding/binary" | ||
"unsafe" | ||
"github.com/juju/errors" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually separate the imported packages to two sections: one for the golang standard packages, one for the third part packages. For example: https://github.com/pingcap/tidb/blob/master/executor/aggfuncs/aggfuncs.go#L16. So here we should put "github.com/juju/errors" together with other third part packages and leave an empty line between the golang standard packages like "unsafe" 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I misread the comment: #7009 (comment) , I will fix it.
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/types/json" | ||
"github.com/pingcap/tidb/util/chunk" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, this is what I'm trying to explain:
import (
"encoding/binary"
"unsafe"
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I got the idea just now.
But the reset didn't clean the |
@Xuanwo |
So I don't need to reslice the func appendInt64(encodedBytes, buf []byte, val int64) []byte {
*(*int64)(unsafe.Pointer(&buf[0])) = val
buf = buf[:8]
encodedBytes = append(encodedBytes, buf...)
return encodedBytes
} |
@Xuanwo I'm sorry that I misunderstand your last comments. |
OK, I'm ready for reviewing now. |
Thanks for you great contribution, LGTM 👍 |
/run-all-tests |
hi, @Xuanwo drop table if exists tab0, tab1;
CREATE TABLE tab0(pk INTEGER PRIMARY KEY, col0 INTEGER, col1 FLOAT, col2 TEXT, col3 INTEGER, col4 FLOAT, col5 TEXT);
INSERT INTO tab0 VALUES(0,854,111.96,'mguub',711,966.36,'snwlo');
INSERT INTO tab0 VALUES(1,518,457.51,'hzanm',251,363.97,'xljvu');
INSERT INTO tab0 VALUES(2,640,325.31,'jempi',596,569.99,'xmtxn');
INSERT INTO tab0 VALUES(3,758,256.26,'lktfw',174,453.33,'imxxc');
INSERT INTO tab0 VALUES(4,98,727.48,'qiyfp',918,376.45,'gavyb');
INSERT INTO tab0 VALUES(5,203,268.89,'nwrqf',885,321.93,'ixrql');
INSERT INTO tab0 VALUES(6,554,593.89,'hdikx',886,12.12,'xzvvo');
INSERT INTO tab0 VALUES(7,195,720.95,'yydxj',108,45.77,'dlbem');
INSERT INTO tab0 VALUES(8,363,102.25,'kmgry',740,9.66,'cussx');
INSERT INTO tab0 VALUES(9,106,745.31,'mwyzu',598,612.17,'aftom');
CREATE TABLE tab1(pk INTEGER PRIMARY KEY, col0 INTEGER, col1 FLOAT, col2 TEXT, col3 INTEGER, col4 FLOAT, col5 TEXT);
CREATE INDEX idx_tab1_0 on tab1 (col0);
CREATE INDEX idx_tab1_1 on tab1 (col1);
CREATE INDEX idx_tab1_3 on tab1 (col3);
CREATE INDEX idx_tab1_4 on tab1 (col4);
INSERT INTO tab1 SELECT * FROM tab0;
SELECT DISTINCT COUNT( DISTINCT + col4 ) col0 FROM tab1 WHERE 1; want 10, but got 5 |
/run-all-tests |
executor/aggfuncs/func_count.go
Outdated
return nil | ||
} | ||
|
||
func (e *countOriginalWithDistinct) evalAndEncode( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a comment for this function.
executor/aggfuncs/func_count.go
Outdated
break | ||
} | ||
} | ||
if hasNull || p.valSet.exist(string(encodedBytes)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract string(encodeBytes)
as a variable to reduce memory usage
benchmark:
func BenchmarkBytesToString(b *testing.B) {
str := ""
for i := 0; i < 100; i ++{
str += "0"
}
bytes := []byte(str)
b.ResetTimer()
for i := 0; i < b.N; i++{
s := newStringSet()
s.exist(string(bytes))
s.insert(string(bytes))
}
}
func BenchmarkBytesToString2(b *testing.B) {
str := ""
for i := 0; i < 100; i ++{
str += "0"
}
bytes := []byte(str)
b.ResetTimer()
for i := 0; i < b.N; i++{
s := newStringSet()
tmp := string(bytes)
s.exist(tmp)
s.insert(tmp)
}
}
$ go test -test.bench=. -test.run=None -benchmem
BenchmarkBytesToString-4 10000000 175 ns/op 224 B/op 2 allocs/op
BenchmarkBytesToString2-4 10000000 120 ns/op 112 B/op 1 allocs/op
@Xuanwo please merge master and resolve conflicts. |
Signed-off-by: Xuanwo <[email protected]>
Signed-off-by: Xuanwo <[email protected]>
Signed-off-by: Xuanwo <[email protected]>
Signed-off-by: Xuanwo <[email protected]>
Signed-off-by: Xuanwo <[email protected]>
@zz-jason The code has been rebased. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/run-all-tests |
What have you changed? (mandatory)
implement Count with new aggregation framework
More information can be found at #6952
What are the type of the changes (mandatory)?
How has this PR been tested (mandatory)?
Does this PR affect documentation (docs/docs-cn) update? (optional)
No
Refer to a related PR or issue link (optional)
To #6952