diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 47ab83e64..e2fae9a55 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -492,6 +492,10 @@ ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=in ErrSchemaTrackerRestoreStmtFail,[code=44010:class=schema-tracker:scope=internal:level=medium], "Message: fail to restore the statement" ErrSchemaTrackerCannotDropTable,[code=44011:class=schema-tracker:scope=internal:level=high], "Message: failed to drop table for %v in schema tracker" ErrSchemaTrackerInit,[code=44012:class=schema-tracker:scope=internal:level=high], "Message: failed to create schema tracker" +ErrSchemaTrackerCannotSetDownstreamSQLMode,[code=44013:class=schema-tracker:scope=internal:level=high], "Message: failed to set default downstream sql_mode %v in schema tracker" +ErrSchemaTrackerCannotInitDownstreamParser,[code=44014:class=schema-tracker:scope=internal:level=high], "Message: failed to init downstream parser by sql_mode %v in schema tracker" +ErrSchemaTrackerCannotMockDownstreamTable,[code=44015:class=schema-tracker:scope=internal:level=high], "Message: failed to mock downstream table by create table statement %v in schema tracker" +ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt,[code=44016:class=schema-tracker:scope=internal:level=high], "Message: failed to fetch downstream table %v by show create table statement in schema tracker" ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high], "Message: the scheduler has not started" ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium], "Message: the scheduler has already started" ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium], "Message: dm-worker with name %s already exists" diff --git a/errors.toml b/errors.toml index 969bdb6e5..8b35b0663 100644 --- a/errors.toml +++ b/errors.toml @@ -2962,6 +2962,30 @@ 
description = "" workaround = "" tags = ["internal", "high"] +[error.DM-schema-tracker-44013] +message = "failed to set default downstream sql_mode %v in schema tracker" +description = "" +workaround = "" +tags = ["internal", "high"] + +[error.DM-schema-tracker-44014] +message = "failed to init downstream parser by sql_mode %v in schema tracker" +description = "" +workaround = "" +tags = ["internal", "high"] + +[error.DM-schema-tracker-44015] +message = "failed to mock downstream table by create table statement %v in schema tracker" +description = "" +workaround = "" +tags = ["internal", "high"] + +[error.DM-schema-tracker-44016] +message = "failed to fetch downstream table %v by show create table statement in schema tracker" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-scheduler-46001] message = "the scheduler has not started" description = "" diff --git a/pkg/schema/tracker.go b/pkg/schema/tracker.go index cd204256f..1e77c6c00 100644 --- a/pkg/schema/tracker.go +++ b/pkg/schema/tracker.go @@ -25,22 +25,30 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/mock" "go.uber.org/zap" - "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" + dterror "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/dm/syncer/dbconn" ) const ( // TiDBClusteredIndex is the variable name for clustered index. TiDBClusteredIndex = "tidb_enable_clustered_index" + // downstream mock table id, consists of serial numbers of letters. 
+ mockTableID = 121402101900011104 ) var ( @@ -54,15 +62,30 @@ var ( // Tracker is used to track schema locally. type Tracker struct { - store kv.Storage - dom *domain.Domain - se session.Session + store kv.Storage + dom *domain.Domain + se session.Session + dsTracker *downstreamTracker // downstream tracker tableid -> createTableStmt +} + +// downstreamTracker tracks downstream schema. +type downstreamTracker struct { + downstreamConn *dbconn.DBConn // downstream connection + stmtParser *parser.Parser // statement parser + tableInfos map[string]*downstreamTableInfo // downstream table infos +} + +// downstreamTableInfo contains tableinfo and index cache. +type downstreamTableInfo struct { + tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree + indexCache *model.IndexInfo // index cache include pk/uk(not null) + availableUKCache []*model.IndexInfo // index cache include uks(data not null) } // NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve -// some variable from downstream TiDB using `tidbConn`. +// some variable from downstream using `downstreamConn`. // NOTE **sessionCfg is a reference to caller**. -func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker, error) { +func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, downstreamConn *dbconn.DBConn) (*Tracker, error) { // NOTE: tidb uses a **global** config so can't isolate tracker's config from each other. 
If that isolation is needed, // we might SetGlobalConfig before every call to tracker, or use some patch like https://github.com/bouk/monkey tidbConfig.UpdateGlobal(func(conf *tidbConfig.Config) { @@ -81,7 +104,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, for _, k := range downstreamVars { if _, ok := sessionCfg[k]; !ok { var ignoredColumn interface{} - rows, err2 := tidbConn.QuerySQL(tctx, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k)) + rows, err2 := downstreamConn.QuerySQL(tctx, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k)) if err2 != nil { return nil, err2 } @@ -155,10 +178,17 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, return nil, err } + // init downstreamTracker + dsTracker := &downstreamTracker{ + downstreamConn: downstreamConn, + tableInfos: make(map[string]*downstreamTableInfo), + } + return &Tracker{ - store: store, - dom: dom, - se: se, + store: store, + dom: dom, + se: se, + dsTracker: dsTracker, }, nil } @@ -329,3 +359,231 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableInfo func (tr *Tracker) GetSystemVar(name string) (string, bool) { return tr.se.GetSessionVars().GetSystemVar(name) } + +// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index. +// note. this function will init downstreamTrack's table info. +func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) { + dti, ok := tr.dsTracker.tableInfos[tableID] + if !ok { + log.L().Info("Downstream schema tracker init. ", zap.String("tableID", tableID)) + ti, err := tr.getTableInfoByCreateStmt(tctx, tableID) + if err != nil { + log.L().Error("Init downstream schema info error. 
", zap.String("tableID", tableID), zap.Error(err)) + return nil, err + } + + dti = getDownStreamTi(ti, originTi) + tr.dsTracker.tableInfos[tableID] = dti + } + return dti.indexCache, nil +} + +// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null. +// note. this function will not init downstreamTrack. +func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo { + dti, ok := tr.dsTracker.tableInfos[tableID] + + if !ok || len(dti.availableUKCache) == 0 { + return nil + } + // func for check data is not null + fn := func(i int) bool { + return data[i] != nil + } + + for i, uk := range dti.availableUKCache { + // check uk's column data is not null + if isSpecifiedIndexColumn(uk, fn) { + if i != 0 { + // exchange available uk to the first of the array to reduce judgements for next row + dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0] + } + return uk + } + } + return nil +} + +// RemoveDownstreamSchema just remove schema or table in downstreamTrack. +func (tr *Tracker) RemoveDownstreamSchema(targetTables []*filter.Table) { + if len(targetTables) == 0 { + return + } + + for _, targetTable := range targetTables { + tableID := utils.GenTableID(targetTable) + _, ok := tr.dsTracker.tableInfos[tableID] + if !ok { + // handle just have schema + if targetTable.Schema != "" && targetTable.Name == "" { + for k := range tr.dsTracker.tableInfos { + if strings.HasPrefix(k, tableID+".") { + delete(tr.dsTracker.tableInfos, k) + log.L().Info("Remove downstream schema tracker", zap.String("tableID", tableID)) + } + } + } + } else { + delete(tr.dsTracker.tableInfos, tableID) + log.L().Info("Remove downstream schema tracker", zap.String("tableID", tableID)) + } + } +} + +// getTableInfoByCreateStmt get downstream tableInfo by "SHOW CREATE TABLE" stmt. 
+func (tr *Tracker) getTableInfoByCreateStmt(tctx *tcontext.Context, tableID string) (*model.TableInfo, error) { + if tr.dsTracker.stmtParser == nil { + err := tr.initDownStreamSQLModeAndParser(tctx) + if err != nil { + return nil, err + } + } + + querySQL := fmt.Sprintf("SHOW CREATE TABLE %s", tableID) + rows, err := tr.dsTracker.downstreamConn.QuerySQL(tctx, querySQL) + if err != nil { + return nil, dterror.ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt.Delegate(err, tableID) + } + defer rows.Close() + var tableName, createStr string + if rows.Next() { + if err = rows.Scan(&tableName, &createStr); err != nil { + return nil, dterror.DBErrorAdapt(rows.Err(), dterror.ErrDBDriverError) + } + } + + log.L().Info("Show create table info", zap.String("tableID", tableID), zap.String("create string", createStr)) + // parse create table stmt. + stmtNode, err := tr.dsTracker.stmtParser.ParseOneStmt(createStr, "", "") + if err != nil { + return nil, dterror.ErrSchemaTrackerInvalidCreateTableStmt.Delegate(err, createStr) + } + + ti, err := ddl.MockTableInfo(mock.NewContext(), stmtNode.(*ast.CreateTableStmt), mockTableID) + if err != nil { + return nil, dterror.ErrSchemaTrackerCannotMockDownstreamTable.Delegate(err, createStr) + } + return ti, nil +} + +// initDownStreamSQLModeAndParser inits downstream tracker parser by default sql_mode. 
+func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error { + setSQLMode := fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode) + _, err := tr.dsTracker.downstreamConn.ExecuteSQL(tctx, []string{setSQLMode}) + if err != nil { + return dterror.ErrSchemaTrackerCannotSetDownstreamSQLMode.Delegate(err, mysql.DefaultSQLMode) + } + stmtParser, err := utils.GetParserFromSQLModeStr(mysql.DefaultSQLMode) + if err != nil { + return dterror.ErrSchemaTrackerCannotInitDownstreamParser.Delegate(err, mysql.DefaultSQLMode) + } + tr.dsTracker.stmtParser = stmtParser + return nil +} + +// getDownStreamTi constructs downstreamTable index cache by tableinfo. +func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo { + var ( + indexCache *model.IndexInfo + availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices)) + hasPk = false + ) + + // func for check not null constraint + fn := func(i int) bool { + return mysql.HasNotNullFlag(ti.Columns[i].Flag) + } + + for _, idx := range ti.Indices { + indexRedict := redirectIndexKeys(idx, originTi) + if indexRedict == nil { + continue + } + if idx.Primary { + indexCache = indexRedict + hasPk = true + } else if idx.Unique { + // second check not null unique key + if !hasPk && isSpecifiedIndexColumn(idx, fn) { + indexCache = indexRedict + } else { + availableUKCache = append(availableUKCache, indexRedict) + } + } + } + + // handle pk exceptional case. + // e.g. "create table t(a int primary key, b int)". + if !hasPk { + exPk := redirectIndexKeys(handlePkExCase(ti), originTi) + if exPk != nil { + indexCache = exPk + } + } + + return &downstreamTableInfo{ + tableInfo: ti, + indexCache: indexCache, + availableUKCache: availableUKCache, + } +} + +// redirectIndexKeys redirect index's columns offset in origin tableinfo. 
+func redirectIndexKeys(index *model.IndexInfo, originTi *model.TableInfo) *model.IndexInfo { + if index == nil || originTi == nil { + return nil + } + + columns := make([]*model.IndexColumn, 0, len(index.Columns)) + for _, key := range index.Columns { + if originColumn := model.FindColumnInfo(originTi.Columns, key.Name.O); originColumn != nil { + column := &model.IndexColumn{ + Name: key.Name, + Offset: originColumn.Offset, + Length: key.Length, + } + columns = append(columns, column) + } + } + if len(columns) == len(index.Columns) { + return &model.IndexInfo{ + Table: index.Table, + Unique: index.Unique, + Primary: index.Primary, + State: index.State, + Tp: index.Tp, + Columns: columns, + } + } + return nil +} + +// handlePkExCase is handle pk exceptional case. +// e.g. "create table t(a int primary key, b int)". +func handlePkExCase(ti *model.TableInfo) *model.IndexInfo { + if pk := ti.GetPkColInfo(); pk != nil { + return &model.IndexInfo{ + Table: ti.Name, + Unique: true, + Primary: true, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: []*model.IndexColumn{{ + Name: pk.Name, + Offset: pk.Offset, + Length: types.UnspecifiedLength, + }}, + } + } + return nil +} + +// isSpecifiedIndexColumn checks all of index's columns are matching 'fn'. +func isSpecifiedIndexColumn(index *model.IndexInfo, fn func(i int) bool) bool { + for _, col := range index.Columns { + if !fn(col.Offset) { + return false + } + } + return true +} diff --git a/pkg/schema/tracker_test.go b/pkg/schema/tracker_test.go index 425394219..88a920e18 100644 --- a/pkg/schema/tracker_test.go +++ b/pkg/schema/tracker_test.go @@ -26,9 +26,20 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/log" "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + timock "github.com/pingcap/tidb/util/mock" "go.uber.org/zap/zapcore" + "github.com/pingcap/dm/dm/config" + dlog "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/syncer/dbconn" + + tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/conn" ) @@ -41,12 +52,14 @@ var _ = Suite(&trackerSuite{}) var defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"} type trackerSuite struct { - baseConn *conn.BaseConn + dbConn *dbconn.DBConn db *sql.DB backupKeys []string + cfg *config.SubTaskConfig } func (s *trackerSuite) SetUpSuite(c *C) { + s.cfg = &config.SubTaskConfig{} s.backupKeys = downstreamVars downstreamVars = []string{"sql_mode"} db, _, err := sqlmock.New() @@ -54,7 +67,7 @@ func (s *trackerSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) con, err := db.Conn(context.Background()) c.Assert(err, IsNil) - s.baseConn = conn.NewBaseConn(con, nil) + s.dbConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(con, nil)} } func (s *trackerSuite) TearDownSuite(c *C) { @@ -75,28 +88,29 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { con, err := db.Conn(context.Background()) c.Assert(err, IsNil) baseConn := conn.NewBaseConn(con, nil) - + dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} // user give correct session config - _, err = NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, baseConn) + + _, err = NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn) c.Assert(err, IsNil) // user give wrong session session, will return error sessionCfg := 
map[string]string{"sql_mode": "HaHa"} - _, err = NewTracker(context.Background(), "test-tracker", sessionCfg, baseConn) + _, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn) c.Assert(err, NotNil) // discover session config failed, will return error mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "HaHa")) - _, err = NewTracker(context.Background(), "test-tracker", nil, baseConn) + _, err = NewTracker(context.Background(), "test-tracker", nil, dbConn) c.Assert(err, NotNil) // empty or default config in downstream mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "")) - tracker, err := NewTracker(context.Background(), "test-tracker", nil, baseConn) + AddRow("sql_mode", defaultTestSessionCfg["sql_mode"])) + tracker, err := NewTracker(context.Background(), "test-tracker", nil, dbConn) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) err = tracker.Exec(context.Background(), "", "create database testdb;") @@ -106,7 +120,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). 
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE")) - tracker, err = NewTracker(context.Background(), "test-tracker", nil, baseConn) + tracker, err = NewTracker(context.Background(), "test-tracker", nil, dbConn) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue) @@ -123,7 +137,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { // user set session config, get tracker config from downstream // no `STRICT_TRANS_TABLES`, no error now sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"} - tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, baseConn) + tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -155,7 +169,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { "sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES", "tidb_enable_clustered_index": "ON", } - tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, baseConn) + tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -175,7 +189,7 @@ func (s *trackerSuite) TestDDL(c *C) { Name: "foo", } - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.baseConn) + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn) c.Assert(err, IsNil) // Table shouldn't exist before initialization. 
@@ -241,7 +255,7 @@ func (s *trackerSuite) TestDDL(c *C) { func (s *trackerSuite) TestGetSingleColumnIndices(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.baseConn) + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn) c.Assert(err, IsNil) ctx := context.Background() @@ -280,7 +294,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) { func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.baseConn) + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn) c.Assert(err, IsNil) // We cannot create a table without a database. @@ -308,7 +322,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) { func (s *trackerSuite) TestMultiDrop(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.baseConn) + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn) c.Assert(err, IsNil) ctx := context.Background() @@ -356,7 +370,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) { Name: "foo", } - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.baseConn) + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn) c.Assert(err, IsNil) // Create some sort of complicated table. @@ -436,7 +450,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) { log.SetLevel(zapcore.ErrorLevel) ctx := context.Background() - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.baseConn) + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn) c.Assert(err, IsNil) // nothing should exist... 
@@ -510,15 +524,404 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) { con, err := db.Conn(context.Background()) c.Assert(err, IsNil) baseConn := conn.NewBaseConn(con, nil) + dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "")) + AddRow("sql_mode", defaultTestSessionCfg["sql_mode"])) oldSessionVar := map[string]string{ "tidb_enable_change_column_type": "ON", } + _, err = NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn) + c.Assert(err, IsNil) +} + +func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) { + log.SetLevel(zapcore.ErrorLevel) + + // tracker and sqlmock + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + con, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + baseConn := conn.NewBaseConn(con, nil) + dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} + + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn) + c.Assert(err, IsNil) + + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + tctx := tcontext.NewContext(context.Background(), dlog.L()) + + err = tracker.initDownStreamSQLModeAndParser(tctx) + c.Assert(err, IsNil) + c.Assert(tracker.dsTracker.stmtParser, NotNil) +} + +func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) { + log.SetLevel(zapcore.ErrorLevel) + + // origin table info + p := parser.New() + se := timock.NewContext() + node, err := p.ParseOneStmt("create table t(a int, b int, c varchar(10))", "utf8mb4", "utf8mb4_bin") + c.Assert(err, IsNil) + oriTi, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 1) + c.Assert(err, IsNil) + + // tracker and sqlmock + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + con, err := 
db.Conn(context.Background()) + c.Assert(err, IsNil) + baseConn := conn.NewBaseConn(con, nil) + dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn) + c.Assert(err, IsNil) + + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + tableID := "`test`.`test`" + + // downstream has no pk/uk + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int, b int, c varchar(10))")) + indexinfo, err := tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok := tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has pk(not constraints like "create table t(a int primary key,b int not null)" + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (c))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(indexinfo, NotNil) + delete(tracker.dsTracker.tableInfos, tableID) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int primary key, b int, c varchar(10))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(indexinfo, NotNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has composite pks + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(len(indexinfo.Columns) == 2, IsTrue) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has uk(not null) + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int unique not null, b int, c varchar(10))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(indexinfo.Columns, NotNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has uk(without not null) + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int unique, b int, c varchar(10))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + dti, ok := tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(indexinfo, IsNil) + c.Assert(dti.availableUKCache, NotNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has uks + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int unique, b int unique, c varchar(10) unique not null)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + dti, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(indexinfo, NotNil) + c.Assert(len(dti.availableUKCache) == 2, IsTrue) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has pk and uk, pk has priority + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int unique not null , b int, c varchar(10), PRIMARY KEY (c))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo.Primary, IsTrue) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has more columns than upstream, and that column in used in PK + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int unique not null)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, NotNil) + c.Assert(indexinfo.Primary, IsFalse) + delete(tracker.dsTracker.tableInfos, tableID) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int unique)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + dti, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(len(dti.availableUKCache) == 1, IsTrue) + delete(tracker.dsTracker.tableInfos, tableID) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has more columns than upstream, and that column in used in UK(not null) + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int unique not null)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, NotNil) + c.Assert(indexinfo.Columns[0].Name.L == "b", IsTrue) + delete(tracker.dsTracker.tableInfos, tableID) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int unique)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + dti, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + c.Assert(len(dti.availableUKCache) == 1, IsTrue) + delete(tracker.dsTracker.tableInfos, tableID) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int)")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) +} + +func (s *trackerSuite) TestGetAvailableDownStreamUKIndexInfo(c *C) { + log.SetLevel(zapcore.ErrorLevel) + + // origin table info + p := parser.New() + se := timock.NewContext() + node, err := p.ParseOneStmt("create table t(a int, b int, c varchar(10))", "utf8mb4", "utf8mb4_bin") + c.Assert(err, IsNil) + oriTi, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 1) + c.Assert(err, IsNil) + + // tracker and sqlmock + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + con, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + baseConn := conn.NewBaseConn(con, nil) + dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn) + c.Assert(err, IsNil) + + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + tableID := "`test`.`test`" + + // downstream has no uk + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int, b int, c varchar(10))")) + indexinfo, err := tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + data := []interface{}{1, 2, 3} + indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has uk but data is null + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int unique, b int, c varchar(10))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + data = []interface{}{nil, 2, 3} + indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has uk and data is not null + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int unique, b int, c varchar(10))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + data = []interface{}{1, 2, 3} + indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + c.Assert(indexinfo, NotNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has union uk but data has null + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + data = []interface{}{1, nil, 3} + indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + data = []interface{}{1, nil, nil} + indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + c.Assert(indexinfo, IsNil) + delete(tracker.dsTracker.tableInfos, tableID) + + // downstream has union uk but data has null + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))")) + indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + c.Assert(indexinfo, IsNil) + data = []interface{}{1, 2, 3} + indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + c.Assert(indexinfo, NotNil) + delete(tracker.dsTracker.tableInfos, tableID) +} + +func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { + log.SetLevel(zapcore.ErrorLevel) + + // origin table info + p := parser.New() + se := timock.NewContext() + node, err := p.ParseOneStmt("create table t(a int, b int, c varchar(10))", "utf8mb4", "utf8mb4_bin") + c.Assert(err, IsNil) + oriTi, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 1) + c.Assert(err, IsNil) + + // tracker and sqlmock + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + con, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + baseConn := conn.NewBaseConn(con, nil) + dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} + tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn) + c.Assert(err, IsNil) + + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + tableID := "`test`.`test`" + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) + _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok := tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + + // just table + targetTables := []*filter.Table{{Schema: "test", Name: "a"}, {Schema: "test", Name: "test"}} + tracker.RemoveDownstreamSchema(targetTables) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsFalse) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) + _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + + tracker.RemoveDownstreamSchema(targetTables) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsFalse) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int primary key, b int, c varchar(10))")) + _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + + // just schema + targetTables = []*filter.Table{{Schema: "test", Name: "a"}, {Schema: "test", Name: ""}} + tracker.RemoveDownstreamSchema(targetTables) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsFalse) + + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) + _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) + + tracker.RemoveDownstreamSchema(targetTables) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsFalse) - _, err = NewTracker(context.Background(), "test-tracker", oldSessionVar, baseConn) + mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("test", "create table t(a int primary key, b int, c varchar(10))")) + _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) + _, ok = tracker.dsTracker.tableInfos[tableID] + c.Assert(ok, IsTrue) } diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index b8e8f1ec0..22aa41abd 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -605,6 +605,10 @@ const ( codeSchemaTrackerRestoreStmtFail codeSchemaTrackerCannotDropTable codeSchemaTrackerInit + codeSchemaTrackerCannotSetDownstreamSQLMode + codeSchemaTrackerCannotInitDownstreamParser + codeSchemaTrackerCannotMockDownstreamTable + codeSchemaTrackerCannotFetchDownstreamCreateTableStmt ) // HA scheduler. 
@@ -1237,7 +1241,15 @@ var ( "fail to restore the statement", "") ErrSchemaTrackerCannotDropTable = New(codeSchemaTrackerCannotDropTable, ClassSchemaTracker, ScopeInternal, LevelHigh, "failed to drop table for %v in schema tracker", "") - ErrSchemaTrackerInit = New(codeSchemaTrackerInit, ClassSchemaTracker, ScopeInternal, LevelHigh, "failed to create schema tracker", "") + ErrSchemaTrackerInit = New(codeSchemaTrackerInit, ClassSchemaTracker, ScopeInternal, LevelHigh, "failed to create schema tracker", "") + ErrSchemaTrackerCannotSetDownstreamSQLMode = New(codeSchemaTrackerCannotSetDownstreamSQLMode, ClassSchemaTracker, ScopeInternal, LevelHigh, + "failed to set default downstream sql_mode %v in schema tracker", "") + ErrSchemaTrackerCannotInitDownstreamParser = New(codeSchemaTrackerCannotInitDownstreamParser, ClassSchemaTracker, ScopeInternal, LevelHigh, + "failed to init downstream parser by sql_mode %v in schema tracker", "") + ErrSchemaTrackerCannotMockDownstreamTable = New(codeSchemaTrackerCannotMockDownstreamTable, ClassSchemaTracker, ScopeInternal, LevelHigh, + "failed to mock downstream table by create table statement %v in schema tracker", "") + ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt = New(codeSchemaTrackerCannotFetchDownstreamCreateTableStmt, ClassSchemaTracker, ScopeInternal, LevelHigh, + "failed to fetch downstream table %v by show create table statement in schema tracker", "") // HA scheduler. ErrSchedulerNotStarted = New(codeSchedulerNotStarted, ClassScheduler, ScopeInternal, LevelHigh, "the scheduler has not started", "") diff --git a/syncer/causality_test.go b/syncer/causality_test.go index 2e4988fb4..34d9d8811 100644 --- a/syncer/causality_test.go +++ b/syncer/causality_test.go @@ -19,6 +19,8 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/dm/dm/config" @@ -57,6 +59,18 @@ func (s *testSyncerSuite) TestCasuality(c *C) { schema := "create table tb(a int primary key, b int unique);" ti, err := createTableInfo(p, se, int64(0), schema) c.Assert(err, IsNil) + tiIndex := &model.IndexInfo{ + Table: ti.Name, + Unique: true, + Primary: true, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: []*model.IndexColumn{{ + Name: ti.Columns[0].Name, + Offset: ti.Columns[0].Offset, + Length: types.UnspecifiedLength, + }}, + } jobCh := make(chan *job, 10) syncer := &Syncer{ @@ -103,7 +117,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location} for _, tc := range testCases { - job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti), ec) + job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex), ec) jobCh <- job } diff --git a/syncer/dml.go b/syncer/dml.go index 9af48a14b..2dbaa8185 100644 --- a/syncer/dml.go +++ b/syncer/dml.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/model" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tidb/parser/types" "go.uber.org/zap" + tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" ) @@ -81,18 +83,20 @@ RowLoop: } } - dmls = append(dmls, newDML(insert, param.safeMode, param.targetTableID, param.sourceTable, nil, value, nil, originalValue, columns, ti)) + dmls = append(dmls, newDML(insert, 
param.safeMode, param.targetTableID, param.sourceTable, nil, value, nil, originalValue, columns, ti, nil))
 	}
 
 	return dmls, nil
 }
 
 func (s *Syncer) genAndFilterUpdateDMLs(
+	tctx *tcontext.Context,
 	param *genDMLParam,
 	oldValueFilters []expression.Expression,
 	newValueFilters []expression.Expression,
 ) ([]*DML, error) {
 	var (
+		tableID      = param.targetTableID
 		data         = param.data
 		originalData = param.originalData
 		columns      = param.columns
@@ -100,6 +104,12 @@ func (s *Syncer) genAndFilterUpdateDMLs(
 		dmls         = make([]*DML, 0, len(data)/2)
 	)
 
+	// if downstream pk/uk(not null) exists, then use downstream pk/uk(not null)
+	downstreamIndexColumns, err := s.schemaTracker.GetDownStreamIndexInfo(tctx, tableID, ti)
+	if err != nil {
+		return nil, err
+	}
+
 RowLoop:
 	for i := 0; i < len(data); i += 2 {
 		oldData := data[i]
@@ -145,19 +155,30 @@ RowLoop:
 			}
 		}
 
-		dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti))
+		if downstreamIndexColumns == nil {
+			downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, oriOldValues)
+		}
+
+		dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti, downstreamIndexColumns))
 	}
 
 	return dmls, nil
 }
 
-func (s *Syncer) genAndFilterDeleteDMLs(param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) {
+func (s *Syncer) genAndFilterDeleteDMLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) {
 	var (
+		tableID = param.targetTableID
 		dataSeq = param.originalData
 		ti      = param.sourceTableInfo
 		dmls    = make([]*DML, 0, len(dataSeq))
 	)
 
+	// if downstream pk/uk(not null) exists, then use downstream pk/uk(not null)
+	downstreamIndexColumns, err := s.schemaTracker.GetDownStreamIndexInfo(tctx, tableID, ti)
+	if err != nil {
+		return nil, err
+	}
+
 RowLoop:
 	for _, data := range dataSeq {
if len(data) != len(ti.Columns) { @@ -176,7 +197,12 @@ RowLoop: continue RowLoop } } - dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti)) + + if downstreamIndexColumns == nil { + downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, value) + } + + dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti, downstreamIndexColumns)) } return dmls, nil @@ -287,14 +313,6 @@ func findFitIndex(ti *model.TableInfo) *model.IndexInfo { return getSpecifiedIndexColumn(ti, fn) } -func getAvailableIndexColumn(ti *model.TableInfo, data []interface{}) *model.IndexInfo { - fn := func(i int) bool { - return data[i] == nil - } - - return getSpecifiedIndexColumn(ti, fn) -} - func getSpecifiedIndexColumn(ti *model.TableInfo, fn func(i int) bool) *model.IndexInfo { for _, indexCols := range ti.Indices { if !indexCols.Unique { @@ -408,32 +426,34 @@ func checkLogColumns(skipped [][]int) error { // DML stores param for DML. type DML struct { - targetTableID string - sourceTable *filter.Table - op opType - oldValues []interface{} // only for update SQL - values []interface{} - columns []*model.ColumnInfo - sourceTableInfo *model.TableInfo - originOldValues []interface{} // only for update SQL - originValues []interface{} // use to gen key and `WHERE` - safeMode bool - key string // use to detect causality + targetTableID string + sourceTable *filter.Table + op opType + oldValues []interface{} // only for update SQL + values []interface{} + columns []*model.ColumnInfo + sourceTableInfo *model.TableInfo + originOldValues []interface{} // only for update SQL + originValues []interface{} // use to gen key and `WHERE` + safeMode bool + key string // use to detect causality + downstreamIndexInfo *model.IndexInfo } // newDML creates DML. 
-func newDML(op opType, safeMode bool, targetTableID string, sourceTable *filter.Table, oldValues, values, originOldValues, originValues []interface{}, columns []*model.ColumnInfo, sourceTableInfo *model.TableInfo) *DML { +func newDML(op opType, safeMode bool, targetTableID string, sourceTable *filter.Table, oldValues, values, originOldValues, originValues []interface{}, columns []*model.ColumnInfo, sourceTableInfo *model.TableInfo, downstreamIndexInfo *model.IndexInfo) *DML { return &DML{ - op: op, - safeMode: safeMode, - targetTableID: targetTableID, - sourceTable: sourceTable, - oldValues: oldValues, - values: values, - columns: columns, - sourceTableInfo: sourceTableInfo, - originOldValues: originOldValues, - originValues: originValues, + op: op, + safeMode: safeMode, + targetTableID: targetTableID, + sourceTable: sourceTable, + oldValues: oldValues, + values: values, + columns: columns, + sourceTableInfo: sourceTableInfo, + originOldValues: originOldValues, + originValues: originValues, + downstreamIndexInfo: downstreamIndexInfo, } } @@ -476,19 +496,23 @@ func (dml *DML) whereColumnsAndValues() ([]string, []interface{}) { values = dml.originOldValues } - defaultIndexColumns := findFitIndex(dml.sourceTableInfo) - - if defaultIndexColumns == nil { - defaultIndexColumns = getAvailableIndexColumn(dml.sourceTableInfo, values) - } - if defaultIndexColumns != nil { - columns, values = getColumnData(dml.sourceTableInfo.Columns, defaultIndexColumns, values) + if dml.downstreamIndexInfo != nil { + columns, values = getColumnData(dml.sourceTableInfo.Columns, dml.downstreamIndexInfo, values) } columnNames := make([]string, 0, len(columns)) for _, column := range columns { columnNames = append(columnNames, column.Name.O) } + + failpoint.Inject("DownstreamTrackerWhereCheck", func() { + if dml.op == update { + log.L().Info("UpdateWhereColumnsCheck", zap.String("Columns", fmt.Sprintf("%v", columnNames))) + } else if dml.op == del { + log.L().Info("DeleteWhereColumnsCheck", 
zap.String("Columns", fmt.Sprintf("%v", columnNames))) + } + }) + return columnNames, values } diff --git a/syncer/dml_test.go b/syncer/dml_test.go index 841e342ec..4afc71474 100644 --- a/syncer/dml_test.go +++ b/syncer/dml_test.go @@ -234,6 +234,18 @@ func (s *testSyncerSuite) TestGenWhere(c *C) { schema2 := "create table test.tb(id int, col1 int, col2 int, name varchar(24))" ti2, err := createTableInfo(p, se, 0, schema2) c.Assert(err, IsNil) + ti1Index := &model.IndexInfo{ + Table: ti1.Name, + Unique: true, + Primary: true, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: []*model.IndexColumn{{ + Name: ti1.Columns[0].Name, + Offset: ti1.Columns[0].Offset, + Length: types.UnspecifiedLength, + }}, + } testCases := []struct { dml *DML @@ -241,26 +253,27 @@ func (s *testSyncerSuite) TestGenWhere(c *C) { values []interface{} }{ { - newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti1.Columns, ti1), + newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti1.Columns, ti1, ti1Index), "`id` = ?", []interface{}{1}, }, { - newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti1.Columns, ti1), + newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti1.Columns, ti1, ti1Index), "`id` = ?", []interface{}{1}, }, { - newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti2.Columns, ti2), + newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti2.Columns, ti2, nil), "`id` = ? AND `col1` = ? AND `col2` = ? 
AND `name` = ?", []interface{}{1, 2, 3, "haha"}, }, { - newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti2.Columns, ti2), + newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti2.Columns, ti2, nil), "`id` = ? AND `col1` = ? AND `col2` = ? AND `name` = ?", []interface{}{1, 2, 3, "haha"}, }, } + for _, tc := range testCases { var buf strings.Builder whereValues := tc.dml.genWhere(&buf) @@ -275,6 +288,18 @@ func (s *testSyncerSuite) TestGenSQL(c *C) { schema := "create table test.tb(id int primary key, col1 int unique not null, col2 int unique, name varchar(24))" ti, err := createTableInfo(p, se, 0, schema) c.Assert(err, IsNil) + tiIndex := &model.IndexInfo{ + Table: ti.Name, + Unique: true, + Primary: true, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: []*model.IndexColumn{{ + Name: ti.Columns[0].Name, + Offset: ti.Columns[0].Offset, + Length: types.UnspecifiedLength, + }}, + } testCases := []struct { dml *DML @@ -282,27 +307,27 @@ func (s *testSyncerSuite) TestGenSQL(c *C) { args [][]interface{} }{ { - newDML(insert, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + newDML(insert, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"}, [][]interface{}{{1, 2, 3, "haha"}}, }, { - newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, 
[]interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, [][]interface{}{{1, 2, 3, "haha"}}, }, { - newDML(del, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + newDML(del, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1"}, [][]interface{}{{1}}, }, { - newDML(update, false, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + newDML(update, false, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), []string{"UPDATE `targetSchema`.`targetTable` SET `id` = ?, `col1` = ?, `col2` = ?, `name` = ? WHERE `id` = ? LIMIT 1"}, [][]interface{}{{4, 5, 6, "hihi", 1}}, }, { - newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti), + newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) 
ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, [][]interface{}{{1}, {4, 5, 6, "hihi"}}, }, diff --git a/syncer/expr_filter_group_test.go b/syncer/expr_filter_group_test.go index 8ca2f1198..6476856f0 100644 --- a/syncer/expr_filter_group_test.go +++ b/syncer/expr_filter_group_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/schema" + "github.com/pingcap/dm/syncer/dbconn" ) func (s *testFilterSuite) TestSkipDMLByExpression(c *C) { @@ -91,8 +92,9 @@ create table t ( ) c.Assert(log.InitLogger(&log.Config{Level: "debug"}), IsNil) + dbConn := &dbconn.DBConn{Cfg: &config.SubTaskConfig{}, BaseConn: s.baseConn} for _, ca := range cases { - schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, s.baseConn) + schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn) c.Assert(err, IsNil) c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil) c.Assert(schemaTracker.Exec(ctx, dbName, ca.tableStr), IsNil) @@ -348,9 +350,10 @@ create table t ( ) c.Assert(log.InitLogger(&log.Config{Level: "debug"}), IsNil) + dbConn := &dbconn.DBConn{Cfg: &config.SubTaskConfig{}, BaseConn: s.baseConn} for _, ca := range cases { c.Log(ca.tableStr) - schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, s.baseConn) + schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn) c.Assert(err, IsNil) c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil) c.Assert(schemaTracker.Exec(ctx, dbName, ca.tableStr), IsNil) @@ -398,7 +401,9 @@ create table t ( );` exprStr = "d > 1" ) - schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, s.baseConn) + + dbConn := &dbconn.DBConn{Cfg: &config.SubTaskConfig{}, BaseConn: s.baseConn} + schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn) 
c.Assert(err, IsNil) c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil) c.Assert(schemaTracker.Exec(ctx, dbName, tableStr), IsNil) diff --git a/syncer/job_test.go b/syncer/job_test.go index 83a175b86..c6fd45d57 100644 --- a/syncer/job_test.go +++ b/syncer/job_test.go @@ -17,6 +17,8 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/dm/pkg/binlog" @@ -87,13 +89,25 @@ func (t *testJobSuite) TestJob(c *C) { se := mock.NewContext() ti, err := createTableInfo(p, se, 0, schema) c.Assert(err, IsNil) + tiIndex := &model.IndexInfo{ + Table: ti.Name, + Unique: true, + Primary: true, + State: model.StatePublic, + Tp: model.IndexTypeBtree, + Columns: []*model.IndexColumn{{ + Name: ti.Columns[0].Name, + Offset: ti.Columns[0].Offset, + Length: types.UnspecifiedLength, + }}, + } testCases := []struct { job *job jobStr string }{ { - newDMLJob(insert, table, table, newDML(insert, true, "targetTable", table, nil, []interface{}{2, 2}, nil, []interface{}{2, 2}, ti.Columns, ti), ec), + newDMLJob(insert, table, table, newDML(insert, true, "targetTable", table, nil, []interface{}{2, 2}, nil, []interface{}{2, 2}, ti.Columns, ti, tiIndex), ec), "tp: insert, dml: [safemode: true, targetTableID: targetTable, op: insert, columns: [id col1], oldValues: [], values: [2 2]], ddls: [], last_location: position: (, 4), gtid-set: , start_location: position: (, 4), gtid-set: , current_location: position: (, 4), gtid-set: ", }, { newDDLJob(qec), diff --git a/syncer/syncer.go b/syncer/syncer.go index 8150772e7..670a334c9 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -138,10 +138,11 @@ type Syncer struct { fromDB *dbconn.UpStreamConn - toDB *conn.BaseDB - toDBConns []*dbconn.DBConn - ddlDB *conn.BaseDB - ddlDBConn *dbconn.DBConn + toDB *conn.BaseDB + toDBConns []*dbconn.DBConn + 
ddlDB *conn.BaseDB + ddlDBConn *dbconn.DBConn + downstreamTrackConn *dbconn.DBConn dmlJobCh chan *job ddlJobCh chan *job @@ -313,7 +314,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: s.closeDBs}) - s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.ddlDBConn.BaseConn) + s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn) if err != nil { return terror.ErrSchemaTrackerInit.Delegate(err) } @@ -572,6 +573,11 @@ func (s *Syncer) resetDBs(tctx *tcontext.Context) error { return terror.WithScope(err, terror.ScopeDownstream) } + err = s.downstreamTrackConn.ResetConn(tctx) + if err != nil { + return terror.WithScope(err, terror.ScopeDownstream) + } + err = s.checkpoint.ResetConn(tctx) if err != nil { return terror.WithScope(err, terror.ScopeDownstream) @@ -2071,7 +2077,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } param.safeMode = ec.safeMode - dmls, err = s.genAndFilterUpdateDMLs(param, oldExprFilter, newExprFilter) + dmls, err = s.genAndFilterUpdateDMLs(ec.tctx, param, oldExprFilter, newExprFilter) if err != nil { return terror.Annotatef(err, "gen update sqls failed, sourceTable: %v, targetTable: %v", sourceTable, targetTable) } @@ -2084,7 +2090,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err return err2 } - dmls, err = s.genAndFilterDeleteDMLs(param, exprFilter) + dmls, err = s.genAndFilterDeleteDMLs(ec.tctx, param, exprFilter) if err != nil { return terror.Annotatef(err, "gen delete sqls failed, sourceTable: %v, targetTable: %v", sourceTable, targetTable) } @@ -2663,6 +2669,7 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex shouldTableExistNum int // tableNames[:shouldTableExistNum] should exist shouldRefTableExistNum int // tableNames[1:shouldTableExistNum] should exist, since first one is "caller table" 
tryFetchDownstreamTable bool // to make sure if not exists will execute correctly + shouldReTrackDownstreamIndex bool // retrack downstreamIndex ) switch node := trackInfo.originStmt.(type) { @@ -2673,6 +2680,7 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex shouldSchemaExist = true case *ast.DropDatabaseStmt: shouldExecDDLOnSchemaTracker = true + shouldReTrackDownstreamIndex = true if s.cfg.ShardMode == "" { if err := s.checkpoint.DeleteSchemaPoint(ec.tctx, srcTable.Schema); err != nil { return err @@ -2689,6 +2697,7 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex tryFetchDownstreamTable = true case *ast.DropTableStmt: shouldExecDDLOnSchemaTracker = true + shouldReTrackDownstreamIndex = true if err := s.checkpoint.DeleteTablePoint(ec.tctx, srcTable); err != nil { return err } @@ -2696,8 +2705,10 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex shouldExecDDLOnSchemaTracker = true shouldSchemaExist = true shouldTableExistNum = 1 + shouldReTrackDownstreamIndex = true case *ast.AlterTableStmt: shouldSchemaExist = true + shouldReTrackDownstreamIndex = true // for DDL that adds FK, since TiDB doesn't fully support it yet, we simply ignore execution of this DDL. 
switch { case len(node.Specs) == 1 && node.Specs[0].Constraint != nil && node.Specs[0].Constraint.Tp == ast.ConstraintForeignKey: @@ -2716,6 +2727,10 @@ func (s *Syncer) trackDDL(usedSchema string, trackInfo *ddlInfo, ec *eventContex ec.tctx.L().DPanic("unhandled DDL type cannot be tracked", zap.Stringer("type", reflect.TypeOf(trackInfo.originStmt))) } + if shouldReTrackDownstreamIndex { + s.schemaTracker.RemoveDownstreamSchema(targetTables) + } + if shouldSchemaExist { if err := s.schemaTracker.CreateSchemaIfNotExists(srcTable.Schema); err != nil { return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, srcTable.Schema) @@ -2898,13 +2913,14 @@ func (s *Syncer) createDBs(ctx context.Context) error { dbCfg.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDDLConnectionTimeout) var ddlDBConns []*dbconn.DBConn - s.ddlDB, ddlDBConns, err = dbconn.CreateConns(s.tctx, s.cfg, dbCfg, 1) + s.ddlDB, ddlDBConns, err = dbconn.CreateConns(s.tctx, s.cfg, dbCfg, 2) if err != nil { dbconn.CloseUpstreamConn(s.tctx, s.fromDB) dbconn.CloseBaseDB(s.tctx, s.toDB) return err } s.ddlDBConn = ddlDBConns[0] + s.downstreamTrackConn = ddlDBConns[1] printServerVersion(s.tctx, s.fromDB.BaseDB, "upstream") printServerVersion(s.tctx, s.toDB, "downstream") diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 7600472a4..f3828d11f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" + pmysql "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" ) @@ -766,7 +767,19 @@ func (s *testSyncerSuite) TestRun(c *C) { {Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, } syncer.ddlDBConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn.BaseConn) 
+ syncer.downstreamTrackConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn) + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_1", "create table t_1(id int primary key, name varchar(24))")) + s.mockGetServerUnixTS(mock) + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) + syncer.exprFilterGroup = NewExprFilterGroup(nil) c.Assert(err, IsNil) c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) @@ -778,7 +791,6 @@ func (s *testSyncerSuite) TestRun(c *C) { syncer.setupMockCheckpoint(c, checkPointDBConn, checkPointMock) syncer.reset() - s.mockGetServerUnixTS(mock) events1 := mockBinlogEvents{ mockBinlogEvent{typ: DBCreate, args: []interface{}{"test_1"}}, mockBinlogEvent{typ: TableCreate, args: []interface{}{"test_1", "create table test_1.t_1(id int primary key, name varchar(24))"}}, @@ -1008,7 +1020,16 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { {Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, } syncer.ddlDBConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn.BaseConn) + syncer.downstreamTrackConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", 
pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_1", "create table t_1(id int primary key, name varchar(24))")) + + syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn) syncer.exprFilterGroup = NewExprFilterGroup(nil) c.Assert(err, IsNil) c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) @@ -1197,7 +1218,7 @@ func (s *testSyncerSuite) TestTrackDDL(c *C) { } syncer.ddlDBConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} syncer.checkpoint.(*RemoteCheckPoint).dbConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn.BaseConn) + syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn) syncer.exprFilterGroup = NewExprFilterGroup(nil) c.Assert(syncer.genRouter(), IsNil) c.Assert(err, IsNil) @@ -1461,7 +1482,8 @@ func (s *testSyncerSuite) TestTrackDownstreamTableWontOverwrite(c *C) { c.Assert(err, IsNil) baseConn := conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}) syncer.ddlDBConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} - syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, baseConn) + syncer.downstreamTrackConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn) c.Assert(err, IsNil) upTable := &filter.Table{ @@ -1503,7 +1525,8 @@ func (s *testSyncerSuite) TestDownstreamTableHasAutoRandom(c *C) { c.Assert(err, IsNil) baseConn := 
conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}) syncer.ddlDBConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn} - syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, baseConn) + syncer.downstreamTrackConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn) c.Assert(err, IsNil) schemaName := "test" @@ -1543,7 +1566,7 @@ func (s *testSyncerSuite) TestDownstreamTableHasAutoRandom(c *C) { "tidb_skip_utf8_check": "0", schema.TiDBClusteredIndex: "ON", } - syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, sessionCfg, baseConn) + syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, sessionCfg, syncer.downstreamTrackConn) c.Assert(err, IsNil) v, ok := syncer.schemaTracker.GetSystemVar(schema.TiDBClusteredIndex) c.Assert(v, Equals, "ON") diff --git a/tests/downstream_diff_index/conf/dm-master.toml b/tests/downstream_diff_index/conf/dm-master.toml new file mode 100644 index 000000000..7cecf59ad --- /dev/null +++ b/tests/downstream_diff_index/conf/dm-master.toml @@ -0,0 +1,4 @@ +# Master Configuration. 
+master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" +auto-compaction-retention = "3s" diff --git a/tests/downstream_diff_index/conf/dm-task.yaml b/tests/downstream_diff_index/conf/dm-task.yaml new file mode 100644 index 000000000..5dc73b1ec --- /dev/null +++ b/tests/downstream_diff_index/conf/dm-task.yaml @@ -0,0 +1,61 @@ +--- +name: test +task-mode: all +is-sharding: true +shard-mode: "pessimistic" +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + black-white-list: "instance" # compatible with deprecated config + route-rules: [ "downstream-table-rules","downstream-schema-rules" ] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + black-white-list: "instance" # compatible with deprecated config + route-rules: [ "downstream-table-rules","downstream-schema-rules" ] + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["downstream_diff_index*"] + +routes: + downstream-table-rules: + schema-pattern: "downstream_diff_index*" + target-schema: "downstream_diff_index" + table-pattern: "t*" + target-table: "t" + downstream-schema-rules: + schema-pattern: "downstream_diff_index*" + target-schema: "downstream_diff_index" + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/tests/downstream_diff_index/conf/dm-worker1.toml b/tests/downstream_diff_index/conf/dm-worker1.toml new file mode 100644 index 000000000..7a72ea72b --- /dev/null +++ b/tests/downstream_diff_index/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = 
"worker1" +join = "127.0.0.1:8261" diff --git a/tests/downstream_diff_index/conf/dm-worker2.toml b/tests/downstream_diff_index/conf/dm-worker2.toml new file mode 100644 index 000000000..010e21c73 --- /dev/null +++ b/tests/downstream_diff_index/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/tests/downstream_diff_index/conf/source1.yaml b/tests/downstream_diff_index/conf/source1.yaml new file mode 100644 index 000000000..c3b581801 --- /dev/null +++ b/tests/downstream_diff_index/conf/source1.yaml @@ -0,0 +1,10 @@ +source-id: mysql-replica-01 +flavor: '' +enable-gtid: true +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: false diff --git a/tests/downstream_diff_index/conf/source2.yaml b/tests/downstream_diff_index/conf/source2.yaml new file mode 100644 index 000000000..b5ef1489c --- /dev/null +++ b/tests/downstream_diff_index/conf/source2.yaml @@ -0,0 +1,10 @@ +source-id: mysql-replica-02 +flavor: '' +enable-gtid: true +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3307 +checker: + check-enable: false diff --git a/tests/downstream_diff_index/data/db1.increment.sql b/tests/downstream_diff_index/data/db1.increment.sql new file mode 100644 index 000000000..24ceb2365 --- /dev/null +++ b/tests/downstream_diff_index/data/db1.increment.sql @@ -0,0 +1,3 @@ +use downstream_diff_index1; +update t1 set c3 = '111' where c1 = 1; +delete from t1 where c1 = 2; diff --git a/tests/downstream_diff_index/data/db1.prepare.sql b/tests/downstream_diff_index/data/db1.prepare.sql new file mode 100644 index 000000000..4d3d84432 --- /dev/null +++ b/tests/downstream_diff_index/data/db1.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `downstream_diff_index1`; +create database `downstream_diff_index1`; +use `downstream_diff_index1`; +create table t1 (c1 int, c2 int, c3 varchar(10), primary key(c1)); +insert into t1 values(1, 1, 
'1'); +insert into t1 values(2, 2, '2'); +insert into t1 values(3, 3, '3'); diff --git a/tests/downstream_diff_index/data/db2.increment.sql b/tests/downstream_diff_index/data/db2.increment.sql new file mode 100644 index 000000000..4e3677995 --- /dev/null +++ b/tests/downstream_diff_index/data/db2.increment.sql @@ -0,0 +1,3 @@ +use downstream_diff_index2; +update t2 set c3 = '333' where c1 = 3; +delete from t2 where c1 = 1; diff --git a/tests/downstream_diff_index/data/db2.prepare.sql b/tests/downstream_diff_index/data/db2.prepare.sql new file mode 100644 index 000000000..13fac5510 --- /dev/null +++ b/tests/downstream_diff_index/data/db2.prepare.sql @@ -0,0 +1,7 @@ +drop database if exists `downstream_diff_index2`; +create database `downstream_diff_index2`; +use `downstream_diff_index2`; +create table t2 (c1 int, c2 int, c3 varchar(10), primary key(c1)); +insert into t2 values(1, 12, '13'); +insert into t2 values(2, 22, '23'); +insert into t2 values(3, 32, '33'); diff --git a/tests/downstream_diff_index/data/tidb.prepare.sql b/tests/downstream_diff_index/data/tidb.prepare.sql new file mode 100644 index 000000000..b5b0d0ccb --- /dev/null +++ b/tests/downstream_diff_index/data/tidb.prepare.sql @@ -0,0 +1,4 @@ +drop database if exists `downstream_diff_index`; +create database `downstream_diff_index`; +use `downstream_diff_index`; +create table t (c1 int, c2 int, c3 varchar(10)); diff --git a/tests/downstream_diff_index/run.sh b/tests/downstream_diff_index/run.sh new file mode 100755 index 000000000..69d3ad737 --- /dev/null +++ b/tests/downstream_diff_index/run.sh @@ -0,0 +1,80 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +db1="downstream_diff_index1" +tb1="t1" +db2="downstream_diff_index2" +tb2="t2" +db="downstream_diff_index" +tb="t" + +function run() { + # create table in mysql with pk + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 
$MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + # create table in tidb with different pk + run_sql_file $cur/data/tidb.prepare.sql $TIDB_HOST $TIDB_PORT $TIDB_PASSWORD + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + + # worker will inject delete/update sql check + inject_points=( + "github.com/pingcap/dm/syncer/DownstreamTrackerWhereCheck=return()" + ) + export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + # start DM task + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" + # check full load data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1<100;" "count(1): 6" + + # downstream create diff uk + run_sql "alter table ${db}.${tb} add unique key(c2);" $TIDB_PORT $TIDB_PASSWORD + + # db1 increment data with update and delete + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + # check update data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1=1 and c3='111';" "count(1): 1" + check_log_contain_with_retry '\[UpdateWhereColumnsCheck\] \[Columns="\[c2\]"\]' $WORK_DIR/worker1/log/dm-worker.log + # check delete data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1=2;" "count(1): 1" + check_log_contain_with_retry '\[DeleteWhereColumnsCheck\] 
\[Columns="\[c2\]"\]' $WORK_DIR/worker1/log/dm-worker.log + + # alter schema to test pk + run_sql "alter table ${db}.${tb} add primary key(c3);" $TIDB_PORT $TIDB_PASSWORD + run_sql "alter table ${db1}.${tb1} drop column c2;" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "alter table ${db2}.${tb2} drop column c2;" $MYSQL_PORT2 $MYSQL_PASSWORD2 + + # db2 increment data with update and delete + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + # check update data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1=3 and c3='333';" "count(1): 1" + check_log_contain_with_retry '\[UpdateWhereColumnsCheck\] \[Columns="\[c3\]"\]' $WORK_DIR/worker2/log/dm-worker.log + # check delete data + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1=1;" "count(1): 1" + check_log_contain_with_retry '\[DeleteWhereColumnsCheck\] \[Columns="\[c3\]"\]' $WORK_DIR/worker2/log/dm-worker.log +} + +cleanup_data downstream_diff_index +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* +export GO_FAILPOINTS='' + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/others_integration_1.txt b/tests/others_integration_1.txt index d5361c58d..374b8bdef 100644 --- a/tests/others_integration_1.txt +++ b/tests/others_integration_1.txt @@ -8,3 +8,5 @@ only_dml adjust_gtid checkpoint_transaction lightning_mode +downstream_diff_index +