Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter(ticdc): fix mounter add default value type unsupported (#3846) #3856

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 39 additions & 33 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,20 +311,21 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
continue
}
var err error
var warn string
if exist {
var err error
var warn string
colValue, warn, err = formatColVal(colDatums, colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
} else if fillWithDefaultValue {
colValue = getDefaultOrZeroValue(colInfo)
} else {
continue
colValue, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Expand Down Expand Up @@ -408,6 +409,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr

var emptyBytes = make([]byte, 0)

// formatColVal return interface{} need to meet the same requirement as getDefaultOrZeroValue
func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
if datum.IsNull() {
return nil, "", nil
Expand Down Expand Up @@ -448,35 +450,39 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e
}
return v, warn, nil
default:
// FIXME: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
return datum.GetValue(), "", nil
}
}

func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
// getDefaultOrZeroValue return interface{} need to meet to require type in
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (value interface{}, warn string, err error) {
var d types.Datum
if !mysql.HasNotNullFlag(col.Flag) {
d := types.NewDatum(nil)
return d.GetValue()
}

if col.GetDefaultValue() != nil {
d := types.NewDatum(col.GetDefaultValue())
return d.GetValue()
}
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d := types.NewDatum(col.FieldType.Elems[0])
return d.GetValue()
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
d = types.NewDatum(nil)
} else if col.GetDefaultValue() != nil {
d = types.NewDatum(col.GetDefaultValue())
} else {
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, "", nil
default:
d = table.GetZeroValue(col)
}
}

d := table.GetZeroValue(col)
return d.GetValue()
return formatColVal(d, col.Tp)
}

// DecodeTableID decodes the raw key to a table ID
Expand Down
Loading