Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unsupported data type issue caused by default value filling logic #4518

Merged
merged 3 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 59 additions & 34 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get new DDL job", zap.String("detail", job.String()))
if !job.IsDone() && !job.IsSynced() {
return nil, nil
}
Expand All @@ -311,20 +312,21 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
continue
}
var err error
var warn string
if exist {
var err error
var warn string
colValue, warn, err = formatColVal(colDatums, colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
} else if fillWithDefaultValue {
colValue = getDefaultOrZeroValue(colInfo)
} else {
continue
colValue, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Expand All @@ -336,6 +338,8 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
return cols, nil
}

var emptyBytes = make([]byte, 0)

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, error) {
var err error
// Decode previous columns.
Expand Down Expand Up @@ -406,8 +410,6 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
}, nil
}

var emptyBytes = make([]byte, 0)

func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
if datum.IsNull() {
return nil, "", nil
Expand Down Expand Up @@ -448,35 +450,58 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e
}
return v, warn, nil
default:
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
return datum.GetValue(), "", nil
}
}

func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
if !mysql.HasNotNullFlag(col.Flag) {
d := types.NewDatum(nil)
return d.GetValue()
// Scenarios when call this function:
// (1) column define default null at creating + insert without explicit column
// (2) alter table add column default xxx + old existing data
// (3) amend + insert without explicit column + alter table add column default xxx
// (4) online DDL drop column + data insert at state delete-only
//
// getDefaultOrZeroValue return interface{} need to meet to require type in
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, string, error) {
var d types.Datum
// NOTICE: SHOULD use OriginDefaultValue here, more info pls ref to
// https://github.com/pingcap/tiflow/issues/4048
// FIXME: Too many corner cases may hit here, like type truncate, timezone
// (1) If this column is uk(no pk), will cause data inconsistency in Scenarios(2)
// (2) If not fix here, will cause data inconsistency in Scenarios(3) directly
// Ref: https://github.com/pingcap/tidb/blob/d2c352980a43bb593db81fd1db996f47af596d91/table/column.go#L489
if col.GetOriginDefaultValue() != nil {
d = types.NewDatum(col.GetOriginDefaultValue())
return d.GetValue(), "", nil
}

if col.GetDefaultValue() != nil {
d := types.NewDatum(col.GetDefaultValue())
return d.GetValue()
}
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d := types.NewDatum(col.FieldType.Elems[0])
return d.GetValue()
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes
if !mysql.HasNotNullFlag(col.Flag) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
// must use null if TiDB not write the column value when default value is null
// and the value is null, see https://github.com/pingcap/tidb/issues/9304
d = types.NewDatum(nil)
} else {
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, "", nil
default:
d = table.GetZeroValue(col)
if d.IsNull() {
log.Error("meet unsupported column type", zap.String("column info", col.String()))
}
}
}

d := table.GetZeroValue(col)
return d.GetValue()
return formatColVal(d, col.Tp)
}

// DecodeTableID decodes the raw key to a table ID
Expand Down
Loading