This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

syncer: don't clear online ddl if execute ddl failed (#465)
WangXiangUSTC authored Feb 4, 2020
1 parent 346d0f1 commit 23665f1
Showing 11 changed files with 208 additions and 19 deletions.
3 changes: 2 additions & 1 deletion _utils/terror_gen/errors_release.txt
@@ -283,7 +283,8 @@ ErrSyncerUnitResolveCasualityFail,[code=36056:class=sync-unit:scope=internal:lev
ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:level=high],"reopen %T not supported"
ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
ErrSyncerUnitNewBWList,[code=36060:class=sync-unit:scope=internal:level=high],"new black white list"
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
28 changes: 28 additions & 0 deletions pkg/binlog/position.go
@@ -132,3 +132,31 @@ func verifyUUIDSuffix(suffix string) bool {
}
return true
}

// AdjustPosition adjusts the filename of a mysql position by removing the uuid suffix, if any.
// for example: mysql-bin|000001.000002 -> mysql-bin.000002
func AdjustPosition(pos gmysql.Position) gmysql.Position {
realPos, err := RealMySQLPos(pos)
if err != nil {
// just return the original pos
return pos
}

return realPos
}

// ComparePosition returns:
// 1 if pos1 is bigger than pos2
// 0 if pos1 is equal to pos2
// -1 if pos1 is less than pos2
func ComparePosition(pos1, pos2 gmysql.Position) int {
adjustedPos1 := AdjustPosition(pos1)
adjustedPos2 := AdjustPosition(pos2)

// both pos1 and pos2 have a uuid suffix in the name, so compare the original positions to also take the uuid into account
if adjustedPos1.Name != pos1.Name && adjustedPos2.Name != pos2.Name {
return pos1.Compare(pos2)
}

return adjustedPos1.Compare(adjustedPos2)
}
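
For reference, a usage sketch of the two new helpers (not part of the commit; the import paths and the gmysql alias are assumptions matching the DM layout at the time):

package main

import (
    "fmt"

    "github.com/pingcap/dm/pkg/binlog"
    gmysql "github.com/siddontang/go-mysql/mysql"
)

func main() {
    // both names carry a relay uuid suffix, so the suffix takes part in the comparison
    p1 := gmysql.Position{Name: "mysql-bin|000001.000002", Pos: 4}
    p2 := gmysql.Position{Name: "mysql-bin|000002.000001", Pos: 4}
    fmt.Println(binlog.ComparePosition(p1, p2)) // -1

    // only one side has a suffix, so both sides are adjusted before comparing
    p3 := gmysql.Position{Name: "mysql-bin|000001.000002", Pos: 4}
    p4 := gmysql.Position{Name: "mysql-bin.000002", Pos: 4}
    fmt.Println(binlog.ComparePosition(p3, p4)) // 0
    fmt.Println(binlog.AdjustPosition(p3).Name) // mysql-bin.000002
}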
117 changes: 117 additions & 0 deletions pkg/binlog/position_test.go
@@ -230,3 +230,120 @@ func (t *testPositionSuite) TestVerifyUUIDSuffix(c *C) {
c.Assert(verifyUUIDSuffix(cs.suffix), Equals, cs.valid)
}
}

func (t *testPositionSuite) TestAdjustPosition(c *C) {
cases := []struct {
pos gmysql.Position
adjustedPos gmysql.Position
}{
{
gmysql.Position{
"mysql-bin.00001",
123,
},
gmysql.Position{
"mysql-bin.00001",
123,
},
}, {
gmysql.Position{
"mysql-bin|00001.00002",
123,
},
gmysql.Position{
"mysql-bin.00002",
123,
},
}, {
gmysql.Position{
"mysql-bin|00001.00002.00003",
123,
},
gmysql.Position{
"mysql-bin|00001.00002.00003",
123,
},
},
}

for _, cs := range cases {
adjustedPos := AdjustPosition(cs.pos)
c.Assert(adjustedPos.Name, Equals, cs.adjustedPos.Name)
c.Assert(adjustedPos.Pos, Equals, cs.adjustedPos.Pos)
}
}

func (t *testPositionSuite) TestComparePosition(c *C) {
cases := []struct {
pos1 gmysql.Position
pos2 gmysql.Position
cmp int
}{
{
gmysql.Position{
Name: "mysql-bin.00001",
Pos: 123,
},
gmysql.Position{
Name: "mysql-bin.00002",
Pos: 123,
},
-1,
}, {
gmysql.Position{
Name: "mysql-bin.00001",
Pos: 123,
},
gmysql.Position{
Name: "mysql-bin.00001",
Pos: 123,
},
0,
}, {
gmysql.Position{
Name: "mysql-bin.00002",
Pos: 123,
},
gmysql.Position{
Name: "mysql-bin.00001",
Pos: 123,
},
1,
}, {
gmysql.Position{
Name: "mysql-bin|00001.00002",
Pos: 123,
},
gmysql.Position{
Name: "mysql-bin|00002.00001",
Pos: 123,
},
-1,
}, {
gmysql.Position{
Name: "mysql-bin|00001.00002",
Pos: 123,
},
gmysql.Position{
Name: "mysql-bin|00001.00002",
Pos: 123,
},
0,
}, {
gmysql.Position{
Name: "mysql-bin|00002.00001",
Pos: 123,
},
gmysql.Position{
Name: "mysql-bin|00001.00002",
Pos: 123,
},
1,
},
}

for _, cs := range cases {
cmp := ComparePosition(cs.pos1, cs.pos2)
c.Assert(cmp, Equals, cs.cmp)
}
}
6 changes: 4 additions & 2 deletions pkg/terror/error_list.go
@@ -349,7 +349,8 @@ const (
codeSyncerUnitReopenStreamNotSupport
codeSyncerUnitUpdateConfigInSharding
codeSyncerUnitExecWithNoBlockingDDL
codeSyncerUnitNewBWList
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
)

// DM-master error code
@@ -796,7 +797,8 @@ var (
ErrSyncerUnitReopenStreamNotSupport = New(codeSyncerUnitReopenStreamNotSupport, ClassSyncUnit, ScopeInternal, LevelHigh, "reopen %T not supported")
ErrSyncerUnitUpdateConfigInSharding = New(codeSyncerUnitUpdateConfigInSharding, ClassSyncUnit, ScopeInternal, LevelHigh, "try update config when some tables' (%v) sharding DDL not synced not supported")
ErrSyncerUnitExecWithNoBlockingDDL = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync")
ErrSyncerUnitNewBWList = New(codeSyncerUnitNewBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "new black white list")
ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
18 changes: 18 additions & 0 deletions syncer/checkpoint.go
@@ -164,6 +164,9 @@ type CheckPoint interface {
// corresponding to Meta.Pos
GlobalPoint() mysql.Position

// TablePoint returns all tables' binlog stream checkpoints
TablePoint() map[string]map[string]mysql.Position

// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
// corresponding to Meta.Pos
FlushedGlobalPoint() mysql.Position
@@ -421,6 +424,21 @@ func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position {
return cp.globalPoint.MySQLPos()
}

// TablePoint implements CheckPoint.TablePoint
func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position {
cp.RLock()
defer cp.RUnlock()

tablePoint := make(map[string]map[string]mysql.Position)
for schema, tables := range cp.points {
tablePoint[schema] = make(map[string]mysql.Position)
for table, point := range tables {
tablePoint[schema][table] = point.MySQLPos()
}
}
return tablePoint
}

// FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position {
return cp.globalPoint.FlushedMySQLPos()
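
A hypothetical caller of the new TablePoint accessor, for example to log every saved table checkpoint; the s.checkpoint and s.tctx fields and the zap calls follow the Syncer code elsewhere in this diff and are not part of the commit:

// TablePoint returns a copy of the map, so iterating it needs no extra locking
for schema, tables := range s.checkpoint.TablePoint() {
    for table, pos := range tables {
        s.tctx.L().Info("table checkpoint",
            zap.String("schema", schema),
            zap.String("table", table),
            zap.String("file", pos.Name),
            zap.Uint32("pos", pos.Pos))
    }
}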
8 changes: 8 additions & 0 deletions syncer/db.go
@@ -215,6 +215,14 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
}

func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {
failpoint.Inject("ExecuteSQLWithIgnoreFailed", func(val failpoint.Value) {
queryPattern := val.(string)
if len(queries) == 1 && strings.Contains(queries[0], queryPattern) {
tctx.L().Warn("executeSQLWithIgnore failed", zap.String("failpoint", "ExecuteSQLWithIgnoreFailed"))
failpoint.Return(0, terror.ErrDBUnExpect.Generate("invalid connection"))
}
})

if len(queries) == 0 {
return 0, nil
}
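
A sketch of how this failpoint might be enabled in a test; the failpoint path is an assumption derived from the package layout, and the injected value is the substring matched against the single query:

// fail any single-statement batch whose query contains "ALTER TABLE"
fpPath := "github.com/pingcap/dm/syncer/ExecuteSQLWithIgnoreFailed"
if err := failpoint.Enable(fpPath, `return("ALTER TABLE")`); err != nil {
    panic(err)
}
defer failpoint.Disable(fpPath)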
9 changes: 9 additions & 0 deletions syncer/online_ddl.go
@@ -27,6 +27,7 @@ import (

"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"
"go.uber.org/zap"
)

var (
@@ -176,6 +177,10 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo {
return nil
}

if mSchema == nil || mSchema[ghostTable] == nil {
return nil
}

clone := new(GhostDDLInfo)
*clone = *mSchema[ghostTable]

@@ -204,6 +209,10 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable,

// maybe we need more checks for it

if len(info.DDLs) != 0 && info.DDLs[len(info.DDLs)-1] == ddl {
tctx.L().Warn("online ddl may be saved before, just ignore it", zap.String("ddl", ddl))
return nil
}
info.DDLs = append(info.DDLs, ddl)
ddlsBytes, err := json.Marshal(mSchema[ghostTable])
if err != nil {
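
An illustration of the now-idempotent Save; the full signature (realSchema, realTable, ddl) is truncated in the hunk header above, so the call shape here is an assumption, not taken from the commit:

ddl := "ALTER TABLE `db`.`_tbl_gho` ADD COLUMN `c1` INT"
// the first call records the DDL under the ghost table
if err := storage.Save(tctx, "db", "_tbl_gho", "db", "tbl", ddl); err != nil {
    return err
}
// saving the same DDL again (e.g. on a retry after a failed execution) is now
// detected as a duplicate and ignored with a warning instead of being appended twice
if err := storage.Save(tctx, "db", "_tbl_gho", "db", "tbl", ddl); err != nil {
    return err
}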
26 changes: 20 additions & 6 deletions syncer/syncer.go
@@ -318,7 +318,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {

s.bwList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BWList)
if err != nil {
return terror.ErrSyncerUnitNewBWList.Delegate(err)
return terror.ErrSyncerUnitGenBWList.Delegate(err)
}

s.binlogFilter, err = bf.NewBinlogEvent(s.cfg.CaseSensitive, s.cfg.FilterRules)
@@ -470,7 +470,8 @@ func (s *Syncer) initShardingGroups() error {
// IsFreshTask implements Unit.IsFreshTask
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
return globalPoint.Compare(minCheckpoint) <= 0, nil
tablePoint := s.checkpoint.TablePoint()
return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0 && len(tablePoint) == 0, nil
}

func (s *Syncer) resetReplicationSyncer() {
@@ -1115,9 +1116,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("statck"))
err = terror.ErrSyncerUnitPanic.Generate(err1)
}
// flush the job channels, but if an error occurred, we should not flush the checkpoints
if err1 := s.flushJobs(); err1 != nil {
s.tctx.L().Error("fail to finish all jobs when binlog replication exits", log.ShortError(err1))

s.jobWg.Wait()
if err2 := s.flushCheckPoints(); err2 != nil {
s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
}
}()

@@ -1722,11 +1724,19 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info("replace ddls to preset ddls by sql operator in normal mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
needHandleDDLs = appliedSQLs // maybe nil
}

job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID)
err = s.addJobFunc(job)
if err != nil {
return err
}

// when the ddl job is added, the ddl is executed and then the checkpoint is flushed.
// if executing the ddl failed, execErrorDetected will be true.
if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))

for _, tbl := range targetTbls {
@@ -1909,6 +1919,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return err
}

if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

if len(onlineDDLTableNames) > 0 {
err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
if err != nil {
@@ -2347,7 +2361,7 @@ func (s *Syncer) Update(cfg *config.SubTaskConfig) error {
oldBwList = s.bwList
s.bwList, err = filter.New(cfg.CaseSensitive, cfg.BWList)
if err != nil {
return terror.ErrSyncerUnitNewBWList.Delegate(err)
return terror.ErrSyncerUnitGenBWList.Delegate(err)
}

// update route
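
Taken together, the syncer.go changes make the DDL path fail fast instead of losing state; a simplified sketch of the resulting control flow around an online DDL (names taken from the diff above, surrounding code elided):

// the job handler executes the DDL downstream and then flushes the checkpoint
if err := s.addJobFunc(job); err != nil {
    return err
}
// if downstream execution failed, return an error here so the saved online-DDL
// metadata is NOT cleared and the DDL can be retried after resuming the task
if s.execErrorDetected.Get() {
    return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}
// only after a successful execution is it safe to clear the online-DDL record
if len(onlineDDLTableNames) > 0 {
    if err := s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name); err != nil {
        return err
    }
}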
8 changes: 0 additions & 8 deletions syncer/syncer_test.go
@@ -1502,10 +1502,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
update,
"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);",
[]interface{}{int64(580981944116838401), "b"},
}, {
flush,
"",
nil,
},
}

@@ -1566,10 +1562,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
del,
"DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;",
[]interface{}{int32(3)},
}, {
flush,
"",
nil,
},
}

2 changes: 1 addition & 1 deletion tests/online_ddl/conf/diff_config.toml
@@ -23,7 +23,7 @@ tables = ["t_target"]
schema = "online_ddl"
table = "t_target"
ignore-columns = ["id"]
is-online_ddl = true
is-sharding = true
index-field = "uid"

[[table-config.source-tables]]
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/dm-task.yaml
@@ -3,7 +3,7 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: true
remove-meta: false
disable-heartbeat: true
timezone: "Asia/Shanghai"
online-ddl-scheme: online-ddl-scheme-placeholder
