diff --git a/cdc/model/sink.go b/cdc/model/sink.go index bb5de91816a..2dd5634c89c 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -556,8 +556,8 @@ func (r *RowChangedEvent) GetHandleKeyColumnValues() []string { } // HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s) -func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { - pkeyCols := make([]*Column, 0) +func (r *RowChangedEvent) HandleKeyColInfos() ([]*ColumnData, []rowcodec.ColInfo) { + pkeyCols := make([]*ColumnData, 0) pkeyColInfos := make([]rowcodec.ColInfo, 0) var cols []*ColumnData @@ -571,7 +571,7 @@ func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) { colInfos := tableInfo.GetColInfosForRowChangedEvent() for i, col := range cols { if col != nil && tableInfo.ForceGetColumnFlagType(col.ColumnID).IsHandleKey() { - pkeyCols = append(pkeyCols, columnData2Column(col, tableInfo)) + pkeyCols = append(pkeyCols, col) pkeyColInfos = append(pkeyColInfos, colInfos[i]) } } diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index e0100217ab4..6ac329a22ea 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -49,7 +49,8 @@ type BatchEncoder struct { } type avroEncodeInput struct { - columns []*model.Column + *model.TableInfo + columns []*model.ColumnData colInfos []rowcodec.ColInfo } @@ -82,11 +83,12 @@ func (a *BatchEncoder) encodeKey(ctx context.Context, topic string, e *model.Row return nil, nil } - keyColumns := &avroEncodeInput{ - columns: cols, - colInfos: colInfos, + keyColumns := avroEncodeInput{ + TableInfo: e.TableInfo, + columns: cols, + colInfos: colInfos, } - avroCodec, header, err := a.getKeySchemaCodec(ctx, topic, &e.TableInfo.TableName, e.TableInfo.Version, keyColumns) + avroCodec, header, err := a.getKeySchemaCodec(ctx, topic, e.TableInfo.TableName, e.TableInfo.Version, keyColumns) if err != nil { return nil, errors.Trace(err) } @@ -119,7 +121,7 @@ func topicName2SchemaSubjects(topicName, subjectSuffix string) string { } func (a *BatchEncoder) getValueSchemaCodec( - ctx context.Context, topic string, tableName *model.TableName, tableVersion uint64, input *avroEncodeInput, + ctx context.Context, topic string, tableName model.TableName, tableVersion uint64, input avroEncodeInput, ) (*goavro.Codec, []byte, error) { schemaGen := func() (string, error) { schema, err := a.value2AvroSchema(tableName, input) @@ -139,7 +141,7 @@ func (a *BatchEncoder) getValueSchemaCodec( } func (a *BatchEncoder) getKeySchemaCodec( - ctx context.Context, topic string, tableName *model.TableName, tableVersion uint64, keyColumns *avroEncodeInput, + ctx context.Context, topic string, tableName model.TableName, tableVersion uint64, keyColumns avroEncodeInput, ) (*goavro.Codec, []byte, error) { schemaGen := func() (string, error) { schema, err := a.key2AvroSchema(tableName, keyColumns) @@ -163,15 +165,16 @@ func (a *BatchEncoder) encodeValue(ctx context.Context, topic string, e *model.R return nil, nil } - input := &avroEncodeInput{ - columns: e.GetColumns(), - colInfos: e.TableInfo.GetColInfosForRowChangedEvent(), + input := avroEncodeInput{ + TableInfo: e.TableInfo, + columns: e.Columns, + colInfos: e.TableInfo.GetColInfosForRowChangedEvent(), } if len(input.columns) == 0 { return nil, nil } - avroCodec, header, err := a.getValueSchemaCodec(ctx, topic, &e.TableInfo.TableName, e.TableInfo.Version, input) + avroCodec, header, err := a.getValueSchemaCodec(ctx, topic, e.TableInfo.TableName, e.TableInfo.Version, input) if err != nil { return nil, errors.Trace(err) } @@ -386,12 +389,12 @@ var type2TiDBType = map[byte]string{ mysql.TypeTiDBVectorFloat32: "TiDBVECTORFloat32", } -func getTiDBTypeFromColumn(col *model.Column) string { - tt := type2TiDBType[col.Type] - if col.Flag.IsUnsigned() && (tt == "INT" || tt == "BIGINT") { +func getTiDBTypeFromColumn(col model.ColumnDataX) string { + tt := type2TiDBType[col.GetType()] + if col.GetFlag().IsUnsigned() && (tt == "INT" || tt == "BIGINT") { return tt + " UNSIGNED" } - if col.Flag.IsBinary() && tt == "TEXT" { + if col.GetFlag().IsBinary() && tt == "TEXT" { return "BLOB" } return tt @@ -519,30 +522,29 @@ func (a *BatchEncoder) schemaWithExtension( return top } -func (a *BatchEncoder) columns2AvroSchema( - tableName *model.TableName, - input *avroEncodeInput, -) (*avroSchemaTop, error) { +func (a *BatchEncoder) columns2AvroSchema(tableName model.TableName, input avroEncodeInput) (*avroSchemaTop, error) { top := &avroSchemaTop{ Tp: "record", Name: common.SanitizeName(tableName.Table), Namespace: getAvroNamespace(a.namespace, tableName.Schema), Fields: nil, } - for i, col := range input.columns { - if col == nil { + for _, col := range input.columns { + colx := model.GetColumnDataX(col, input.TableInfo) + if colx.ColumnData == nil { continue } - avroType, err := a.columnToAvroSchema(col, input.colInfos[i].Ft) + + avroType, err := a.columnToAvroSchema(colx) if err != nil { return nil, err } field := make(map[string]interface{}) - field["name"] = common.SanitizeName(col.Name) + field["name"] = common.SanitizeName(colx.GetName()) - copied := *col - copied.Value = copied.Default - defaultValue, _, err := a.columnToAvroData(&copied, input.colInfos[i].Ft) + copied := colx + copied.ColumnData = &model.ColumnData{ColumnID: colx.ColumnID, Value: colx.GetDefaultValue()} + defaultValue, _, err := a.columnToAvroData(copied) if err != nil { log.Error("fail to get default value for avro schema") return nil, errors.Trace(err) @@ -550,14 +552,14 @@ func (a *BatchEncoder) columns2AvroSchema( // goavro doesn't support set default value for logical type // https://github.com/linkedin/goavro/issues/202 if _, ok := avroType.(avroLogicalTypeSchema); ok { - if col.Flag.IsNullable() { + if colx.GetFlag().IsNullable() { field["type"] = []interface{}{"null", avroType} field["default"] = nil } else { field["type"] = avroType } } else { - if col.Flag.IsNullable() { + if colx.GetFlag().IsNullable() { // https://stackoverflow.com/questions/22938124/avro-field-default-values if defaultValue == nil { field["type"] = []interface{}{"null", avroType} @@ -577,12 +579,9 @@ func (a *BatchEncoder) columns2AvroSchema( return top, nil } -func (a *BatchEncoder) value2AvroSchema( - tableName *model.TableName, - input *avroEncodeInput, -) (string, error) { +func (a *BatchEncoder) value2AvroSchema(tableName model.TableName, input avroEncodeInput) (string, error) { if a.config.EnableRowChecksum { - sort.Sort(input) + sort.Sort(&input) } top, err := a.columns2AvroSchema(tableName, input) @@ -605,10 +604,7 @@ func (a *BatchEncoder) value2AvroSchema( return string(str), nil } -func (a *BatchEncoder) key2AvroSchema( - tableName *model.TableName, - keyColumns *avroEncodeInput, -) (string, error) { +func (a *BatchEncoder) key2AvroSchema(tableName model.TableName, keyColumns avroEncodeInput) (string, error) { top, err := a.columns2AvroSchema(tableName, keyColumns) if err != nil { return "", err @@ -622,24 +618,24 @@ func (a *BatchEncoder) key2AvroSchema( return string(str), nil } -func (a *BatchEncoder) columns2AvroData( - input *avroEncodeInput, -) (map[string]interface{}, error) { +func (a *BatchEncoder) columns2AvroData(input avroEncodeInput) (map[string]interface{}, error) { ret := make(map[string]interface{}, len(input.columns)) - for i, col := range input.columns { - if col == nil { + for _, col := range input.columns { + colx := model.GetColumnDataX(col, input.TableInfo) + if colx.ColumnData == nil { continue } - data, str, err := a.columnToAvroData(col, input.colInfos[i].Ft) + + data, str, err := a.columnToAvroData(colx) if err != nil { return nil, err } // https: //pkg.go.dev/github.com/linkedin/goavro/v2#Union - if col.Flag.IsNullable() { - ret[common.SanitizeName(col.Name)] = goavro.Union(str, data) + if colx.GetFlag().IsNullable() { + ret[common.SanitizeName(colx.GetName())] = goavro.Union(str, data) } else { - ret[common.SanitizeName(col.Name)] = data + ret[common.SanitizeName(colx.GetName())] = data } } @@ -647,12 +643,10 @@ func (a *BatchEncoder) columns2AvroData( return ret, nil } -func (a *BatchEncoder) columnToAvroSchema( - col *model.Column, - ft *types.FieldType, -) (interface{}, error) { +func (a *BatchEncoder) columnToAvroSchema(col model.ColumnDataX) (interface{}, error) { tt := getTiDBTypeFromColumn(col) - switch col.Type { + + switch col.GetType() { case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24: // BOOL/TINYINT/SMALLINT/MEDIUMINT return avroSchema{ @@ -660,7 +654,7 @@ func (a *BatchEncoder) columnToAvroSchema( Parameters: map[string]string{tidbType: tt}, }, nil case mysql.TypeLong: // INT - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { return avroSchema{ Type: "long", Parameters: map[string]string{tidbType: tt}, @@ -672,7 +666,7 @@ func (a *BatchEncoder) columnToAvroSchema( }, nil case mysql.TypeLonglong: // BIGINT t := "long" - if col.Flag.IsUnsigned() && + if col.GetFlag().IsUnsigned() && a.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString { t = "string" } @@ -691,9 +685,9 @@ func (a *BatchEncoder) columnToAvroSchema( Parameters: map[string]string{tidbType: tt}, }, nil case mysql.TypeBit: - displayFlen := ft.GetFlen() + displayFlen := col.GetColumnInfo().FieldType.GetFlen() if displayFlen == -1 { - displayFlen, _ = mysql.GetDefaultFieldLengthAndDecimal(col.Type) + displayFlen, _ = mysql.GetDefaultFieldLengthAndDecimal(col.GetType()) } return avroSchema{ Type: "bytes", @@ -704,6 +698,7 @@ func (a *BatchEncoder) columnToAvroSchema( }, nil case mysql.TypeNewDecimal: if a.config.AvroDecimalHandlingMode == common.DecimalHandlingModePrecise { + ft := col.GetColumnInfo().FieldType defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType()) displayFlen, displayDecimal := ft.GetFlen(), ft.GetDecimal() // length not specified, set it to system type default @@ -738,7 +733,7 @@ func (a *BatchEncoder) columnToAvroSchema( mysql.TypeLongBlob, mysql.TypeBlob: t := "string" - if col.Flag.IsBinary() { + if col.GetFlag().IsBinary() { t = "bytes" } return avroSchema{ @@ -746,8 +741,9 @@ func (a *BatchEncoder) columnToAvroSchema( Parameters: map[string]string{tidbType: tt}, }, nil case mysql.TypeEnum, mysql.TypeSet: - es := make([]string, 0, len(ft.GetElems())) - for _, e := range ft.GetElems() { + elems := col.GetColumnInfo().FieldType.GetElems() + es := make([]string, 0, len(elems)) + for _, e := range elems { e = common.EscapeEnumAndSetOptions(e) es = append(es, e) } @@ -779,20 +775,17 @@ func (a *BatchEncoder) columnToAvroSchema( Parameters: map[string]string{tidbType: tt}, }, nil default: - log.Error("unknown mysql type", zap.Any("mysqlType", col.Type)) + log.Error("unknown mysql type", zap.Any("mysqlType", col.GetType())) return nil, cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") } } -func (a *BatchEncoder) columnToAvroData( - col *model.Column, - ft *types.FieldType, -) (interface{}, string, error) { +func (a *BatchEncoder) columnToAvroData(col model.ColumnDataX) (interface{}, string, error) { if col.Value == nil { return nil, "null", nil } - switch col.Type { + switch col.GetType() { case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24: if v, ok := col.Value.(string); ok { n, err := strconv.ParseInt(v, 10, 32) @@ -801,7 +794,7 @@ func (a *BatchEncoder) columnToAvroData( } return int32(n), "int", nil } - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { return int32(col.Value.(uint64)), "int", nil } return int32(col.Value.(int64)), "int", nil @@ -811,18 +804,18 @@ func (a *BatchEncoder) columnToAvroData( if err != nil { return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) } - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { return n, "long", nil } return int32(n), "int", nil } - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { return int64(col.Value.(uint64)), "long", nil } return int32(col.Value.(int64)), "int", nil case mysql.TypeLonglong: if v, ok := col.Value.(string); ok { - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { if a.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString { return v, "string", nil } @@ -838,7 +831,7 @@ func (a *BatchEncoder) columnToAvroData( } return n, "long", nil } - if col.Flag.IsUnsigned() { + if col.GetFlag().IsUnsigned() { if a.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeLong { return int64(col.Value.(uint64)), "long", nil } @@ -888,7 +881,7 @@ func (a *BatchEncoder) columnToAvroData( mysql.TypeBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob: - if col.Flag.IsBinary() { + if col.GetFlag().IsBinary() { if v, ok := col.Value.(string); ok { return []byte(v), "bytes", nil } @@ -902,7 +895,7 @@ func (a *BatchEncoder) columnToAvroData( if v, ok := col.Value.(string); ok { return v, "string", nil } - elements := ft.GetElems() + elements := col.GetColumnInfo().FieldType.GetElems() number := col.Value.(uint64) enumVar, err := types.ParseEnumValue(elements, number) if err != nil { @@ -914,7 +907,7 @@ func (a *BatchEncoder) columnToAvroData( if v, ok := col.Value.(string); ok { return v, "string", nil } - elements := ft.GetElems() + elements := col.GetColumnInfo().FieldType.GetElems() number := col.Value.(uint64) setVar, err := types.ParseSetValue(elements, number) if err != nil { @@ -943,7 +936,7 @@ func (a *BatchEncoder) columnToAvroData( } return nil, "", cerror.ErrAvroEncodeFailed default: - log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.Type)) + log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.GetType())) return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") } }