Skip to content

Commit

Permalink
sink(ticdc): add table default value definition for storage sink (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 30, 2023
1 parent 631ef8c commit 34f6846
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 14 deletions.
5 changes: 3 additions & 2 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func datum2Column(
zap.String("column", colInfo.Name.String()))
}

defaultValue := getDDLDefaultDefinition(colInfo)
defaultValue := GetDDLDefaultDefinition(colInfo)
offset := tableInfo.RowColumnsOffset[colID]
rawCols[offset] = colDatums
cols[offset] = &model.Column{
Expand Down Expand Up @@ -756,7 +756,8 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (types.Datum, any, int, stri
return d, v, size, warn, err
}

func getDDLDefaultDefinition(col *timodel.ColumnInfo) interface{} {
// GetDDLDefaultDefinition returns the default definition of a column.
func GetDDLDefaultDefinition(col *timodel.ColumnInfo) interface{} {
defaultValue := col.GetDefaultValue()
if defaultValue == nil {
defaultValue = col.GetOriginDefaultValue()
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
for _, tc := range testCases {
_, val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo)
require.Equal(t, tc.Res, val, tc.Name)
val = getDDLDefaultDefinition(&tc.ColInfo)
val = GetDDLDefaultDefinition(&tc.ColInfo)
require.Equal(t, tc.Default, val, tc.Name)
}
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/hash"
Expand All @@ -37,12 +38,13 @@ const (

// TableCol denotes the column info for a table definition.
type TableCol struct {
Name string `json:"ColumnName" `
Tp string `json:"ColumnType"`
Precision string `json:"ColumnPrecision,omitempty"`
Scale string `json:"ColumnScale,omitempty"`
Nullable string `json:"ColumnNullable,omitempty"`
IsPK string `json:"ColumnIsPk,omitempty"`
Name string `json:"ColumnName" `
Tp string `json:"ColumnType"`
Default interface{} `json:"ColumnDefault,omitempty"`
Precision string `json:"ColumnPrecision,omitempty"`
Scale string `json:"ColumnScale,omitempty"`
Nullable string `json:"ColumnNullable,omitempty"`
IsPK string `json:"ColumnIsPk,omitempty"`
}

// FromTiColumnInfo converts from TiDB ColumnInfo to TableCol.
Expand Down Expand Up @@ -71,6 +73,7 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) {
if mysql.HasNotNullFlag(col.GetFlag()) {
t.Nullable = "false"
}
t.Default = entry.GetDDLDefaultDefinition(col)

switch col.GetType() {
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration:
Expand Down Expand Up @@ -110,6 +113,7 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) {
if t.Nullable == "false" {
col.AddFlag(mysql.NotNullFlag)
}
col.DefaultValue = t.Default
if strings.Contains(t.Tp, "BLOB") || strings.Contains(t.Tp, "BINARY") {
col.SetCharset(charset.CharsetBin)
} else {
Expand Down
34 changes: 29 additions & 5 deletions pkg/sink/cloudstorage/table_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,38 @@ func generateTableDef() (TableDefinition, *model.TableInfo) {
var columns []*timodel.ColumnInfo
ft := types.NewFieldType(mysql.TypeLong)
ft.SetFlag(mysql.PriKeyFlag | mysql.NotNullFlag)
col := &timodel.ColumnInfo{Name: timodel.NewCIStr("Id"), FieldType: *ft}
col := &timodel.ColumnInfo{
Name: timodel.NewCIStr("Id"),
FieldType: *ft,
DefaultValue: 10,
}
columns = append(columns, col)

ft = types.NewFieldType(mysql.TypeVarchar)
ft.SetFlag(mysql.NotNullFlag)
ft.SetFlen(128)
col = &timodel.ColumnInfo{Name: timodel.NewCIStr("LastName"), FieldType: *ft}
col = &timodel.ColumnInfo{
Name: timodel.NewCIStr("LastName"),
FieldType: *ft,
DefaultValue: "Default LastName",
}
columns = append(columns, col)

ft = types.NewFieldType(mysql.TypeVarchar)
ft.SetFlen(64)
col = &timodel.ColumnInfo{Name: timodel.NewCIStr("FirstName"), FieldType: *ft}
col = &timodel.ColumnInfo{
Name: timodel.NewCIStr("FirstName"),
FieldType: *ft,
DefaultValue: "Default FirstName",
}
columns = append(columns, col)

ft = types.NewFieldType(mysql.TypeDatetime)
col = &timodel.ColumnInfo{Name: timodel.NewCIStr("Birthday"), FieldType: *ft}
col = &timodel.ColumnInfo{
Name: timodel.NewCIStr("Birthday"),
FieldType: *ft,
DefaultValue: 12345678,
}
columns = append(columns, col)

tableInfo := &model.TableInfo{
Expand Down Expand Up @@ -380,22 +396,26 @@ func TestTableDefinition(t *testing.T) {
"ColumnName": "Id",
"ColumnType": "INT",
"ColumnPrecision": "11",
"ColumnDefault":10,
"ColumnNullable": "false",
"ColumnIsPk": "true"
},
{
"ColumnName": "LastName",
"ColumnType": "VARCHAR",
"ColumnDefault":"Default LastName",
"ColumnPrecision": "128",
"ColumnNullable": "false"
},
{
"ColumnName": "FirstName",
"ColumnDefault":"Default FirstName",
"ColumnType": "VARCHAR",
"ColumnPrecision": "64"
},
{
"ColumnName": "Birthday",
"ColumnDefault":1.2345678e+07,
"ColumnType": "DATETIME"
}
],
Expand Down Expand Up @@ -424,22 +444,26 @@ func TestTableDefinition(t *testing.T) {
"ColumnName": "Id",
"ColumnType": "INT",
"ColumnPrecision": "11",
"ColumnDefault":10,
"ColumnNullable": "false",
"ColumnIsPk": "true"
},
{
"ColumnName": "LastName",
"ColumnType": "VARCHAR",
"ColumnDefault":"Default LastName",
"ColumnPrecision": "128",
"ColumnNullable": "false"
},
{
"ColumnName": "FirstName",
"ColumnDefault":"Default FirstName",
"ColumnType": "VARCHAR",
"ColumnPrecision": "64"
},
{
"ColumnName": "Birthday",
"ColumnDefault":1.2345678e+07,
"ColumnType": "DATETIME"
}
],
Expand Down Expand Up @@ -471,7 +495,7 @@ func TestTableDefinitionGenFilePath(t *testing.T) {
def, _ := generateTableDef()
tablePath, err := def.GenerateSchemaFilePath()
require.NoError(t, err)
require.Equal(t, "schema1/table1/meta/schema_100_0785427252.json", tablePath)
require.Equal(t, "schema1/table1/meta/schema_100_3752767265.json", tablePath)
}

func TestTableDefinitionSum32(t *testing.T) {
Expand Down

0 comments on commit 34f6846

Please sign in to comment.