From e644636d5fa35bf7f55b1dc5a0faf4a27282c8fd Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 23 Feb 2022 00:19:43 +0800 Subject: [PATCH] cdc/codec: fix string for unsigned value in canal-json. (#4629) close pingcap/tiflow#4635 --- cdc/sink/codec/canal.go | 39 ++++++++++++++++++++++++------------ cdc/sink/codec/canal_test.go | 4 +++- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 20fdfa363ae..9d2bbbd4725 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "reflect" "strconv" "strings" @@ -133,38 +134,47 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st return h } -func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) { +func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType, err error) { javaType := mySQLType2JavaType(c.Type, c.Flag.IsBinary()) switch javaType { case JavaSQLTypeBINARY, JavaSQLTypeVARBINARY, JavaSQLTypeLONGVARBINARY: if strings.Contains(mysqlType, "text") { - return JavaSQLTypeCLOB + return JavaSQLTypeCLOB, nil } - return JavaSQLTypeBLOB + return JavaSQLTypeBLOB, nil } // flag `isUnsigned` only for `numerical` and `bit`, `year` data type. if !c.Flag.IsUnsigned() { - return javaType + return javaType, nil } // for year, to `int64`, others to `uint64`. // no need to promote type for `year` and `bit` if c.Type == mysql.TypeYear || c.Type == mysql.TypeBit { - return javaType + return javaType, nil } if c.Type == mysql.TypeFloat || c.Type == mysql.TypeDouble || c.Type == mysql.TypeNewDecimal { - return javaType + return javaType, nil } - // for **unsigned** integral types, should have type in `uint64`. see reference: - // https://github.com/pingcap/ticdc/blob/f0a38a7aaf9f3b11a4d807da275b567642733f58/cdc/entry/mounter.go#L493 + // for **unsigned** integral types, type would be `uint64` or `string`. see reference: + // https://github.com/pingcap/tiflow/blob/1e3dd155049417e3fd7bf9b0a0c7b08723b33791/cdc/entry/mounter.go#L501 // https://github.com/pingcap/tidb/blob/6495a5a116a016a3e077d181b8c8ad81f76ac31b/types/datum.go#L423-L455 - number, ok := c.Value.(uint64) - if !ok { - log.Panic("unsigned value not in type uint64", zap.Any("column", c)) + var number uint64 + switch v := c.Value.(type) { + case uint64: + number = v + case string: + a, err := strconv.ParseUint(v, 10, 64) + if err != nil { + return javaType, err + } + number = a + default: + return javaType, errors.Errorf("unexpected type for unsigned value: %+v, column: %+v", reflect.TypeOf(v), c) } // Some special cases handled in canal @@ -193,7 +203,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) { } } - return javaType + return javaType, nil } // In the official canal-json implementation, value were extracted from binlog buffer. @@ -277,7 +287,10 @@ func getMySQLType(c *model.Column) string { // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) { mysqlType := getMySQLType(c) - javaType := getJavaSQLType(c, mysqlType) + javaType, err := getJavaSQLType(c, mysqlType) + if err != nil { + return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) + } value, err := b.formatValue(c.Value, javaType) if err != nil { diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index c25b3cb9800..59c21edc82e 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -411,6 +411,7 @@ var testColumnsTable = []*testColumnTuple{ {&model.Column{Name: "bigint", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, "bigint", JavaSQLTypeBIGINT, "9223372036854775807"}, {&model.Column{Name: "bigint unsigned", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "9223372036854775807"}, {&model.Column{Name: "bigint unsigned 2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeDECIMAL, "9223372036854775808"}, + {&model.Column{Name: "bigint unsigned 3", Type: mysql.TypeLonglong, Value: "0", Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "0"}, {&model.Column{Name: "float", Type: mysql.TypeFloat, Value: 3.14}, "float", JavaSQLTypeREAL, "3.14"}, {&model.Column{Name: "double", Type: mysql.TypeDouble, Value: 2.71}, "double", JavaSQLTypeDOUBLE, "2.71"}, @@ -454,7 +455,8 @@ func (s *canalEntrySuite) TestGetMySQLTypeAndJavaSQLType(c *check.C) { obtainedMySQLType := getMySQLType(item.column) c.Assert(obtainedMySQLType, check.Equals, item.expectedMySQLType) - obtainedJavaSQLType := getJavaSQLType(item.column, obtainedMySQLType) + obtainedJavaSQLType, err := getJavaSQLType(item.column, obtainedMySQLType) + c.Assert(err, check.IsNil) c.Assert(obtainedJavaSQLType, check.Equals, item.expectedJavaSQLType) } }