-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: support group_concat under new aggregation evaluation framework #7032
Changes from 17 commits
701eb4c
6b3c066
437a9e5
d08fd92
91557ef
3ad8478
892709f
4665a59
68aad31
ae7b9b8
6a1ba50
df72247
04c23d7
bf314b2
4c809f8
1129a42
9690a2c
94600fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
// Copyright 2018 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package aggfuncs | ||
|
||
import ( | ||
"bytes" | ||
|
||
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/util/chunk" | ||
) | ||
|
||
type baseGroupConcat4String struct { | ||
baseAggFunc | ||
|
||
sep string | ||
} | ||
|
||
func (e *baseGroupConcat4String) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put functions belonging to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. baseGroupConcat4String only has one function. |
||
p := (*partialResult4GroupConcat)(pr) | ||
if p.buffer == nil { | ||
chk.AppendNull(e.ordinal) | ||
return nil | ||
} | ||
chk.AppendString(e.ordinal, p.buffer.String()) | ||
return nil | ||
} | ||
|
||
type basePartialResult4GroupConcat struct { | ||
buffer *bytes.Buffer | ||
} | ||
|
||
type partialResult4GroupConcat struct { | ||
basePartialResult4GroupConcat | ||
} | ||
|
||
type groupConcat struct { | ||
baseGroupConcat4String | ||
} | ||
|
||
func (e *groupConcat) AllocPartialResult() PartialResult { | ||
return PartialResult(new(partialResult4GroupConcat)) | ||
} | ||
|
||
func (e *groupConcat) ResetPartialResult(pr PartialResult) { | ||
p := (*partialResult4GroupConcat)(pr) | ||
p.buffer = nil | ||
} | ||
|
||
func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { | ||
p := (*partialResult4GroupConcat)(pr) | ||
v, isNull, isWriteSep := "", false, false | ||
for _, row := range rowsInGroup { | ||
isWriteSep = false | ||
for _, arg := range e.args { | ||
v, isNull, err = arg.EvalString(sctx, row) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
if isNull { | ||
continue | ||
} | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
isWriteSep = true | ||
if p.buffer == nil { | ||
p.buffer = &bytes.Buffer{} | ||
} | ||
p.buffer.WriteString(v) | ||
} | ||
if isWriteSep { | ||
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
Sorry, something went wrong. |
||
p.buffer.WriteString(e.sep) | ||
} | ||
} | ||
p.buffer.Truncate(p.buffer.Len() - len(e.sep)) | ||
// TODO: if total length is greater than global var group_concat_max_len, truncate it. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make this as an issue and put the issue number into this comment. |
||
// issue: #7034 | ||
return nil | ||
} | ||
|
||
type partialResult4GroupConcatDistinct struct { | ||
basePartialResult4GroupConcat | ||
valsBuf *bytes.Buffer | ||
valSet stringSet | ||
} | ||
|
||
type groupConcatDistinct struct { | ||
baseGroupConcat4String | ||
} | ||
|
||
func (e *groupConcatDistinct) AllocPartialResult() PartialResult { | ||
p := new(partialResult4GroupConcatDistinct) | ||
p.valsBuf = &bytes.Buffer{} | ||
p.valSet = newStringSet() | ||
return PartialResult(p) | ||
} | ||
|
||
func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) { | ||
p := (*partialResult4GroupConcatDistinct)(pr) | ||
p.buffer, p.valSet = nil, newStringSet() | ||
} | ||
|
||
func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { | ||
p := (*partialResult4GroupConcatDistinct)(pr) | ||
v, isNull := "", false | ||
for _, row := range rowsInGroup { | ||
p.valsBuf.Reset() | ||
for _, arg := range e.args { | ||
v, isNull, err = arg.EvalString(sctx, row) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
if isNull { | ||
continue | ||
} | ||
p.valsBuf.WriteString(v) | ||
} | ||
joinedVals := p.valsBuf.String() | ||
if p.valSet.exist(joinedVals) { | ||
continue | ||
} | ||
p.valSet.insert(joinedVals) | ||
// write separator | ||
if p.buffer == nil { | ||
p.buffer = &bytes.Buffer{} | ||
} else { | ||
p.buffer.WriteString(e.sep) | ||
} | ||
// write values | ||
p.buffer.WriteString(joinedVals) | ||
} | ||
// TODO: if total length is greater than global var group_concat_max_len, truncate it. | ||
// issue: #7034 | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reorganize the import packages