Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

tracker: don't execute add FK on tracker #1093

Merged
merged 2 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2082,12 +2082,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return nil
}

// input `sql` should be a single DDL, which came from parserpkg.SplitDDL
func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.Table, stmt ast.StmtNode, ec *eventContext) error {
srcTable := tableNames[0][0]

// Make sure the tables are all loaded into the schema tracker.
var shouldExecDDLOnSchemaTracker, shouldSchemaExist, shouldTableExist bool
switch stmt.(type) {
switch node := stmt.(type) {
case *ast.CreateDatabaseStmt:
shouldExecDDLOnSchemaTracker = true
case *ast.AlterDatabaseStmt:
Expand All @@ -2108,11 +2109,22 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
if err := s.checkpoint.DeleteTablePoint(ec.tctx, srcTable.Schema, srcTable.Name); err != nil {
return err
}
case *ast.RenameTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt, *ast.RepairTableStmt, *ast.AlterTableStmt:
// TODO: RENAME TABLE / ALTER TABLE RENAME should require special treatment.
case *ast.RenameTableStmt, *ast.CreateIndexStmt, *ast.DropIndexStmt, *ast.RepairTableStmt:
// TODO: RENAME TABLE should require special treatment.
shouldExecDDLOnSchemaTracker = true
shouldSchemaExist = true
shouldTableExist = true
case *ast.AlterTableStmt:
// TODO: ALTER TABLE RENAME should require special treatment.
// for DDL that adds FK, since TiDB doesn't fully support it yet, and the referenced table may not be stored in
// tracker, we simply ignore execution of this DDL.
if len(node.Specs) == 1 && node.Specs[0].Constraint != nil && node.Specs[0].Constraint.Tp == ast.ConstraintForeignKey {
shouldExecDDLOnSchemaTracker = false
} else {
shouldExecDDLOnSchemaTracker = true
}
shouldSchemaExist = true
shouldTableExist = true
case *ast.LockTablesStmt, *ast.UnlockTablesStmt, *ast.CleanupTableLockStmt, *ast.TruncateTableStmt:
break
default:
Expand Down
74 changes: 74 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,80 @@ func (s *testSyncerSuite) TestRemoveMetadataIsFine(c *C) {
c.Assert(fresh, IsFalse)
}

func (s *testSyncerSuite) TestTrackDDL(c *C) {
var (
testDB = "test_db"
testTbl = "test_tbl"
filter = [][]*filter.Table{{{testDB, testTbl}}, {{testDB, testTbl}}}
ec = &eventContext{tctx: tcontext.Background()}
)
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(context.Background())
c.Assert(err, IsNil)

checkPointDB, checkPointMock, err := sqlmock.New()
checkPointDBConn, err := checkPointDB.Conn(context.Background())
c.Assert(err, IsNil)

syncer := NewSyncer(s.cfg, nil)
syncer.toDBConns = []*DBConn{{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})},
{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}}
syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
syncer.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})}
syncer.schemaTracker, err = schema.NewTracker(defaultTestSessionCfg, syncer.ddlDBConn.baseConn)
c.Assert(err, IsNil)

cases := []struct {
sql string
callback func()
}{
{"CREATE DATABASE IF NOT EXISTS " + testDB, func() {}},
{"ALTER DATABASE " + testDB + " DEFAULT COLLATE utf8_bin", func() {}},
{"DROP DATABASE IF EXISTS " + testDB, func() {}},
{fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (c int)", testDB, testTbl), func() {}},
{fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", testDB, testTbl), func() {}},
{"CREATE INDEX idx1 ON " + testTbl + " (c)", func() {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}},
{fmt.Sprintf("ALTER TABLE %s.%s add c2 int", testDB, testTbl), func() {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}},
// alter add FK will not executed on tracker (otherwise will report error tb2 not exist)
{fmt.Sprintf("ALTER TABLE %s.%s add constraint foreign key (c) references tb2(c)", testDB, testTbl), func() {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(testTbl, " CREATE TABLE `"+testTbl+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}},
{"TRUNCATE TABLE " + testTbl, func() {}},
}

p := parser.New()
for _, ca := range cases {
stmts, warns, err := p.Parse(ca.sql, "", "")
c.Assert(err, IsNil)
c.Assert(warns, HasLen, 0)
c.Assert(stmts, HasLen, 1)

ca.callback()

c.Assert(syncer.trackDDL(testDB, ca.sql, filter, stmts[0], ec), IsNil)
c.Assert(syncer.schemaTracker.Reset(), IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(checkPointMock.ExpectationsWereMet(), IsNil)
}
}

func executeSQLAndWait(expectJobNum int) {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
Expand Down