Skip to content

Commit

Permalink
sink(cdc): avoid temporary memory allocations for avro (#11637)
Browse files Browse the repository at this point in the history
close #11590
  • Loading branch information
hicqu authored Oct 28, 2024
1 parent dd2d54a commit 792da42
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 76 deletions.
6 changes: 3 additions & 3 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ func (r *RowChangedEvent) GetHandleKeyColumnValues() []string {
}

// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
pkeyCols := make([]*Column, 0)
func (r *RowChangedEvent) HandleKeyColInfos() ([]*ColumnData, []rowcodec.ColInfo) {
pkeyCols := make([]*ColumnData, 0)
pkeyColInfos := make([]rowcodec.ColInfo, 0)

var cols []*ColumnData
Expand All @@ -571,7 +571,7 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
colInfos := tableInfo.GetColInfosForRowChangedEvent()
for i, col := range cols {
if col != nil && tableInfo.ForceGetColumnFlagType(col.ColumnID).IsHandleKey() {
pkeyCols = append(pkeyCols, columnData2Column(col, tableInfo))
pkeyCols = append(pkeyCols, col)
pkeyColInfos = append(pkeyColInfos, colInfos[i])
}
}
Expand Down
139 changes: 66 additions & 73 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type BatchEncoder struct {
}

type avroEncodeInput struct {
columns []*model.Column
*model.TableInfo
columns []*model.ColumnData
colInfos []rowcodec.ColInfo
}

Expand Down Expand Up @@ -82,11 +83,12 @@ func (a *BatchEncoder) encodeKey(ctx context.Context, topic string, e *model.Row
return nil, nil
}

keyColumns := &avroEncodeInput{
columns: cols,
colInfos: colInfos,
keyColumns := avroEncodeInput{
TableInfo: e.TableInfo,
columns: cols,
colInfos: colInfos,
}
avroCodec, header, err := a.getKeySchemaCodec(ctx, topic, &e.TableInfo.TableName, e.TableInfo.Version, keyColumns)
avroCodec, header, err := a.getKeySchemaCodec(ctx, topic, e.TableInfo.TableName, e.TableInfo.Version, keyColumns)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -119,7 +121,7 @@ func topicName2SchemaSubjects(topicName, subjectSuffix string) string {
}

func (a *BatchEncoder) getValueSchemaCodec(
ctx context.Context, topic string, tableName *model.TableName, tableVersion uint64, input *avroEncodeInput,
ctx context.Context, topic string, tableName model.TableName, tableVersion uint64, input avroEncodeInput,
) (*goavro.Codec, []byte, error) {
schemaGen := func() (string, error) {
schema, err := a.value2AvroSchema(tableName, input)
Expand All @@ -139,7 +141,7 @@ func (a *BatchEncoder) getValueSchemaCodec(
}

func (a *BatchEncoder) getKeySchemaCodec(
ctx context.Context, topic string, tableName *model.TableName, tableVersion uint64, keyColumns *avroEncodeInput,
ctx context.Context, topic string, tableName model.TableName, tableVersion uint64, keyColumns avroEncodeInput,
) (*goavro.Codec, []byte, error) {
schemaGen := func() (string, error) {
schema, err := a.key2AvroSchema(tableName, keyColumns)
Expand All @@ -163,15 +165,16 @@ func (a *BatchEncoder) encodeValue(ctx context.Context, topic string, e *model.R
return nil, nil
}

input := &avroEncodeInput{
columns: e.GetColumns(),
colInfos: e.TableInfo.GetColInfosForRowChangedEvent(),
input := avroEncodeInput{
TableInfo: e.TableInfo,
columns: e.Columns,
colInfos: e.TableInfo.GetColInfosForRowChangedEvent(),
}
if len(input.columns) == 0 {
return nil, nil
}

avroCodec, header, err := a.getValueSchemaCodec(ctx, topic, &e.TableInfo.TableName, e.TableInfo.Version, input)
avroCodec, header, err := a.getValueSchemaCodec(ctx, topic, e.TableInfo.TableName, e.TableInfo.Version, input)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -386,12 +389,12 @@ var type2TiDBType = map[byte]string{
mysql.TypeTiDBVectorFloat32: "TiDBVECTORFloat32",
}

func getTiDBTypeFromColumn(col *model.Column) string {
tt := type2TiDBType[col.Type]
if col.Flag.IsUnsigned() && (tt == "INT" || tt == "BIGINT") {
func getTiDBTypeFromColumn(col model.ColumnDataX) string {
tt := type2TiDBType[col.GetType()]
if col.GetFlag().IsUnsigned() && (tt == "INT" || tt == "BIGINT") {
return tt + " UNSIGNED"
}
if col.Flag.IsBinary() && tt == "TEXT" {
if col.GetFlag().IsBinary() && tt == "TEXT" {
return "BLOB"
}
return tt
Expand Down Expand Up @@ -519,45 +522,44 @@ func (a *BatchEncoder) schemaWithExtension(
return top
}

func (a *BatchEncoder) columns2AvroSchema(
tableName *model.TableName,
input *avroEncodeInput,
) (*avroSchemaTop, error) {
func (a *BatchEncoder) columns2AvroSchema(tableName model.TableName, input avroEncodeInput) (*avroSchemaTop, error) {
top := &avroSchemaTop{
Tp: "record",
Name: common.SanitizeName(tableName.Table),
Namespace: getAvroNamespace(a.namespace, tableName.Schema),
Fields: nil,
}
for i, col := range input.columns {
if col == nil {
for _, col := range input.columns {
colx := model.GetColumnDataX(col, input.TableInfo)
if colx.ColumnData == nil {
continue
}
avroType, err := a.columnToAvroSchema(col, input.colInfos[i].Ft)

avroType, err := a.columnToAvroSchema(colx)
if err != nil {
return nil, err
}
field := make(map[string]interface{})
field["name"] = common.SanitizeName(col.Name)
field["name"] = common.SanitizeName(colx.GetName())

copied := *col
copied.Value = copied.Default
defaultValue, _, err := a.columnToAvroData(&copied, input.colInfos[i].Ft)
copied := colx
copied.ColumnData = &model.ColumnData{ColumnID: colx.ColumnID, Value: colx.GetDefaultValue()}
defaultValue, _, err := a.columnToAvroData(copied)
if err != nil {
log.Error("fail to get default value for avro schema")
return nil, errors.Trace(err)
}
// goavro doesn't support setting a default value for logical types
// https://github.com/linkedin/goavro/issues/202
if _, ok := avroType.(avroLogicalTypeSchema); ok {
if col.Flag.IsNullable() {
if colx.GetFlag().IsNullable() {
field["type"] = []interface{}{"null", avroType}
field["default"] = nil
} else {
field["type"] = avroType
}
} else {
if col.Flag.IsNullable() {
if colx.GetFlag().IsNullable() {
// https://stackoverflow.com/questions/22938124/avro-field-default-values
if defaultValue == nil {
field["type"] = []interface{}{"null", avroType}
Expand All @@ -577,12 +579,9 @@ func (a *BatchEncoder) columns2AvroSchema(
return top, nil
}

func (a *BatchEncoder) value2AvroSchema(
tableName *model.TableName,
input *avroEncodeInput,
) (string, error) {
func (a *BatchEncoder) value2AvroSchema(tableName model.TableName, input avroEncodeInput) (string, error) {
if a.config.EnableRowChecksum {
sort.Sort(input)
sort.Sort(&input)
}

top, err := a.columns2AvroSchema(tableName, input)
Expand All @@ -605,10 +604,7 @@ func (a *BatchEncoder) value2AvroSchema(
return string(str), nil
}

func (a *BatchEncoder) key2AvroSchema(
tableName *model.TableName,
keyColumns *avroEncodeInput,
) (string, error) {
func (a *BatchEncoder) key2AvroSchema(tableName model.TableName, keyColumns avroEncodeInput) (string, error) {
top, err := a.columns2AvroSchema(tableName, keyColumns)
if err != nil {
return "", err
Expand All @@ -622,45 +618,43 @@ func (a *BatchEncoder) key2AvroSchema(
return string(str), nil
}

func (a *BatchEncoder) columns2AvroData(
input *avroEncodeInput,
) (map[string]interface{}, error) {
func (a *BatchEncoder) columns2AvroData(input avroEncodeInput) (map[string]interface{}, error) {
ret := make(map[string]interface{}, len(input.columns))
for i, col := range input.columns {
if col == nil {
for _, col := range input.columns {
colx := model.GetColumnDataX(col, input.TableInfo)
if colx.ColumnData == nil {
continue
}
data, str, err := a.columnToAvroData(col, input.colInfos[i].Ft)

data, str, err := a.columnToAvroData(colx)
if err != nil {
return nil, err
}

// https://pkg.go.dev/github.com/linkedin/goavro/v2#Union
if col.Flag.IsNullable() {
ret[common.SanitizeName(col.Name)] = goavro.Union(str, data)
if colx.GetFlag().IsNullable() {
ret[common.SanitizeName(colx.GetName())] = goavro.Union(str, data)
} else {
ret[common.SanitizeName(col.Name)] = data
ret[common.SanitizeName(colx.GetName())] = data
}
}

log.Debug("rowToAvroData", zap.Any("data", ret))
return ret, nil
}

func (a *BatchEncoder) columnToAvroSchema(
col *model.Column,
ft *types.FieldType,
) (interface{}, error) {
func (a *BatchEncoder) columnToAvroSchema(col model.ColumnDataX) (interface{}, error) {
tt := getTiDBTypeFromColumn(col)
switch col.Type {

switch col.GetType() {
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24:
// BOOL/TINYINT/SMALLINT/MEDIUMINT
return avroSchema{
Type: "int",
Parameters: map[string]string{tidbType: tt},
}, nil
case mysql.TypeLong: // INT
if col.Flag.IsUnsigned() {
if col.GetFlag().IsUnsigned() {
return avroSchema{
Type: "long",
Parameters: map[string]string{tidbType: tt},
Expand All @@ -672,7 +666,7 @@ func (a *BatchEncoder) columnToAvroSchema(
}, nil
case mysql.TypeLonglong: // BIGINT
t := "long"
if col.Flag.IsUnsigned() &&
if col.GetFlag().IsUnsigned() &&
a.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString {
t = "string"
}
Expand All @@ -691,9 +685,9 @@ func (a *BatchEncoder) columnToAvroSchema(
Parameters: map[string]string{tidbType: tt},
}, nil
case mysql.TypeBit:
displayFlen := ft.GetFlen()
displayFlen := col.GetColumnInfo().FieldType.GetFlen()
if displayFlen == -1 {
displayFlen, _ = mysql.GetDefaultFieldLengthAndDecimal(col.Type)
displayFlen, _ = mysql.GetDefaultFieldLengthAndDecimal(col.GetType())
}
return avroSchema{
Type: "bytes",
Expand All @@ -704,6 +698,7 @@ func (a *BatchEncoder) columnToAvroSchema(
}, nil
case mysql.TypeNewDecimal:
if a.config.AvroDecimalHandlingMode == common.DecimalHandlingModePrecise {
ft := col.GetColumnInfo().FieldType
defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType())
displayFlen, displayDecimal := ft.GetFlen(), ft.GetDecimal()
// length not specified, set it to system type default
Expand Down Expand Up @@ -738,16 +733,17 @@ func (a *BatchEncoder) columnToAvroSchema(
mysql.TypeLongBlob,
mysql.TypeBlob:
t := "string"
if col.Flag.IsBinary() {
if col.GetFlag().IsBinary() {
t = "bytes"
}
return avroSchema{
Type: t,
Parameters: map[string]string{tidbType: tt},
}, nil
case mysql.TypeEnum, mysql.TypeSet:
es := make([]string, 0, len(ft.GetElems()))
for _, e := range ft.GetElems() {
elems := col.GetColumnInfo().FieldType.GetElems()
es := make([]string, 0, len(elems))
for _, e := range elems {
e = common.EscapeEnumAndSetOptions(e)
es = append(es, e)
}
Expand Down Expand Up @@ -779,20 +775,17 @@ func (a *BatchEncoder) columnToAvroSchema(
Parameters: map[string]string{tidbType: tt},
}, nil
default:
log.Error("unknown mysql type", zap.Any("mysqlType", col.Type))
log.Error("unknown mysql type", zap.Any("mysqlType", col.GetType()))
return nil, cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type")
}
}

func (a *BatchEncoder) columnToAvroData(
col *model.Column,
ft *types.FieldType,
) (interface{}, string, error) {
func (a *BatchEncoder) columnToAvroData(col model.ColumnDataX) (interface{}, string, error) {
if col.Value == nil {
return nil, "null", nil
}

switch col.Type {
switch col.GetType() {
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24:
if v, ok := col.Value.(string); ok {
n, err := strconv.ParseInt(v, 10, 32)
Expand All @@ -801,7 +794,7 @@ func (a *BatchEncoder) columnToAvroData(
}
return int32(n), "int", nil
}
if col.Flag.IsUnsigned() {
if col.GetFlag().IsUnsigned() {
return int32(col.Value.(uint64)), "int", nil
}
return int32(col.Value.(int64)), "int", nil
Expand All @@ -811,18 +804,18 @@ func (a *BatchEncoder) columnToAvroData(
if err != nil {
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err)
}
if col.Flag.IsUnsigned() {
if col.GetFlag().IsUnsigned() {
return n, "long", nil
}
return int32(n), "int", nil
}
if col.Flag.IsUnsigned() {
if col.GetFlag().IsUnsigned() {
return int64(col.Value.(uint64)), "long", nil
}
return int32(col.Value.(int64)), "int", nil
case mysql.TypeLonglong:
if v, ok := col.Value.(string); ok {
if col.Flag.IsUnsigned() {
if col.GetFlag().IsUnsigned() {
if a.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString {
return v, "string", nil
}
Expand All @@ -838,7 +831,7 @@ func (a *BatchEncoder) columnToAvroData(
}
return n, "long", nil
}
if col.Flag.IsUnsigned() {
if col.GetFlag().IsUnsigned() {
if a.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeLong {
return int64(col.Value.(uint64)), "long", nil
}
Expand Down Expand Up @@ -888,7 +881,7 @@ func (a *BatchEncoder) columnToAvroData(
mysql.TypeBlob,
mysql.TypeMediumBlob,
mysql.TypeLongBlob:
if col.Flag.IsBinary() {
if col.GetFlag().IsBinary() {
if v, ok := col.Value.(string); ok {
return []byte(v), "bytes", nil
}
Expand All @@ -902,7 +895,7 @@ func (a *BatchEncoder) columnToAvroData(
if v, ok := col.Value.(string); ok {
return v, "string", nil
}
elements := ft.GetElems()
elements := col.GetColumnInfo().FieldType.GetElems()
number := col.Value.(uint64)
enumVar, err := types.ParseEnumValue(elements, number)
if err != nil {
Expand All @@ -914,7 +907,7 @@ func (a *BatchEncoder) columnToAvroData(
if v, ok := col.Value.(string); ok {
return v, "string", nil
}
elements := ft.GetElems()
elements := col.GetColumnInfo().FieldType.GetElems()
number := col.Value.(uint64)
setVar, err := types.ParseSetValue(elements, number)
if err != nil {
Expand Down Expand Up @@ -943,7 +936,7 @@ func (a *BatchEncoder) columnToAvroData(
}
return nil, "", cerror.ErrAvroEncodeFailed
default:
log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.Type))
log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.GetType()))
return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type")
}
}
Expand Down

0 comments on commit 792da42

Please sign in to comment.