From 2b96b5c2494492e351daa946cdc4cc7836ae058e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 24 Sep 2020 15:21:49 +0800 Subject: [PATCH] tracker: don't execute add FK in case tracker doesn't have referenced table --- syncer/syncer.go | 18 +++++++++-- syncer/syncer_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 3 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 3f79ff3666..36389ad051 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -2082,12 +2082,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return nil } +// input `sql` should be a single DDL, which came from parserpkg.SplitDDL func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.Table, stmt ast.StmtNode, ec *eventContext) error { srcTable := tableNames[0][0] // Make sure the tables are all loaded into the schema tracker. var shouldExecDDLOnSchemaTracker, shouldSchemaExist, shouldTableExist bool - switch stmt.(type) { + switch node := stmt.(type) { case *ast.CreateDatabaseStmt: shouldExecDDLOnSchemaTracker = true case *ast.AlterDatabaseStmt: @@ -2108,11 +2109,22 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter. if err := s.checkpoint.DeleteTablePoint(ec.tctx, srcTable.Schema, srcTable.Name); err != nil { return err } - case *ast.RenameTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt, *ast.RepairTableStmt, *ast.AlterTableStmt: - // TODO: RENAME TABLE / ALTER TABLE RENAME should require special treatment. + case *ast.RenameTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt, *ast.RepairTableStmt: + // TODO: RENAME TABLE should require special treatment. shouldExecDDLOnSchemaTracker = true shouldSchemaExist = true shouldTableExist = true + case *ast.AlterTableStmt: + // TODO: ALTER TABLE RENAME should require special treatment. + // for DDL that adds FK, since TiDB doesn't fully support it yet, and the referenced table may not be stored in + // tracker, we simply ignore execution of this DDL. + if len(node.Specs) == 1 && node.Specs[0].Constraint != nil && node.Specs[0].Constraint.Tp == ast.ConstraintForeignKey { + shouldExecDDLOnSchemaTracker = false + } else { + shouldExecDDLOnSchemaTracker = true + } + shouldSchemaExist = true + shouldTableExist = true case *ast.LockTablesStmt, *ast.UnlockTablesStmt, *ast.CleanupTableLockStmt, *ast.TruncateTableStmt: break default: diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 2218b1211d..93143ff75d 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1498,6 +1498,80 @@ func (s *testSyncerSuite) TestRemoveMetadataIsFine(c *C) { c.Assert(fresh, IsFalse) } +func (s *testSyncerSuite) TestTrackDDL(c *C) { + var ( + testDB = "test_db" + testTbl = "test_tbl" + filter = [][]*filter.Table{{{testDB, testTbl}}, {{testDB, testTbl}}} + ec = &eventContext{tctx: tcontext.Background()} + ) + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + dbConn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + + checkPointDB, checkPointMock, err := sqlmock.New() + checkPointDBConn, err := checkPointDB.Conn(context.Background()) + c.Assert(err, IsNil) + + syncer := NewSyncer(s.cfg, nil) + syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}, + {cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}} + syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})} + syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})} + syncer.schemaTracker, err = schema.NewTracker(defaultTestSessionCfg, syncer.ddlDBConn.baseConn) + c.Assert(err, IsNil) + + cases := []struct { + sql string + callback func() + }{ + {"CREATE DATABASE IF NOT EXISTS " + testDB, func() {}}, + {"ALTER DATABASE " + testDB + " DEFAULT COLLATE utf8_bin", func() {}}, + {"DROP DATABASE IF EXISTS " + testDB, func() {}}, + {fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (c int)", testDB, testTbl), func() {}}, + {fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", testDB, testTbl), func() {}}, + {"CREATE INDEX idx1 ON " + testTbl + " (c)", func() { + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) + mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + }}, + {fmt.Sprintf("ALTER TABLE %s.%s add c2 int", testDB, testTbl), func() { + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) + mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + }}, + // alter add FK will not executed on tracker (otherwise will report error tb2 not exist) + {fmt.Sprintf("ALTER TABLE %s.%s add constraint foreign key (c) references tb2(c)", testDB, testTbl), func() { + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) + mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + }}, + {"TRUNCATE TABLE " + testTbl, func() {}}, + } + + p := parser.New() + for _, ca := range cases { + stmts, warns, err := p.Parse(ca.sql, "", "") + c.Assert(err, IsNil) + c.Assert(warns, HasLen, 0) + c.Assert(stmts, HasLen, 1) + + ca.callback() + + c.Assert(syncer.trackDDL(testDB, ca.sql, filter, stmts[0], ec), IsNil) + c.Assert(syncer.schemaTracker.Reset(), IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + c.Assert(checkPointMock.ExpectationsWereMet(), IsNil) + } +} + func executeSQLAndWait(expectJobNum int) { for i := 0; i < 10; i++ { time.Sleep(time.Second)