From 105437cbb73734795bfb0d6379e6dc18f1b44f12 Mon Sep 17 00:00:00 2001 From: maxshuang Date: Mon, 13 Dec 2021 20:16:35 +0800 Subject: [PATCH] This is an automated cherry-pick of #3846 Signed-off-by: ti-chi-bot --- cdc/entry/mounter.go | 83 +++++++++ cdc/entry/mounter_test.go | 350 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 433 insertions(+) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 9b27761fac8..f7dded96a8a 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -311,7 +311,14 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill colName := colInfo.Name.O colDatums, exist := datums[colInfo.ID] var colValue interface{} + if !exist && !fillWithDefaultValue { + continue + } + var err error + var warn string + var size int if exist { +<<<<<<< HEAD var err error var warn string colValue, warn, err = formatColVal(colDatums, colInfo.Tp) @@ -325,7 +332,19 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill colValue = getDefaultOrZeroValue(colInfo) } else { continue +======= + colValue, size, warn, err = formatColVal(colDatums, colInfo.Tp) + } else if fillWithDefaultValue { + colValue, size, warn, err = getDefaultOrZeroValue(colInfo) +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) + } + if err != nil { + return nil, errors.Trace(err) } + if warn != "" { + log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) + } + colSize += size cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ Name: colName, Type: colInfo.Tp, @@ -408,7 +427,35 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr var emptyBytes = make([]byte, 0) +<<<<<<< HEAD func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) { +======= +const ( + sizeOfEmptyColumn = int(unsafe.Sizeof(model.Column{})) + sizeOfEmptyBytes = int(unsafe.Sizeof(emptyBytes)) + sizeOfEmptyString = int(unsafe.Sizeof("")) +) + +func sizeOfDatum(d types.Datum) int { + array := [...]types.Datum{d} + return int(types.EstimatedMemUsage(array[:], 1)) +} + +func sizeOfString(s string) int { + // string data size + string struct size. + return len(s) + sizeOfEmptyString +} + +func sizeOfBytes(b []byte) int { + // bytes data size + bytes struct size. + return len(b) + sizeOfEmptyBytes +} + +// formatColVal return interface{} need to meet the same requirement as getDefaultOrZeroValue +func formatColVal(datum types.Datum, tp byte) ( + value interface{}, size int, warn string, err error, +) { +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) if datum.IsNull() { return nil, "", nil } @@ -448,6 +495,7 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e } return v, warn, nil default: +<<<<<<< HEAD return datum.GetValue(), "", nil } } @@ -477,6 +525,41 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} { d := table.GetZeroValue(col) return d.GetValue() +======= + // FIXME: GetValue() may return some types that go sql not support, which will cause sink DML fail + // Make specified convert upper if you need + // Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 + return datum.GetValue(), sizeOfDatum(datum), "", nil + } +} + +// getDefaultOrZeroValue return interface{} need to meet to require type in +// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236 +// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support +func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, error) { + var d types.Datum + if !mysql.HasNotNullFlag(col.Flag) { + // see https://github.com/pingcap/tidb/issues/9304 + // must use null if TiDB not write the column value when default value is null + // and the value is null + d = types.NewDatum(nil) + } else if col.GetDefaultValue() != nil { + d = types.NewDatum(col.GetDefaultValue()) + } else { + switch col.Tp { + case mysql.TypeEnum: + // For enum type, if no default value and not null is set, + // the default value is the first element of the enum list + d = types.NewDatum(col.FieldType.Elems[0]) + case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: + return emptyBytes, sizeOfEmptyBytes, "", nil + default: + d = table.GetZeroValue(col) + } + } + + return formatColVal(d, col.Tp) +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) } // DecodeTableID decodes the raw key to a table ID diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 523bf8d1015..25cb0be6276 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -26,9 +26,19 @@ import ( "github.com/pingcap/ticdc/pkg/util/testleak" ticonfig "github.com/pingcap/tidb/config" tidbkv "github.com/pingcap/tidb/kv" +<<<<<<< HEAD "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/testkit" +======= + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/types" + "github.com/stretchr/testify/require" +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -56,9 +66,17 @@ func (s *mountTxnsSuite) TestMounterDisableOldValue(c *check.C) { createTableDDL: "create table many_index(id int not null unique key, c1 int unique key, c2 int, INDEX (c2))", values: [][]interface{}{{1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}}, }, { +<<<<<<< HEAD tableName: "default_value", createTableDDL: "create table default_value(id int primary key, c1 int, c2 int not null default 5, c3 varchar(20), c4 varchar(20) not null default '666')", values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, +======= + tableName: "default_value", + createTableDDL: "create table default_value(id int primary key, c1 int, c2 int not null default 5, c3 varchar(20), c4 varchar(20) not null default '666')", + values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + putApproximateBytes: [][]int{{676, 676, 676, 676, 676}}, + delApproximateBytes: [][]int{{353, 353, 353, 353, 353}}, +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) }, { tableName: "partition_table", createTableDDL: `CREATE TABLE partition_table ( @@ -103,6 +121,11 @@ func (s *mountTxnsSuite) TestMounterDisableOldValue(c *check.C) { {4, 127, 32767, 8388607, 2147483647, 9223372036854775807}, {5, -128, -32768, -8388608, -2147483648, -9223372036854775808}, }, +<<<<<<< HEAD +======= + putApproximateBytes: [][]int{{986, 626, 986, 986, 986}}, + delApproximateBytes: [][]int{{346, 346, 346, 346, 346}}, +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) }, { tableName: "tp_text", createTableDDL: `create table tp_text @@ -143,6 +166,11 @@ func (s *mountTxnsSuite) TestMounterDisableOldValue(c *check.C) { {5, "你好", "我好", "大家好", "道路", "千万条", "安全", "第一条", "行车", "不规范", "亲人", "两行泪", "!"}, {6, "😀", "😃", "😄", "😁", "😆", "😅", "😂", "🤣", "☺️", "😊", "😇", "🙂"}, }, +<<<<<<< HEAD +======= + putApproximateBytes: [][]int{{1019, 1459, 1411, 1323, 1398, 1369}}, + delApproximateBytes: [][]int{{347, 347, 347, 347, 347, 347}}, +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) }, { tableName: "tp_time", createTableDDL: `create table tp_time @@ -160,6 +188,11 @@ func (s *mountTxnsSuite) TestMounterDisableOldValue(c *check.C) { {1}, {2, "2020-02-20", "2020-02-20 02:20:20", "2020-02-20 02:20:20", "02:20:20", "2020"}, }, +<<<<<<< HEAD +======= + putApproximateBytes: [][]int{{627, 819}}, + delApproximateBytes: [][]int{{347, 347}}, +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) }, { tableName: "tp_real", createTableDDL: `create table tp_real @@ -262,7 +295,12 @@ func testMounterDisableOldValue(c *check.C, tc struct { mounter.tz = time.Local ctx := context.Background() +<<<<<<< HEAD mountAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) int { +======= + // [TODO] check size and readd rowBytes + mountAndCheckRowInTable := func(tableID int64, _ []int, f func(key []byte, value []byte) *model.RawKVEntry) int { +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) var rows int walkTableSpanInStore(c, store, tableID, func(key []byte, value []byte) { rawKV := f(key, value) @@ -272,8 +310,16 @@ func testMounterDisableOldValue(c *check.C, tc struct { return } rows++ +<<<<<<< HEAD c.Assert(row.Table.Table, check.Equals, tc.tableName) c.Assert(row.Table.Schema, check.Equals, "test") +======= + require.Equal(t, row.Table.Table, tc.tableName) + require.Equal(t, row.Table.Schema, "test") + // [TODO] check size and reopen this check + // require.Equal(t, rowBytes[rows-1], row.ApproximateBytes(), row) + t.Log("ApproximateBytes", tc.tableName, rows-1, row.ApproximateBytes()) +>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846)) // TODO: test column flag, column type and index columns if len(row.Columns) != 0 { checkSQL, params := prepareCheckSQL(c, tc.tableName, row.Columns) @@ -395,3 +441,307 @@ func walkTableSpanInStore(c *check.C, store tidbkv.Storage, tableID int64, f fun c.Assert(err, check.IsNil) } } + +// Check following MySQL type, ref to: +// https://github.com/pingcap/tidb/blob/master/parser/mysql/type.go +type columnInfoAndResult struct { + ColInfo timodel.ColumnInfo + Res interface{} +} + +func TestFormatColVal(t *testing.T) {} + +func TestGetDefaultZeroValue(t *testing.T) { + colAndRess := []columnInfoAndResult{ + // mysql flag null + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Flag: uint(0), + }, + }, + Res: nil, + }, + // mysql.TypeTiny + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeTiny, + Flag: mysql.NotNullFlag, + }, + }, + Res: int64(0), + }, + // mysql.TypeShort + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeShort, + Flag: mysql.NotNullFlag, + }, + }, + Res: int64(0), + }, + // mysql.TypeLong + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeLong, + Flag: mysql.NotNullFlag, + }, + }, + Res: int64(0), + }, + // mysql.TypeLonglong + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeLonglong, + Flag: mysql.NotNullFlag, + }, + }, + Res: int64(0), + }, + // mysql.TypeInt24 + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeInt24, + Flag: mysql.NotNullFlag, + }, + }, + Res: int64(0), + }, + // mysql.TypeFloat + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeFloat, + Flag: mysql.NotNullFlag, + }, + }, + Res: float64(0), + }, + // mysql.TypeDouble + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeDouble, + Flag: mysql.NotNullFlag, + }, + }, + Res: float64(0), + }, + // mysql.TypeNewDecimal + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeNewDecimal, + Flag: mysql.NotNullFlag, + Flen: 5, + Decimal: 2, + }, + }, + Res: "0", // related with Flen and Decimal, [TODO] need check default + }, + // mysql.TypeNull + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeNull, + Flag: mysql.NotNullFlag, + }, + }, + Res: nil, + }, + // mysql.TypeTimestamp + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeTimestamp, + Flag: mysql.NotNullFlag, + }, + }, + Res: "0000-00-00 00:00:00", + }, + // mysql.TypeDate + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeDate, + Flag: mysql.NotNullFlag, + }, + }, + Res: "0000-00-00", + }, + // mysql.TypeDuration + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeDuration, + Flag: mysql.NotNullFlag, + }, + }, + Res: "00:00:00", + }, + // mysql.TypeDatetime + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeDatetime, + Flag: mysql.NotNullFlag, + }, + }, + Res: "0000-00-00 00:00:00", + }, + // mysql.TypeYear + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeYear, + Flag: mysql.NotNullFlag, + }, + }, + Res: int64(0), + }, + // mysql.TypeNewDate + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeNewDate, + Flag: mysql.NotNullFlag, + }, + }, + Res: nil, // [TODO] seems not support by TiDB, need check + }, + // mysql.TypeVarchar + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeVarchar, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeTinyBlob + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeTinyBlob, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeMediumBlob + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeMediumBlob, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeLongBlob + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeLongBlob, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeBlob + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeBlob, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeVarString + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeVarString, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeString + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeString, + Flag: mysql.NotNullFlag, + }, + }, + Res: []byte{}, + }, + // mysql.TypeBit + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Flag: mysql.NotNullFlag, + Tp: mysql.TypeBit, + }, + }, + Res: uint64(0), + }, + // mysql.TypeJSON + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeJSON, + Flag: mysql.NotNullFlag, + }, + }, + Res: "null", + }, + // mysql.TypeEnum + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeEnum, + Flag: mysql.NotNullFlag, + Elems: []string{"e0", "e1"}, + }, + }, + Res: uint64(0), + }, + // mysql.TypeSet + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeSet, + Flag: mysql.NotNullFlag, + }, + }, + Res: uint64(0), + }, + // mysql.TypeGeometry + { + ColInfo: timodel.ColumnInfo{ + FieldType: types.FieldType{ + Tp: mysql.TypeGeometry, + Flag: mysql.NotNullFlag, + }, + }, + Res: nil, + }, + } + testGetDefaultZeroValue(t, colAndRess) +} + +func testGetDefaultZeroValue(t *testing.T, colAndRess []columnInfoAndResult) { + for _, colAndRes := range colAndRess { + val, _, _, _ := getDefaultOrZeroValue(&colAndRes.ColInfo) + require.Equal(t, colAndRes.Res, val) + } +}