shardddl/optimistic: warn add not fully dropped columns (#1510) (#1537)
ti-srebot authored Mar 31, 2021
1 parent a6f917d commit bae4109
Showing 35 changed files with 845 additions and 211 deletions.
2 changes: 1 addition & 1 deletion _utils/terror_gen/errors_release.txt
@@ -312,7 +312,7 @@ ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:l
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high], "Message: process unit not waiting for sharding DDL to sync"
ErrSyncerUnitGenBAList,[code=36060:class=sync-unit:scope=internal:level=high], "Message: generate block allow list, Workaround: Please check the `block-allow-list` config in task configuration file."
ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high], "Message: fail to handle ddl job for %s"
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high], "Message: fail to handle shard ddl %v in optimistic mode, because schema conflict detected, Workaround: Please use show-ddl-locks command for more details."
ErrSyncerShardDDLConflict,[code=36062:class=sync-unit:scope=internal:level=high], "Message: fail to handle shard ddl %v in optimistic mode, because schema conflict detected, conflict error: %s, Workaround: Please use show-ddl-locks command for more details."
ErrSyncerFailpoint,[code=36063:class=sync-unit:scope=internal:level=low], "Message: failpoint specified error"
ErrSyncerReplaceEvent,[code=36064:class=sync-unit:scope=internal:level=high]
ErrSyncerOperatorNotExist,[code=36065:class=sync-unit:scope=internal:level=low], "Message: error operator not exist, position: %s"
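
The only change in this file extends the ErrSyncerShardDDLConflict message with a `conflict error: %s` placeholder, so the concrete conflict reason is surfaced to the user. A minimal, hypothetical rendering of the new format (using plain fmt here rather than the terror package; the DDL and conflict text are made up):

```go
package main

import "fmt"

func main() {
	// Hypothetical inputs: in DM these come from the failed TrySync call.
	ddls := []string{"ALTER TABLE db.tbl ADD COLUMN c INT"}
	conflictErr := "add column c that is not fully dropped in downstream"
	// Same format string as the updated errors_release.txt entry above.
	fmt.Printf("fail to handle shard ddl %v in optimistic mode, because schema conflict detected, conflict error: %s\n",
		ddls, conflictErr)
}
```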
12 changes: 11 additions & 1 deletion dm/common/common.go
@@ -79,6 +79,15 @@ var (
// ShardDDLOptimismInitSchemaKeyAdapter is used to store the initial schema (before constructed the lock) of merged tables.
// k/v: Encode(task-name, downstream-schema-name, downstream-table-name) -> table schema.
ShardDDLOptimismInitSchemaKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/init-schema/")
// ShardDDLOptimismDroppedColumnsKeyAdapter is used to store the columns that are not fully dropped
// k/v: Encode(task-name, downstream-schema-name, downstream-table-name, column-name, source-id, upstream-schema-name, upstream-table-name) -> empty
// If we don't identify different upstream tables, we may report an error for tb2 in the following case.
// Time series: (+a/-a means add/drop column a)
// older ----------------> newer
// tb1: +a +b +c -c
// tb2: +a +b +c
// tb3: +a +b +c
ShardDDLOptimismDroppedColumnsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-optimism/dropped-columns/")
)

func keyAdapterKeysLen(s KeyAdapter) int {
@@ -95,7 +104,8 @@ func keyAdapterKeysLen(s KeyAdapter) int {
return 3
case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
return 4

case ShardDDLOptimismDroppedColumnsKeyAdapter:
return 7
}
return -1
}
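
The new ShardDDLOptimismDroppedColumnsKeyAdapter stores one etcd key per (downstream table, column, source, upstream table) combination, which is what lets tb1's later `-c` be told apart from tb2 and tb3, which still carry column `c`; keyAdapterKeysLen accordingly reports 7 key parts for it. Below is a minimal sketch, not part of this commit, of writing such a key; it assumes the variadic Encode signature the other KeyAdapters in this file expose and the repository's module path, and the helper name is hypothetical.

```go
package main

import (
	"context"

	"github.com/pingcap/dm/dm/common"
	clientv3 "go.etcd.io/etcd/clientv3"
)

// registerDroppedColumn is a hypothetical helper: it marks one column of one
// upstream table as "not fully dropped" for the given downstream table, so a
// later warning can name the exact upstream tables that still carry it.
func registerDroppedColumn(cli *clientv3.Client,
	task, downSchema, downTable, column, source, upSchema, upTable string) error {
	// Seven encoded parts, matching keyAdapterKeysLen above; the value is
	// intentionally empty because only the key carries information.
	key := common.ShardDDLOptimismDroppedColumnsKeyAdapter.Encode(
		task, downSchema, downTable, column, source, upSchema, upTable)
	_, err := cli.Put(context.Background(), key, "")
	return err
}
```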
6 changes: 3 additions & 3 deletions dm/master/server_test.go
@@ -634,15 +634,15 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
tiBefore = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY)`)
tiAfter1 = createTableInfo(c, p, se, tblID, `CREATE TABLE bar (id INT PRIMARY KEY, c1 TEXT)`)
info1 = optimism.NewInfo(taskName, sources[0], "foo-1", "bar-1", schema, table, DDLs1, tiBefore, []*model.TableInfo{tiAfter1})
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, false)
op1 = optimism.NewOperation(ID, taskName, sources[0], info1.UpSchema, info1.UpTable, DDLs1, optimism.ConflictNone, "", false)
)

st1.AddTable("foo-1", "bar-1", schema, table)
_, err = optimism.PutSourceTables(etcdTestCli, st1)
c.Assert(err, check.IsNil)
_, err = optimism.PutInfo(etcdTestCli, info1)
c.Assert(err, check.IsNil)
_, succ, err = optimism.PutOperation(etcdTestCli, false, op1)
_, succ, err = optimism.PutOperation(etcdTestCli, false, op1, 0)
c.Assert(succ, check.IsTrue)
c.Assert(err, check.IsNil)

@@ -1471,7 +1471,7 @@ func (t *testMaster) TestOfflineMember(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(listResp.Members, check.HasLen, 3)

// make sure s3 is not the leader, otherwise it will take some time to campain a new leader after close s3, and it may cause timeout
// make sure s3 is not the leader, otherwise it will take some time to campaign a new leader after close s3, and it may cause timeout
c.Assert(utils.WaitSomething(20, 500*time.Millisecond, func() bool {
_, leaderID, _, err = s1.election.LeaderInfo(ctx)
if err != nil {
108 changes: 67 additions & 41 deletions dm/master/shardddl/optimist.go
@@ -247,13 +247,21 @@ func (o *Optimist) rebuildLocks() (revSource, revInfo, revOperation int64, err e
}
o.logger.Info("get history shard DDL lock operation", zap.Int64("revision", revOperation))

colm, _, err := optimism.GetAllDroppedColumns(o.cli)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected columns can be handled by the user.
o.logger.Error("fail to recover colms", log.ShortError(err))
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = o.recoverLocks(ifm, opm)
err = o.recoverLocks(ifm, opm, colm)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected locks can be handled by the user.
o.logger.Error("fail to recover locks", log.ShortError(err))
}

return revSource, revInfo, revOperation, nil
}
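
rebuildLocks now also loads the dropped-columns map (colm), a five-level nested map, and feeds it into lock recovery. As a hedged sketch only, walking that structure could look like the snippet below; the nesting order shown (lock ID → column → source → upstream schema → upstream table) is an assumption inferred from the key layout documented in dm/common/common.go, not something this diff states, and the helper itself is not part of the commit.

```go
package main

import "go.uber.org/zap"

// warnNotFullyDropped is a hypothetical helper that logs every column still
// tracked as "not fully dropped" after a restart. The nesting order of colm
// (lock-ID → column → source → upstream-schema → upstream-table) is an
// assumption inferred from the dropped-columns key layout in dm/common.
func warnNotFullyDropped(logger *zap.Logger,
	colm map[string]map[string]map[string]map[string]map[string]struct{}) {
	for lockID, cols := range colm {
		for col, sources := range cols {
			for source, schemas := range sources {
				for schema, tables := range schemas {
					for table := range tables {
						logger.Warn("column is not fully dropped",
							zap.String("lock", lockID), zap.String("column", col),
							zap.String("source", source), zap.String("up schema", schema),
							zap.String("up table", table))
					}
				}
			}
		}
	}
}
```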

@@ -312,26 +320,30 @@ func (o *Optimist) buildLockJoinedAndTTS(ifm map[string]map[string]map[string]ma
// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (o *Optimist) recoverLocks(
ifm map[string]map[string]map[string]map[string]optimism.Info,
opm map[string]map[string]map[string]map[string]optimism.Operation) error {
opm map[string]map[string]map[string]map[string]optimism.Operation,
colm map[string]map[string]map[string]map[string]map[string]struct{}) error {
// construct joined table based on the shard DDL info.
o.logger.Info("build lock joined and tts")
lockJoined, lockTTS := o.buildLockJoinedAndTTS(ifm)
// build lock and restore table info
o.logger.Info("rebuild locks and tables")
o.lk.RebuildLocksAndTables(ifm, lockJoined, lockTTS)
o.lk.RebuildLocksAndTables(o.cli, ifm, colm, lockJoined, lockTTS)
// sort infos by revision
infos := sortInfos(ifm)
var firstErr error
setFirstErr := func(err error) {
if firstErr == nil && err != nil {
firstErr = err
}
}

for _, info := range infos {
tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
_, _, err := o.lk.TrySync(info, tts)
if err != nil {
return err
}
// never mark the lock operation from `done` to `not-done` when recovering.
err = o.handleLock(info, tts, true)
err := o.handleInfo(info, true)
if err != nil {
return err
o.logger.Error("fail to handle info while recovering locks", zap.Error(err))
setFirstErr(err)
continue
}
}

@@ -347,12 +359,17 @@ func (o *Optimist) recoverLocks(
}
if op.Done {
lock.TryMarkDone(op.Source, op.UpSchema, op.UpTable)
err := lock.DeleteColumnsByDDLs(op.DDLs)
if err != nil {
o.logger.Error("fail to update lock columns", zap.Error(err))
continue
}
}
}
}
}
}
return nil
return firstErr
}

// watchSourceInfoOperation watches the etcd operation for source tables, shard DDL infos and shard DDL operations.
@@ -394,7 +411,7 @@ func (o *Optimist) watchSourceInfoOperation(
}()
go func() {
defer wg.Done()
o.handleInfo(ctx, infoCh)
o.handleInfoPut(ctx, infoCh)
}()

// watch for the shard DDL lock operation and handle them.
@@ -437,8 +454,8 @@ func (o *Optimist) handleSourceTables(ctx context.Context, sourceCh <-chan optim
}
}

// handleInfo handles PUT and DELETE for the shard DDL info.
func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info) {
// handleInfoPut handles PUT and DELETE for the shard DDL info.
func (o *Optimist) handleInfoPut(ctx context.Context, infoCh <-chan optimism.Info) {
for {
select {
case <-ctx.Done():
@@ -470,35 +487,38 @@ func (o *Optimist) handleInfo(ctx context.Context, infoCh <-chan optimism.Info)
continue
}

added := o.tk.AddTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable)
o.logger.Debug("a table added for info", zap.Bool("added", added), zap.Stringer("info", info))

tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
if tts == nil {
// WATCH for SourceTables may fall behind WATCH for Info although PUT earlier,
// so we try to get SourceTables again.
// NOTE: check SourceTables for `info.Source` if needed later.
stm, _, err := optimism.GetAllSourceTables(o.cli)
if err != nil {
o.logger.Error("fail to get source tables", log.ShortError(err))
} else if tts2 := optimism.TargetTablesForTask(info.Task, info.DownSchema, info.DownTable, stm); tts2 != nil {
tts = tts2
}
}
// put operation for the table. we don't set `skipDone=true` now,
// because in optimism mode, one table may execute/done multiple DDLs but other tables may do nothing.
err := o.handleLock(info, tts, false)
if err != nil {
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
o.mu.Unlock()
continue
}
_ = o.handleInfo(info, false)
o.mu.Unlock()
}
}
}

func (o *Optimist) handleInfo(info optimism.Info, skipDone bool) error {
added := o.tk.AddTable(info.Task, info.Source, info.UpSchema, info.UpTable, info.DownSchema, info.DownTable)
o.logger.Debug("a table added for info", zap.Bool("added", added), zap.Stringer("info", info))

tts := o.tk.FindTables(info.Task, info.DownSchema, info.DownTable)
if tts == nil {
// WATCH for SourceTables may fall behind WATCH for Info although PUT earlier,
// so we try to get SourceTables again.
// NOTE: check SourceTables for `info.Source` if needed later.
stm, _, err := optimism.GetAllSourceTables(o.cli)
if err != nil {
o.logger.Error("fail to get source tables", log.ShortError(err))
} else if tts2 := optimism.TargetTablesForTask(info.Task, info.DownSchema, info.DownTable, stm); tts2 != nil {
tts = tts2
}
}
err := o.handleLock(info, tts, skipDone)
if err != nil {
o.logger.Error("fail to handle the shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
}
return err
}

// handleOperationPut handles PUT for the shard DDL lock operations.
func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.Operation) {
for {
@@ -525,6 +545,10 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.
continue
}

err := lock.DeleteColumnsByDDLs(op.DDLs)
if err != nil {
o.logger.Error("fail to update lock columns", zap.Error(err))
}
// in optimistic mode, we always try to mark a table as done after received the `done` status of the DDLs operation.
// NOTE: even all tables have done their previous DDLs operations, the lock may still not resolved,
// because these tables may have different schemas.
@@ -553,13 +577,15 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism.

// handleLock handles a single shard DDL lock.
func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error {
lockID, newDDLs, err := o.lk.TrySync(info, tts)
lockID, newDDLs, err := o.lk.TrySync(o.cli, info, tts)
var cfStage = optimism.ConflictNone
var cfMsg = ""
if info.IgnoreConflict {
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected",
zap.String("lock", lockID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err))
} else if err != nil {
cfStage = optimism.ConflictDetected // we treat any errors returned from `TrySync` as conflict detected now.
cfMsg = err.Error()
o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected",
zap.String("lock", lockID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err))
} else {
@@ -599,8 +625,8 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk
return nil
}

op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false)
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op)
op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, cfMsg, false)
rev, succ, err := optimism.PutOperation(o.cli, skipDone, op, info.Revision)
if err != nil {
return err
}
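
PutOperation now also receives info.Revision. Its body is outside this excerpt, so the snippet below is only generic background on how a revision-guarded write is typically done with an etcd transaction; the key names are made up and this is not a claim about DM's implementation.

```go
package main

import (
	"context"

	clientv3 "go.etcd.io/etcd/clientv3"
)

// putIfRevisionUnchanged is a generic etcd sketch, not DM's PutOperation: it
// writes opKey only if infoKey still has the revision observed when the shard
// DDL info was read, so a stale operation never overwrites a newer state.
func putIfRevisionUnchanged(cli *clientv3.Client, infoKey, opKey, opVal string, rev int64) (bool, error) {
	resp, err := cli.Txn(context.Background()).
		If(clientv3.Compare(clientv3.ModRevision(infoKey), "=", rev)).
		Then(clientv3.OpPut(opKey, opVal)).
		Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil // false: the info changed after it was read
}
```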
@@ -671,13 +697,13 @@ func (o *Optimist) deleteInfosOps(lock *optimism.Lock) (bool, error) {
info := optimism.NewInfo(lock.Task, source, schema, table, lock.DownSchema, lock.DownTable, nil, nil, nil)
info.Version = lock.GetVersion(source, schema, table)
infos = append(infos, info)
ops = append(ops, optimism.NewOperation(lock.ID, lock.Task, source, schema, table, nil, optimism.ConflictNone, false))
ops = append(ops, optimism.NewOperation(lock.ID, lock.Task, source, schema, table, nil, optimism.ConflictNone, "", false))
}
}
}
// NOTE: we rely on only `task`, `downSchema`, and `downTable` used for deletion.
initSchema := optimism.NewInitSchema(lock.Task, lock.DownSchema, lock.DownTable, nil)
rev, deleted, err := optimism.DeleteInfosOperationsSchema(o.cli, infos, ops, initSchema)
rev, deleted, err := optimism.DeleteInfosOperationsSchemaColumn(o.cli, infos, ops, initSchema)
if err != nil {
return deleted, err
}
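
Both recoverLocks and handleOperationPut now call lock.DeleteColumnsByDDLs(op.DDLs), so columns fully dropped by the executed DDLs stop being tracked. The lock method itself is not shown in this excerpt; purely as a hypothetical illustration, DROP COLUMN targets can be pulled out of DDL text with the TiDB parser (the pingcap/parser package DM already depends on) like this:

```go
package main

import (
	"github.com/pingcap/parser"
	"github.com/pingcap/parser/ast"
)

// droppedColumns is an illustrative helper, not the repository's code: it
// extracts the column names removed by a batch of DDL statements.
func droppedColumns(ddls []string) ([]string, error) {
	p := parser.New()
	var cols []string
	for _, ddl := range ddls {
		stmt, err := p.ParseOneStmt(ddl, "", "")
		if err != nil {
			return nil, err
		}
		alter, ok := stmt.(*ast.AlterTableStmt)
		if !ok {
			continue // only ALTER TABLE statements can drop a column
		}
		for _, spec := range alter.Specs {
			if spec.Tp == ast.AlterTableDropColumn {
				cols = append(cols, spec.OldColumnName.Name.O)
			}
		}
	}
	return cols, nil
}
```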