Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3846
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
maxshuang authored and ti-chi-bot committed Dec 13, 2021
1 parent 0c853a6 commit 105437c
Show file tree
Hide file tree
Showing 2 changed files with 433 additions and 0 deletions.
83 changes: 83 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,14 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
continue
}
var err error
var warn string
var size int
if exist {
<<<<<<< HEAD
var err error
var warn string
colValue, warn, err = formatColVal(colDatums, colInfo.Tp)
Expand All @@ -325,7 +332,19 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colValue = getDefaultOrZeroValue(colInfo)
} else {
continue
=======
colValue, size, warn, err = formatColVal(colDatums, colInfo.Tp)
} else if fillWithDefaultValue {
colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846))
}
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
colSize += size
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Type: colInfo.Tp,
Expand Down Expand Up @@ -408,7 +427,35 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr

var emptyBytes = make([]byte, 0)

<<<<<<< HEAD
func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
=======
const (
sizeOfEmptyColumn = int(unsafe.Sizeof(model.Column{}))
sizeOfEmptyBytes = int(unsafe.Sizeof(emptyBytes))
sizeOfEmptyString = int(unsafe.Sizeof(""))
)

func sizeOfDatum(d types.Datum) int {
array := [...]types.Datum{d}
return int(types.EstimatedMemUsage(array[:], 1))
}

func sizeOfString(s string) int {
// string data size + string struct size.
return len(s) + sizeOfEmptyString
}

func sizeOfBytes(b []byte) int {
// bytes data size + bytes struct size.
return len(b) + sizeOfEmptyBytes
}

// formatColVal return interface{} need to meet the same requirement as getDefaultOrZeroValue
func formatColVal(datum types.Datum, tp byte) (
value interface{}, size int, warn string, err error,
) {
>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846))
if datum.IsNull() {
return nil, "", nil
}
Expand Down Expand Up @@ -448,6 +495,7 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e
}
return v, warn, nil
default:
<<<<<<< HEAD
return datum.GetValue(), "", nil
}
}
Expand Down Expand Up @@ -477,6 +525,41 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {

d := table.GetZeroValue(col)
return d.GetValue()
=======
// FIXME: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
return datum.GetValue(), sizeOfDatum(datum), "", nil
}
}

// getDefaultOrZeroValue return interface{} need to meet to require type in
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, error) {
var d types.Datum
if !mysql.HasNotNullFlag(col.Flag) {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
d = types.NewDatum(nil)
} else if col.GetDefaultValue() != nil {
d = types.NewDatum(col.GetDefaultValue())
} else {
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, sizeOfEmptyBytes, "", nil
default:
d = table.GetZeroValue(col)
}
}

return formatColVal(d, col.Tp)
>>>>>>> 566581819 (mounter(ticdc): fix mounter add default value type unsupported (#3846))
}

// DecodeTableID decodes the raw key to a table ID
Expand Down
Loading

0 comments on commit 105437c

Please sign in to comment.