Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: don't clear online ddl if execute ddl failed #449

Merged
merged 18 commits into from
Jan 20, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:l
ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ const (
codeSyncerUnitUpdateConfigInSharding
codeSyncerUnitExecWithNoBlockingDDL
codeSyncerUnitGenBWList
codeSyncerUnitHandleDDLFailed
)

// DM-master error code
Expand Down Expand Up @@ -828,6 +829,7 @@ var (
ErrSyncerUnitUpdateConfigInSharding = New(codeSyncerUnitUpdateConfigInSharding, ClassSyncUnit, ScopeInternal, LevelHigh, "try update config when some tables' (%v) sharding DDL not synced not supported")
ErrSyncerUnitExecWithNoBlockingDDL = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync")
ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job")

// DM-master error
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
Expand Down
15 changes: 15 additions & 0 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ type CheckPoint interface {
// corresponding to to Meta.Pos
GlobalPoint() mysql.Position

// ShardTablePoint returns the shard table's stream checkpoint
ShardTablePoint() map[string]map[string]mysql.Position

// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
// corresponding to to Meta.Pos
FlushedGlobalPoint() mysql.Position
Expand Down Expand Up @@ -473,6 +476,18 @@ func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position {
return cp.globalPoint.MySQLPos()
}

// ShardTablePoint implements CheckPoint.ShardTablePoint
func (cp *RemoteCheckPoint) ShardTablePoint() map[string]map[string]mysql.Position {
tablePoints := make(map[string]map[string]mysql.Position)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
for schema, tables := range cp.points {
tablePoints[schema] = make(map[string]mysql.Position)
for table, point := range tables {
tablePoints[schema][table] = point.MySQLPos()
}
}
return tablePoints
}

// FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position {
return cp.globalPoint.FlushedMySQLPos()
Expand Down
10 changes: 10 additions & 0 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package syncer

import (
"database/sql"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -192,6 +193,15 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
}

func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {

csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
failpoint.Inject("ExecuteSQLWithIgnoreFailed", func(val failpoint.Value) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will use this failpoint later

queryPattern := val.(string)
if len(queries) == 1 && strings.Contains(queries[0], queryPattern) {
tctx.L().Warn("executeSQLWithIgnore failed", zap.String("failpoint", "ExecuteSQLWithIgnoreFailed"))
failpoint.Return(0, terror.ErrDBUnExpect.Generate("invalid connection"))
}
})

if len(queries) == 0 {
return 0, nil
}
Expand Down
4 changes: 4 additions & 0 deletions syncer/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo {
return nil
}

if mSchema == nil || mSchema[ghostTable] == nil {
return nil
}

clone := new(GhostDDLInfo)
*clone = *mSchema[ghostTable]

Expand Down
21 changes: 17 additions & 4 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,8 @@ func (s *Syncer) initShardingGroups() error {
// IsFreshTask implements Unit.IsFreshTask
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0, nil
shardTablePoints := s.checkpoint.ShardTablePoint()
return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0 && len(shardTablePoints) == 0, nil
}

func (s *Syncer) reset() {
Expand Down Expand Up @@ -1054,9 +1055,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("statck"))
err = terror.ErrSyncerUnitPanic.Generate(err1)
}
// flush the jobs channels, but if error occurred, we should not flush the checkpoints
if err1 := s.flushJobs(); err1 != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems that don't need to execute theses sqls, if sync unit exit because of error or user pause the task, will cancel the context, and all sqls will execute failed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to s.jobWg.Wait() all operation jobs to return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add in 20cfd53

s.tctx.L().Error("fail to finish all jobs when binlog replication exits", log.ShortError(err1))

if err2 := s.flushCheckPoints(); err2 != nil {
s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
}
}()

Expand Down Expand Up @@ -1675,11 +1676,19 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info("replace ddls to preset ddls by sql operator in normal mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
needHandleDDLs = appliedSQLs // maybe nil
}

job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID)
err = s.addJobFunc(job)
if err != nil {
return err
}

// when add ddl job, will execute ddl and then flush checkpoint.
// if execute ddl failed, the execErrorDetected will be true.
if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate()
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))

for _, td := range needTrackDDLs {
Expand Down Expand Up @@ -1872,6 +1881,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
return err
}

if s.execErrorDetected.Get() {
return terror.ErrSyncerUnitHandleDDLFailed.Generate()
}

if len(onlineDDLTableNames) > 0 {
err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tables = ["t_target"]
schema = "online_ddl"
table = "t_target"
ignore-columns = ["id"]
is-online_ddl = true
is-sharding = true
index-field = "uid"

[[table-config.source-tables]]
Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: test
task-mode: all
is-sharding: true
meta-schema: "dm_meta"
remove-meta: true
remove-meta: false
disable-heartbeat: true
timezone: "Asia/Shanghai"
online-ddl-scheme: online-ddl-scheme-placeholder
Expand Down