Skip to content

Commit

Permalink
mounter(ticdc): default value convert to the correct data type (#10804)…
Browse files Browse the repository at this point in the history
… (#10829)

close #10803
  • Loading branch information
ti-chi-bot authored Apr 15, 2024
1 parent dd860a8 commit d331dd5
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 168 deletions.
30 changes: 19 additions & 11 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
}

func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum,
tableInfo *model.TableInfo, datums map[int64]types.Datum, tz *time.Location,
) ([]*model.Column, []types.Datum, []*timodel.ColumnInfo, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
Expand Down Expand Up @@ -374,7 +374,7 @@ func datum2Column(
if exist {
colValue, size, warn, err = formatColVal(colDatums, colInfo)
} else {
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo, tz)
}
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -508,7 +508,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow)
preCols, preRawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
current uint32
)
if row.RowExist {
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row)
cols, rawCols, columnInfos, extendColumnInfos, err = datum2Column(tableInfo, row.Row, m.tz)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand Down Expand Up @@ -704,20 +704,28 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) (
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
// TODO: Check default expr support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, string, error) {
var d types.Datum
func getDefaultOrZeroValue(
col *timodel.ColumnInfo, tz *time.Location,
) (types.Datum, any, int, string, error) {
var (
d types.Datum
err error
)
// NOTICE: SHOULD use OriginDefaultValue here, more info pls ref to
// https://github.com/pingcap/tiflow/issues/4048
// FIXME: Too many corner cases may hit here, like type truncate, timezone
// (1) If this column is uk(no pk), will cause data inconsistency in Scenarios(2)
// (2) If not fix here, will cause data inconsistency in Scenarios(3) directly
// Ref: https://github.com/pingcap/tidb/blob/d2c352980a43bb593db81fd1db996f47af596d91/table/column.go#L489
if col.GetOriginDefaultValue() != nil {
d = types.NewDatum(col.GetOriginDefaultValue())
return d, d.GetValue(), sizeOfDatum(d), "", nil
}

if !mysql.HasNotNullFlag(col.GetFlag()) {
datum := types.NewDatum(col.GetOriginDefaultValue())
sctx := new(stmtctx.StatementContext)
sctx.TimeZone = tz
d, err = datum.ConvertTo(sctx, &col.FieldType)
if err != nil {
return d, d.GetValue(), sizeOfDatum(d), "", errors.Trace(err)
}
} else if !mysql.HasNotNullFlag(col.GetFlag()) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
// must use null if TiDB not write the column value when default value is null
Expand Down
Loading

0 comments on commit d331dd5

Please sign in to comment.