Skip to content

Commit

Permalink
executor: add conversion to opaque value for json_objectagg and json_…
Browse files Browse the repository at this point in the history
…arrayagg (#37337)

close #25053
  • Loading branch information
YangKeao authored Aug 25, 2022
1 parent b4b5223 commit 4c8c918
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 37 deletions.
5 changes: 3 additions & 2 deletions executor/aggfuncs/func_json_arrayagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ func (e *jsonArrayagg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup
return 0, errors.Trace(err)
}

realItem := item.Clone().GetValue()
realItem := getRealJSONValue(item, e.args[0].GetType())

switch x := realItem.(type) {
case nil, bool, int64, uint64, float64, string, json.BinaryJSON, *types.MyDecimal, []uint8, types.Time, types.Duration:
case nil, bool, int64, uint64, float64, string, json.BinaryJSON, json.Opaque, *types.MyDecimal, []uint8, types.Time, types.Duration:
p.entries = append(p.entries, realItem)
memDelta += getValMemDelta(realItem)
default:
Expand Down
60 changes: 44 additions & 16 deletions executor/aggfuncs/func_json_objectagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
package aggfuncs

import (
"strings"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/stringutil"
)

const (
Expand Down Expand Up @@ -85,45 +87,68 @@ func (e *jsonObjectAgg) AppendFinalResult2Chunk(sctx sessionctx.Context, pr Part
func (e *jsonObjectAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
p := (*partialResult4JsonObjectAgg)(pr)
for _, row := range rowsInGroup {
key, err := e.args[0].Eval(row)
key, keyIsNull, err := e.args[0].EvalString(sctx, row)
if err != nil {
return 0, errors.Trace(err)
}
key = strings.Clone(key)

value, err := e.args[1].Eval(row)
if err != nil {
return 0, errors.Trace(err)
}

if key.IsNull() {
if keyIsNull {
return 0, json.ErrJSONDocumentNULLKey
}

// the result json's key is string, so it needs to convert the first arg to string
keyString, err := key.ToString()
value, err := e.args[1].Eval(row)
if err != nil {
return 0, errors.Trace(err)
}
keyString = stringutil.Copy(keyString)

realVal := value.Clone().GetValue()
realVal := getRealJSONValue(value, e.args[1].GetType())
switch x := realVal.(type) {
case nil, bool, int64, uint64, float64, string, json.BinaryJSON, *types.MyDecimal, []uint8, types.Time, types.Duration:
if _, ok := p.entries[keyString]; !ok {
memDelta += int64(len(keyString)) + getValMemDelta(realVal)
case nil, bool, int64, uint64, float64, string, json.BinaryJSON, json.Opaque, *types.MyDecimal, types.Time, types.Duration:
if _, ok := p.entries[key]; !ok {
memDelta += int64(len(key)) + getValMemDelta(realVal)
if len(p.entries)+1 > (1<<p.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta += (1 << p.bInMap) * hack.DefBucketMemoryUsageForMapStringToAny
p.bInMap++
}
}
p.entries[keyString] = realVal
p.entries[key] = realVal

default:
return 0, json.ErrUnsupportedSecondArgumentType.GenWithStackByArgs(x)
}
}
return memDelta, nil
}

func getRealJSONValue(value types.Datum, ft *types.FieldType) interface{} {
realVal := value.Clone().GetValue()
switch value.Kind() {
case types.KindBinaryLiteral, types.KindMysqlBit, types.KindBytes:
buf := value.GetBytes()
realVal = json.Opaque{
TypeCode: ft.GetType(),
Buf: buf,
}
case types.KindString:
if ft.GetCharset() == charset.CharsetBin {
buf := value.GetBytes()
resultBuf := buf
if ft.GetType() == mysql.TypeString {
// the tailing zero should also be in the opaque json
resultBuf = make([]byte, ft.GetFlen())
copy(resultBuf, buf)
}
realVal = json.Opaque{
TypeCode: ft.GetType(),
Buf: resultBuf,
}
}
}

return realVal
}

func getValMemDelta(val interface{}) (memDelta int64) {
memDelta = DefInterfaceSize
switch v := val.(type) {
Expand All @@ -140,6 +165,9 @@ func getValMemDelta(val interface{}) (memDelta int64) {
case json.BinaryJSON:
// +1 for the memory usage of the TypeCode of json
memDelta += int64(len(v.Value) + 1)
case json.Opaque:
// +1 for the memory usage of the TypeCode of opaque value
memDelta += int64(len(v.Buf) + 1)
case *types.MyDecimal:
memDelta += DefMyDecimalSize
case []uint8:
Expand Down
62 changes: 46 additions & 16 deletions executor/aggfuncs/func_json_objectagg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,38 @@ import (

"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/mock"
)

func getJSONValue(secondArg types.Datum, valueType *types.FieldType) interface{} {
if valueType.GetType() == mysql.TypeString && valueType.GetCharset() == charset.CharsetBin {
buf := make([]byte, valueType.GetFlen())
copy(buf, secondArg.GetBytes())
return json.Opaque{
TypeCode: mysql.TypeString,
Buf: buf,
}
}
return secondArg.GetValue()
}

func TestMergePartialResult4JsonObjectagg(t *testing.T) {
typeList := []byte{mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeString, mysql.TypeJSON}
var argCombines [][]byte
typeList := []*types.FieldType{
types.NewFieldType(mysql.TypeLonglong),
types.NewFieldType(mysql.TypeDouble),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeJSON),
types.NewFieldTypeBuilder().SetType(mysql.TypeString).SetFlen(10).SetCharset(charset.CharsetBin).BuildP(),
}
var argCombines [][]*types.FieldType
for i := 0; i < len(typeList); i++ {
for j := 0; j < len(typeList); j++ {
argTypes := []byte{typeList[i], typeList[j]}
argTypes := []*types.FieldType{typeList[i], typeList[j]}
argCombines = append(argCombines, argTypes)
}
}
Expand All @@ -43,25 +62,28 @@ func TestMergePartialResult4JsonObjectagg(t *testing.T) {
entries1 := make(map[string]interface{})
entries2 := make(map[string]interface{})

argTypes := argCombines[k]
fGenFunc := getDataGenFunc(types.NewFieldType(argTypes[0]))
sGenFunc := getDataGenFunc(types.NewFieldType(argTypes[1]))
fGenFunc := getDataGenFunc(argCombines[k][0])
sGenFunc := getDataGenFunc(argCombines[k][1])

for m := 0; m < numRows; m++ {
firstArg := fGenFunc(m)
secondArg := sGenFunc(m)
keyString, _ := firstArg.ToString()
entries1[keyString] = secondArg.GetValue()

valueType := argCombines[k][1]
entries1[keyString] = getJSONValue(secondArg, valueType)
}

for m := 2; m < numRows; m++ {
firstArg := fGenFunc(m)
secondArg := sGenFunc(m)
keyString, _ := firstArg.ToString()
entries2[keyString] = secondArg.GetValue()

valueType := argCombines[k][1]
entries2[keyString] = getJSONValue(secondArg, valueType)
}

aggTest := buildMultiArgsAggTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, json.CreateBinary(entries1), json.CreateBinary(entries2), json.CreateBinary(entries1))
aggTest := buildMultiArgsAggTesterWithFieldType(ast.AggFuncJsonObjectAgg, argCombines[k], types.NewFieldType(mysql.TypeJSON), numRows, json.CreateBinary(entries1), json.CreateBinary(entries2), json.CreateBinary(entries1))

tests = append(tests, aggTest)
}
Expand All @@ -73,11 +95,17 @@ func TestMergePartialResult4JsonObjectagg(t *testing.T) {
}

func TestJsonObjectagg(t *testing.T) {
typeList := []byte{mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeString, mysql.TypeJSON}
var argCombines [][]byte
typeList := []*types.FieldType{
types.NewFieldType(mysql.TypeLonglong),
types.NewFieldType(mysql.TypeDouble),
types.NewFieldType(mysql.TypeString),
types.NewFieldType(mysql.TypeJSON),
types.NewFieldTypeBuilder().SetType(mysql.TypeString).SetFlen(10).SetCharset(charset.CharsetBin).BuildP(),
}
var argCombines [][]*types.FieldType
for i := 0; i < len(typeList); i++ {
for j := 0; j < len(typeList); j++ {
argTypes := []byte{typeList[i], typeList[j]}
argTypes := []*types.FieldType{typeList[i], typeList[j]}
argCombines = append(argCombines, argTypes)
}
}
Expand All @@ -89,17 +117,19 @@ func TestJsonObjectagg(t *testing.T) {
entries := make(map[string]interface{})

argTypes := argCombines[k]
fGenFunc := getDataGenFunc(types.NewFieldType(argTypes[0]))
sGenFunc := getDataGenFunc(types.NewFieldType(argTypes[1]))
fGenFunc := getDataGenFunc(argTypes[0])
sGenFunc := getDataGenFunc(argTypes[1])

for m := 0; m < numRows; m++ {
firstArg := fGenFunc(m)
secondArg := sGenFunc(m)
keyString, _ := firstArg.ToString()
entries[keyString] = secondArg.GetValue()

valueType := argCombines[k][1]
entries[keyString] = getJSONValue(secondArg, valueType)
}

aggTest := buildMultiArgsAggTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, nil, json.CreateBinary(entries))
aggTest := buildMultiArgsAggTesterWithFieldType(ast.AggFuncJsonObjectAgg, argTypes, types.NewFieldType(mysql.TypeJSON), numRows, nil, json.CreateBinary(entries))

tests = append(tests, aggTest)
}
Expand Down
12 changes: 9 additions & 3 deletions expression/aggregation/base_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func (a *baseFuncDesc) TypeInfer(ctx sessionctx.Context) error {
case ast.AggFuncVarPop, ast.AggFuncStddevPop, ast.AggFuncVarSamp, ast.AggFuncStddevSamp:
a.typeInfer4PopOrSamp(ctx)
case ast.AggFuncJsonArrayagg:
a.typeInfer4JsonFuncs(ctx)
a.typeInfer4JsonArrayAgg(ctx)
case ast.AggFuncJsonObjectAgg:
a.typeInfer4JsonFuncs(ctx)
a.typeInfer4JsonObjectAgg(ctx)
default:
return errors.Errorf("unsupported agg function: %s", a.Name)
}
Expand Down Expand Up @@ -289,11 +289,17 @@ func (a *baseFuncDesc) typeInfer4BitFuncs(ctx sessionctx.Context) {
a.Args[0] = expression.WrapWithCastAsInt(ctx, a.Args[0])
}

func (a *baseFuncDesc) typeInfer4JsonFuncs(ctx sessionctx.Context) {
func (a *baseFuncDesc) typeInfer4JsonArrayAgg(ctx sessionctx.Context) {
a.RetTp = types.NewFieldType(mysql.TypeJSON)
types.SetBinChsClnFlag(a.RetTp)
}

func (a *baseFuncDesc) typeInfer4JsonObjectAgg(ctx sessionctx.Context) {
a.RetTp = types.NewFieldType(mysql.TypeJSON)
types.SetBinChsClnFlag(a.RetTp)
a.Args[0] = expression.WrapWithCastAsString(ctx, a.Args[0])
}

func (a *baseFuncDesc) typeInfer4NumberFuncs() {
a.RetTp = types.NewFieldType(mysql.TypeLonglong)
a.RetTp.SetFlen(21)
Expand Down

0 comments on commit 4c8c918

Please sign in to comment.