diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 20fdfa363ae..7a2e4142b86 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math" + "reflect" "strconv" "strings" @@ -133,38 +134,50 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st return h } -func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) { +func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType, err error) { javaType := mySQLType2JavaType(c.Type, c.Flag.IsBinary()) switch javaType { case JavaSQLTypeBINARY, JavaSQLTypeVARBINARY, JavaSQLTypeLONGVARBINARY: if strings.Contains(mysqlType, "text") { - return JavaSQLTypeCLOB + return JavaSQLTypeCLOB, nil } - return JavaSQLTypeBLOB + return JavaSQLTypeBLOB, nil } // flag `isUnsigned` only for `numerical` and `bit`, `year` data type. if !c.Flag.IsUnsigned() { - return javaType + return javaType, nil } // for year, to `int64`, others to `uint64`. // no need to promote type for `year` and `bit` if c.Type == mysql.TypeYear || c.Type == mysql.TypeBit { - return javaType + return javaType, nil } if c.Type == mysql.TypeFloat || c.Type == mysql.TypeDouble || c.Type == mysql.TypeNewDecimal { - return javaType + return javaType, nil } - // for **unsigned** integral types, should have type in `uint64`. see reference: - // https://github.com/pingcap/ticdc/blob/f0a38a7aaf9f3b11a4d807da275b567642733f58/cdc/entry/mounter.go#L493 + // for **unsigned** integral types, type would be `uint64` or `string`. see reference: + // https://github.com/pingcap/tiflow/blob/1e3dd155049417e3fd7bf9b0a0c7b08723b33791/cdc/entry/mounter.go#L501 // https://github.com/pingcap/tidb/blob/6495a5a116a016a3e077d181b8c8ad81f76ac31b/types/datum.go#L423-L455 - number, ok := c.Value.(uint64) - if !ok { - log.Panic("unsigned value not in type uint64", zap.Any("column", c)) + if c.Value == nil { + return javaType, nil + } + var number uint64 + switch v := c.Value.(type) { + case uint64: + number = v + case string: + a, err := strconv.ParseUint(v, 10, 64) + if err != nil { + return javaType, err + } + number = a + default: + return javaType, errors.Errorf("unexpected type for unsigned value: %+v, column: %+v", reflect.TypeOf(v), c) } // Some special cases handled in canal @@ -193,7 +206,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) { } } - return javaType + return javaType, nil } // In the official canal-json implementation, value were extracted from binlog buffer. @@ -277,7 +290,10 @@ func getMySQLType(c *model.Column) string { // see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872 func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) { mysqlType := getMySQLType(c) - javaType := getJavaSQLType(c, mysqlType) + javaType, err := getJavaSQLType(c, mysqlType) + if err != nil { + return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) + } value, err := b.formatValue(c.Value, javaType) if err != nil { diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go index fb968f2b2ac..203590c33d9 100644 --- a/cdc/sink/codec/canal_flat_test.go +++ b/cdc/sink/codec/canal_flat_test.go @@ -113,6 +113,12 @@ func (s *canalFlatSuite) TestNewCanalFlatMessage4DML(c *check.C) { continue } + // for `Column.Value` is nil, which mean's it is nullable, set the value to `""` + if obtainedValue == nil { + c.Assert(item.expectedValue, check.Equals, "") + continue + } + if bytes, ok := item.column.Value.([]byte); ok { expectedValue, err := charmap.ISO8859_1.NewDecoder().Bytes(bytes) c.Assert(err, check.IsNil) @@ -194,8 +200,12 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C for _, col := range consumed.Columns { expected, ok := expectedDecodedValues[col.Name] c.Assert(ok, check.IsTrue) + if col.Value == nil { + c.Assert(expected, check.Equals, "") + } else { + c.Assert(col.Value, check.Equals, expected) + } - c.Assert(col.Value, check.Equals, expected) for _, item := range testCaseInsert.Columns { if item.Name == col.Name { c.Assert(col.Type, check.Equals, item.Type) diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index c25b3cb9800..8ff59a57599 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -395,22 +395,32 @@ var testColumnsTable = []*testColumnTuple{ {&model.Column{Name: "tinyint", Type: mysql.TypeTiny, Value: int64(127)}, "tinyint", JavaSQLTypeTINYINT, "127"}, // TinyInt {&model.Column{Name: "tinyint unsigned", Type: mysql.TypeTiny, Value: uint64(127), Flag: model.UnsignedFlag}, "tinyint unsigned", JavaSQLTypeTINYINT, "127"}, {&model.Column{Name: "tinyint unsigned 2", Type: mysql.TypeTiny, Value: uint64(128), Flag: model.UnsignedFlag}, "tinyint unsigned", JavaSQLTypeSMALLINT, "128"}, + {&model.Column{Name: "tinyint unsigned 3", Type: mysql.TypeTiny, Value: "0", Flag: model.UnsignedFlag}, "tinyint unsigned", JavaSQLTypeTINYINT, "0"}, + {&model.Column{Name: "tinyint unsigned 4", Type: mysql.TypeTiny, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "tinyint unsigned", JavaSQLTypeTINYINT, ""}, {&model.Column{Name: "smallint", Type: mysql.TypeShort, Value: int64(32767)}, "smallint", JavaSQLTypeSMALLINT, "32767"}, {&model.Column{Name: "smallint unsigned", Type: mysql.TypeShort, Value: uint64(32767), Flag: model.UnsignedFlag}, "smallint unsigned", JavaSQLTypeSMALLINT, "32767"}, {&model.Column{Name: "smallint unsigned 2", Type: mysql.TypeShort, Value: uint64(32768), Flag: model.UnsignedFlag}, "smallint unsigned", JavaSQLTypeINTEGER, "32768"}, + {&model.Column{Name: "smallint unsigned 3", Type: mysql.TypeShort, Value: "0", Flag: model.UnsignedFlag}, "smallint unsigned", JavaSQLTypeSMALLINT, "0"}, + {&model.Column{Name: "smallint unsigned 4", Type: mysql.TypeShort, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "smallint unsigned", JavaSQLTypeSMALLINT, ""}, {&model.Column{Name: "mediumint", Type: mysql.TypeInt24, Value: int64(8388607)}, "mediumint", JavaSQLTypeINTEGER, "8388607"}, {&model.Column{Name: "mediumint unsigned", Type: mysql.TypeInt24, Value: uint64(8388607), Flag: model.UnsignedFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, "8388607"}, {&model.Column{Name: "mediumint unsigned 2", Type: mysql.TypeInt24, Value: uint64(8388608), Flag: model.UnsignedFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, "8388608"}, + {&model.Column{Name: "mediumint unsigned 3", Type: mysql.TypeInt24, Value: "0", Flag: model.UnsignedFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, "0"}, + {&model.Column{Name: "mediumint unsigned 4", Type: mysql.TypeInt24, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, ""}, {&model.Column{Name: "int", Type: mysql.TypeLong, Value: int64(2147483647)}, "int", JavaSQLTypeINTEGER, "2147483647"}, {&model.Column{Name: "int unsigned", Type: mysql.TypeLong, Value: uint64(2147483647), Flag: model.UnsignedFlag}, "int unsigned", JavaSQLTypeINTEGER, "2147483647"}, {&model.Column{Name: "int unsigned 2", Type: mysql.TypeLong, Value: uint64(2147483648), Flag: model.UnsignedFlag}, "int unsigned", JavaSQLTypeBIGINT, "2147483648"}, + {&model.Column{Name: "int unsigned 3", Type: mysql.TypeLong, Value: "0", Flag: model.UnsignedFlag}, "int unsigned", JavaSQLTypeINTEGER, "0"}, + {&model.Column{Name: "int unsigned 4", Type: mysql.TypeLong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "int unsigned", JavaSQLTypeINTEGER, ""}, {&model.Column{Name: "bigint", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, "bigint", JavaSQLTypeBIGINT, "9223372036854775807"}, {&model.Column{Name: "bigint unsigned", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "9223372036854775807"}, {&model.Column{Name: "bigint unsigned 2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeDECIMAL, "9223372036854775808"}, + {&model.Column{Name: "bigint unsigned 3", Type: mysql.TypeLonglong, Value: "0", Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "0"}, + {&model.Column{Name: "bigint unsigned 4", Type: mysql.TypeLonglong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "bigint unsigned", JavaSQLTypeBIGINT, ""}, {&model.Column{Name: "float", Type: mysql.TypeFloat, Value: 3.14}, "float", JavaSQLTypeREAL, "3.14"}, {&model.Column{Name: "double", Type: mysql.TypeDouble, Value: 2.71}, "double", JavaSQLTypeDOUBLE, "2.71"}, @@ -454,7 +464,8 @@ func (s *canalEntrySuite) TestGetMySQLTypeAndJavaSQLType(c *check.C) { obtainedMySQLType := getMySQLType(item.column) c.Assert(obtainedMySQLType, check.Equals, item.expectedMySQLType) - obtainedJavaSQLType := getJavaSQLType(item.column, obtainedMySQLType) + obtainedJavaSQLType, err := getJavaSQLType(item.column, obtainedMySQLType) + c.Assert(err, check.IsNil) c.Assert(obtainedJavaSQLType, check.Equals, item.expectedJavaSQLType) } }