Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: don't clear online ddl if execute ddl failed(#449) #465

Merged
merged 2 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ ErrSyncerUnitResolveCasualityFail,[code=36056:class=sync-unit:scope=internal:lev
ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:level=high],"reopen %T not supported"
ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
ErrSyncerUnitNewBWList,[code=36060:class=sync-unit:scope=internal:level=high],"new black white list"
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
28 changes: 28 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,31 @@ func verifyUUIDSuffix(suffix string) bool {
}
return true
}

// AdjustPosition adjusts the filename with uuid suffix in mysql position
// for example: mysql-bin|000001.000002 -> mysql-bin.000002
func AdjustPosition(pos gmysql.Position) gmysql.Position {
	if realPos, err := RealMySQLPos(pos); err == nil {
		return realPos
	}
	// conversion failed, fall back to the original position unchanged
	return pos
}

// ComparePosition returns:
//  1 if pos1 is bigger than pos2
//  0 if pos1 is equal to pos2
// -1 if pos1 is less than pos2
func ComparePosition(pos1, pos2 gmysql.Position) int {
	adjusted1 := AdjustPosition(pos1)
	adjusted2 := AdjustPosition(pos2)

	// when adjustment changed both names, both positions carry a uuid
	// suffix, so compare the raw names to also take the uuid into account
	bothHaveUUID := adjusted1.Name != pos1.Name && adjusted2.Name != pos2.Name
	if bothHaveUUID {
		return pos1.Compare(pos2)
	}

	return adjusted1.Compare(adjusted2)
}
117 changes: 117 additions & 0 deletions pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,120 @@ func (t *testPositionSuite) TestVerifyUUIDSuffix(c *C) {
c.Assert(verifyUUIDSuffix(cs.suffix), Equals, cs.valid)
}
}

// TestAdjustPosition checks that AdjustPosition strips a single uuid suffix
// from the binlog filename and leaves plain or malformed names untouched.
func (t *testPositionSuite) TestAdjustPosition(c *C) {
	cases := []struct {
		pos         gmysql.Position
		adjustedPos gmysql.Position
	}{
		{
			// no uuid suffix: returned unchanged
			gmysql.Position{
				Name: "mysql-bin.00001",
				Pos:  123,
			},
			gmysql.Position{
				Name: "mysql-bin.00001",
				Pos:  123,
			},
		}, {
			// uuid suffix is stripped
			gmysql.Position{
				Name: "mysql-bin|00001.00002",
				Pos:  123,
			},
			gmysql.Position{
				Name: "mysql-bin.00002",
				Pos:  123,
			},
		}, {
			// malformed name (extra dot segment): returned unchanged
			gmysql.Position{
				Name: "mysql-bin|00001.00002.00003",
				Pos:  123,
			},
			gmysql.Position{
				Name: "mysql-bin|00001.00002.00003",
				Pos:  123,
			},
		},
	}

	for _, cs := range cases {
		adjustedPos := AdjustPosition(cs.pos)
		c.Assert(adjustedPos.Name, Equals, cs.adjustedPos.Name)
		c.Assert(adjustedPos.Pos, Equals, cs.adjustedPos.Pos)
	}
}

// TestComparePosition checks ComparePosition for plain binlog names and for
// names carrying uuid suffixes, covering the less-than / equal / greater-than
// outcomes in both forms.
func (t *testPositionSuite) TestComparePosition(c *C) {
	type compareCase struct {
		pos1 gmysql.Position
		pos2 gmysql.Position
		cmp  int
	}

	cases := []compareCase{
		// plain names: ordered by filename
		{
			pos1: gmysql.Position{Name: "mysql-bin.00001", Pos: 123},
			pos2: gmysql.Position{Name: "mysql-bin.00002", Pos: 123},
			cmp:  -1,
		},
		{
			pos1: gmysql.Position{Name: "mysql-bin.00001", Pos: 123},
			pos2: gmysql.Position{Name: "mysql-bin.00001", Pos: 123},
			cmp:  0,
		},
		{
			pos1: gmysql.Position{Name: "mysql-bin.00002", Pos: 123},
			pos2: gmysql.Position{Name: "mysql-bin.00001", Pos: 123},
			cmp:  1,
		},
		// names with uuid suffixes: the suffix takes part in the ordering
		{
			pos1: gmysql.Position{Name: "mysql-bin|00001.00002", Pos: 123},
			pos2: gmysql.Position{Name: "mysql-bin|00002.00001", Pos: 123},
			cmp:  -1,
		},
		{
			pos1: gmysql.Position{Name: "mysql-bin|00001.00002", Pos: 123},
			pos2: gmysql.Position{Name: "mysql-bin|00001.00002", Pos: 123},
			cmp:  0,
		},
		{
			pos1: gmysql.Position{Name: "mysql-bin|00002.00001", Pos: 123},
			pos2: gmysql.Position{Name: "mysql-bin|00001.00002", Pos: 123},
			cmp:  1,
		},
	}

	for _, tc := range cases {
		c.Assert(ComparePosition(tc.pos1, tc.pos2), Equals, tc.cmp)
	}
}
6 changes: 4 additions & 2 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ const (
codeSyncerUnitReopenStreamNotSupport
codeSyncerUnitUpdateConfigInSharding
codeSyncerUnitExecWithNoBlockingDDL
codeSyncerUnitNewBWList
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
)

// DM-master error code
Expand Down Expand Up @@ -796,7 +797,8 @@ var (
ErrSyncerUnitReopenStreamNotSupport = New(codeSyncerUnitReopenStreamNotSupport, ClassSyncUnit, ScopeInternal, LevelHigh, "reopen %T not supported")
ErrSyncerUnitUpdateConfigInSharding = New(codeSyncerUnitUpdateConfigInSharding, ClassSyncUnit, ScopeInternal, LevelHigh, "try update config when some tables' (%v) sharding DDL not synced not supported")
ErrSyncerUnitExecWithNoBlockingDDL = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync")
ErrSyncerUnitNewBWList = New(codeSyncerUnitNewBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "new black white list")
ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
Expand Down
18 changes: 18 additions & 0 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ type CheckPoint interface {
// corresponding to Meta.Pos
GlobalPoint() mysql.Position

// TablePoint returns all table's stream checkpoint
TablePoint() map[string]map[string]mysql.Position

// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
// corresponding to Meta.Pos
FlushedGlobalPoint() mysql.Position
Expand Down Expand Up @@ -421,6 +424,21 @@ func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position {
return cp.globalPoint.MySQLPos()
}

// TablePoint implements CheckPoint.TablePoint
func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position {
	cp.RLock()
	defer cp.RUnlock()

	// deep-copy the per-table checkpoints so callers cannot mutate cp.points
	result := make(map[string]map[string]mysql.Position, len(cp.points))
	for schema, tables := range cp.points {
		schemaPoints := make(map[string]mysql.Position, len(tables))
		for table, point := range tables {
			schemaPoints[table] = point.MySQLPos()
		}
		result[schema] = schemaPoints
	}
	return result
}

// FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position {
return cp.globalPoint.FlushedMySQLPos()
Expand Down
8 changes: 8 additions & 0 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
}

func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {
failpoint.Inject("ExecuteSQLWithIgnoreFailed", func(val failpoint.Value) {
queryPattern := val.(string)
if len(queries) == 1 && strings.Contains(queries[0], queryPattern) {
tctx.L().Warn("executeSQLWithIgnore failed", zap.String("failpoint", "ExecuteSQLWithIgnoreFailed"))
failpoint.Return(0, terror.ErrDBUnExpect.Generate("invalid connection"))
}
})

if len(queries) == 0 {
return 0, nil
}
Expand Down
9 changes: 9 additions & 0 deletions syncer/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -176,6 +177,10 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo {
return nil
}

if mSchema == nil || mSchema[ghostTable] == nil {
return nil
}

clone := new(GhostDDLInfo)
*clone = *mSchema[ghostTable]

Expand Down Expand Up @@ -204,6 +209,10 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable,

// maybe we need more checks for it

if len(info.DDLs) != 0 && info.DDLs[len(info.DDLs)-1] == ddl {
tctx.L().Warn("online ddl may be saved before, just ignore it", zap.String("ddl", ddl))
return nil
}
info.DDLs = append(info.DDLs, ddl)
ddlsBytes, err := json.Marshal(mSchema[ghostTable])
if err != nil {
Expand Down
26 changes: 20 additions & 6 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {

s.bwList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BWList)
if err != nil {
return terror.ErrSyncerUnitNewBWList.Delegate(err)
return terror.ErrSyncerUnitGenBWList.Delegate(err)
}

s.binlogFilter, err = bf.NewBinlogEvent(s.cfg.CaseSensitive, s.cfg.FilterRules)
Expand Down Expand Up @@ -470,7 +470,8 @@ func (s *Syncer) initShardingGroups() error {
// IsFreshTask implements Unit.IsFreshTask
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
return globalPoint.Compare(minCheckpoint) <= 0, nil
tablePoint := s.checkpoint.TablePoint()
return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0 && len(tablePoint) == 0, nil
}

func (s *Syncer) resetReplicationSyncer() {
Expand Down Expand Up @@ -1115,9 +1116,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("statck"))
err = terror.ErrSyncerUnitPanic.Generate(err1)
}
// flush the jobs channels, but if error occurred, we should not flush the checkpoints
if err1 := s.flushJobs(); err1 != nil {
s.tctx.L().Error("fail to finish all jobs when binlog replication exits", log.ShortError(err1))

s.jobWg.Wait()
if err2 := s.flushCheckPoints(); err2 != nil {
s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
}
}()

Expand Down Expand Up @@ -1722,11 +1724,19 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info("replace ddls to preset ddls by sql operator in normal mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
needHandleDDLs = appliedSQLs // maybe nil
}

job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID)
err = s.addJobFunc(job)
if err != nil {
return err
}

// when add ddl job, will execute ddl and then flush checkpoint.
// if execute ddl failed, the execErrorDetected will be true.
if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))

for _, tbl := range targetTbls {
Expand Down Expand Up @@ -1909,6 +1919,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return err
}

if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
}

if len(onlineDDLTableNames) > 0 {
err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
if err != nil {
Expand Down Expand Up @@ -2347,7 +2361,7 @@ func (s *Syncer) Update(cfg *config.SubTaskConfig) error {
oldBwList = s.bwList
s.bwList, err = filter.New(cfg.CaseSensitive, cfg.BWList)
if err != nil {
return terror.ErrSyncerUnitNewBWList.Delegate(err)
return terror.ErrSyncerUnitGenBWList.Delegate(err)
}

// update route
Expand Down
8 changes: 0 additions & 8 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,10 +1502,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
update,
"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);",
[]interface{}{int64(580981944116838401), "b"},
}, {
flush,
"",
nil,
},
}

Expand Down Expand Up @@ -1566,10 +1562,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
del,
"DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;",
[]interface{}{int32(3)},
}, {
flush,
"",
nil,
},
}

Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tables = ["t_target"]
schema = "online_ddl"
table = "t_target"
ignore-columns = ["id"]
is-online_ddl = true
is-sharding = true
index-field = "uid"

[[table-config.source-tables]]
Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: true
remove-meta: false
disable-heartbeat: true
timezone: "Asia/Shanghai"
online-ddl-scheme: online-ddl-scheme-placeholder
Expand Down