Skip to content

Commit

Permalink
model(cdc): add a function to recover TiDB TableInfo (#7480)
Browse files Browse the repository at this point in the history
ref #3242
  • Loading branch information
lance6716 authored Nov 3, 2022
1 parent f16eb3c commit f82ff66
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 1 deletion.
76 changes: 76 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@
package entry

import (
"bytes"
"context"
"strings"
"testing"
"time"

"github.com/pingcap/log"
ticonfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/executor"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
pfilter "github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -1087,3 +1094,72 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
}
}

// TestBuildTableInfo parses each origin DDL, converts it into CDC columns via
// datum2Column, rebuilds a TiDB TableInfo with model.BuildTiDBTableInfo, and
// compares the SHOW CREATE TABLE rendering of the rebuilt info with the
// expected text.
func TestBuildTableInfo(t *testing.T) {
	type testCase struct {
		// origin is the upstream CREATE TABLE statement.
		origin string
		// recovered is the expected SHOW CREATE TABLE text of the rebuilt table.
		recovered string
	}
	testCases := []testCase{
		{
			origin: "CREATE TABLE t1 (c INT PRIMARY KEY)",
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				"  `c` int(0) NOT NULL,\n" +
				"  PRIMARY KEY (`c`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE t1 (" +
				"	c INT UNSIGNED," +
				"	c2 VARCHAR(10) NOT NULL," +
				"	c3 BIT(10) NOT NULL," +
				"	UNIQUE KEY (c2, c3)" +
				")",
			// Field lengths are not kept by CDC, hence the zero lengths below.
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				"  `c` int(0) unsigned DEFAULT NULL,\n" +
				"  `c2` varchar(0) NOT NULL,\n" +
				"  `c3` bit(0) NOT NULL,\n" +
				"  UNIQUE KEY `idx_0` (`c2`(0),`c3`(0))\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
		{
			origin: "CREATE TABLE t1 (" +
				"	c INT UNSIGNED," +
				"	gen INT AS (c+1) VIRTUAL," +
				"	c2 VARCHAR(10) NOT NULL," +
				"	gen2 INT AS (c+2) STORED," +
				"	c3 BIT(10) NOT NULL," +
				"	PRIMARY KEY (c, c2)" +
				")",
			// The virtual generated column is dropped by CDC, and the stored
			// generated column loses its generating expression.
			recovered: "CREATE TABLE `BuildTiDBTableInfo` (\n" +
				"  `c` int(0) unsigned NOT NULL,\n" +
				"  `c2` varchar(0) NOT NULL,\n" +
				"  `gen2` int(0) GENERATED ALWAYS AS (pass_generated_check) STORED,\n" +
				"  `c3` bit(0) NOT NULL,\n" +
				"  PRIMARY KEY (`c`(0),`c2`(0)) /*T![clustered_index] CLUSTERED */\n" +
				") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin",
		},
	}

	sqlParser := parser.New()
	for _, tc := range testCases {
		// Upstream side: DDL text -> AST -> TiDB TableInfo.
		stmtNode, err := sqlParser.ParseOneStmt(tc.origin, "", "")
		require.NoError(t, err)
		createStmt := stmtNode.(*ast.CreateTableStmt)
		upstreamInfo, err := ddl.BuildTableInfoFromAST(createStmt)
		require.NoError(t, err)

		// CDC side: wrap the table info and derive the column list from an
		// empty datum map, filling missing values with defaults.
		wrapped := model.WrapTableInfo(0, "test", 0, upstreamInfo)
		noDatums := map[int64]types.Datum{}
		columns, _, err := datum2Column(wrapped, noDatums, true)
		require.NoError(t, err)

		// Round trip back to a TiDB TableInfo and compare its rendering.
		rebuilt := model.BuildTiDBTableInfo(columns, wrapped.IndexColumnsOffset)
		got := showCreateTable(t, rebuilt)
		require.Equal(t, tc.recovered, got)
	}
}

// tiCtx is the mock context that showCreateTable passes to
// executor.ConstructResultOfShowCreateTable.
var tiCtx = mock.NewContext()

// showCreateTable renders ti through executor.ConstructResultOfShowCreateTable
// and returns the produced text. The test fails immediately if rendering
// returns an error.
func showCreateTable(t *testing.T, ti *timodel.TableInfo) string {
	var out bytes.Buffer
	out.Grow(512)
	require.NoError(t, executor.ConstructResultOfShowCreateTable(tiCtx, ti, autoid.Allocators{}, &out))
	return out.String()
}
2 changes: 1 addition & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (ti *TableInfo) initColumnsFlag() {
// See https://dev.mysql.com/doc/refman/5.7/en/show-columns.html.
// Yet if an index has multiple columns, we would like to easily determine that all those columns are indexed,
// which is crucial for the completeness of the information we pass to the downstream.
// Therefore, instead of using the MySql standard,
// Therefore, instead of using the MySQL standard,
// we made our own decision to mark all columns in an index with the appropriate flag(s).
for _, idxInfo := range ti.Indices {
for _, idxCol := range idxInfo.Columns {
Expand Down
92 changes: 92 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -442,6 +443,97 @@ type RedoColumn struct {
Flag uint64 `msg:"flag"`
}

// BuildTiDBTableInfo builds a TiDB TableInfo from given information.
//
// columns is the CDC column list; the position of each entry becomes the
// Offset of the resulting ColumnInfo. A nil entry is allowed and is replaced by
// a placeholder column named "omitted" that is marked as a virtual generated
// column.
//
// indexColumns holds one entry per index, each being the offsets (into
// columns) of the columns that make up the index. An index is skipped when its
// offset list is empty or when its first column is nil, because the
// primary/unique property of the index is read from that first column's flags.
//
// The returned TableInfo always has the name "BuildTiDBTableInfo", and indices
// are named "idx_<position in indexColumns>".
func BuildTiDBTableInfo(columns []*Column, indexColumns [][]int) *model.TableInfo {
	ret := &model.TableInfo{}
	// nowhere will use this field, so we set a debug message
	ret.Name = model.NewCIStr("BuildTiDBTableInfo")

	for i, col := range columns {
		columnInfo := &model.ColumnInfo{
			Offset: i,
			State:  model.StatePublic,
		}
		if col == nil {
			// by referring to datum2Column, nil is happened when
			// - !IsColCDCVisible, which means the column is a virtual generated
			//   column
			// - !exist && !fillWithDefaultValue, which means upstream does not
			//   send the column value
			// just mock for the first case
			columnInfo.Name = model.NewCIStr("omitted")
			columnInfo.GeneratedExprString = "pass_generated_check"
			columnInfo.GeneratedStored = false
			ret.Columns = append(ret.Columns, columnInfo)
			continue
		}
		columnInfo.Name = model.NewCIStr(col.Name)
		columnInfo.SetType(col.Type)
		// TiKV always use utf8mb4 to store, and collation is not recorded by CDC
		columnInfo.SetCharset(mysql.UTF8MB4Charset)
		columnInfo.SetCollate(mysql.UTF8MB4DefaultCollation)

		// inverse initColumnsFlag
		flag := col.Flag
		if flag.IsBinary() {
			columnInfo.SetCharset("binary")
		}
		if flag.IsGeneratedColumn() {
			// we do not use this field, so we set it to any non-empty string
			columnInfo.GeneratedExprString = "pass_generated_check"
			columnInfo.GeneratedStored = true
		}
		if flag.IsHandleKey() {
			columnInfo.AddFlag(mysql.PriKeyFlag)
			ret.IsCommonHandle = true
		} else if flag.IsPrimaryKey() {
			columnInfo.AddFlag(mysql.PriKeyFlag)
		}
		if flag.IsUniqueKey() {
			columnInfo.AddFlag(mysql.UniqueKeyFlag)
		}
		if !flag.IsNullable() {
			columnInfo.AddFlag(mysql.NotNullFlag)
		}
		if flag.IsMultipleKey() {
			columnInfo.AddFlag(mysql.MultipleKeyFlag)
		}
		if flag.IsUnsigned() {
			columnInfo.AddFlag(mysql.UnsignedFlag)
		}
		ret.Columns = append(ret.Columns, columnInfo)
	}

	for i, colOffsets := range indexColumns {
		// An index without columns carries no information and would make the
		// colOffsets[0] lookup below panic.
		if len(colOffsets) == 0 {
			continue
		}
		firstCol := columns[colOffsets[0]]
		// The first column may have been omitted (nil, see the column loop
		// above). Its key flags are then unknown, so skip the index instead of
		// dereferencing a nil pointer.
		if firstCol == nil {
			continue
		}

		indexInfo := &model.IndexInfo{
			Name:  model.NewCIStr(fmt.Sprintf("idx_%d", i)),
			State: model.StatePublic,
		}
		if firstCol.Flag.IsPrimaryKey() {
			indexInfo.Primary = true
		}
		if firstCol.Flag.IsUniqueKey() {
			indexInfo.Unique = true
		}

		for _, offset := range colOffsets {
			// ret.Columns has an entry for every input column, including the
			// placeholder created for nil ones, so this lookup never yields nil.
			col := ret.Columns[offset]

			indexCol := &model.IndexColumn{}
			indexCol.Name = col.Name
			indexCol.Offset = offset
			indexInfo.Columns = append(indexInfo.Columns, indexCol)
		}

		// TODO: revert the "all column set index related flag" to "only the
		// first column set index related flag" if needed

		ret.Indices = append(ret.Indices, indexInfo)
	}
	return ret
}

// ColumnValueString returns the string representation of the column value
func ColumnValueString(c interface{}) string {
var data string
Expand Down

0 comments on commit f82ff66

Please sign in to comment.