From 2d3c3ae4558d4b7d77075dc35166ac19393494dc Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 27 Aug 2020 14:10:00 +0800 Subject: [PATCH] return lock instead of lockID in TrySync --- dm/master/shardddl/optimist.go | 18 +++++++++--------- pkg/shardddl/optimism/keeper.go | 4 ++-- pkg/shardddl/optimism/keeper_test.go | 28 +++++++++++++++++++++------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/dm/master/shardddl/optimist.go b/dm/master/shardddl/optimist.go index 7c645c5c62..eca8cabb43 100644 --- a/dm/master/shardddl/optimist.go +++ b/dm/master/shardddl/optimist.go @@ -476,15 +476,15 @@ func (o *Optimist) handleOperationPut(ctx context.Context, opCh <-chan optimism. // handleLock handles a single shard DDL lock. func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, skipDone bool) error { - lockID, newDDLs, err := o.lk.TrySync(info, tts) + l, newDDLs, err := o.lk.TrySync(info, tts) var cfStage = optimism.ConflictNone if err != nil { cfStage = optimism.ConflictDetected // we treat any errors returned from `TrySync` as conflict detected now. o.logger.Warn("error occur when trying to sync for shard DDL info, this often means shard DDL conflict detected", - zap.String("lock", lockID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err)) + zap.String("lock", l.ID), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted), log.ShortError(err)) } else { o.logger.Info("the shard DDL lock returned some DDLs", - zap.String("lock", lockID), zap.Strings("ddls", newDDLs), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted)) + zap.String("lock", l.ID), zap.Strings("ddls", newDDLs), zap.Stringer("info", info), zap.Bool("is deleted", info.IsDeleted)) // try to record the init schema before applied the DDL to the downstream. initSchema := optimism.NewInitSchema(info.Task, info.DownSchema, info.DownTable, info.TableInfoBefore) @@ -498,11 +498,11 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk } } - lock := o.lk.FindLock(lockID) + lock := o.lk.FindLock(l.ID) if lock == nil { - // this should not happen. - o.logger.Warn("lock not found after try sync for shard DDL info", zap.String("lock", lockID), zap.Stringer("info", info)) - return nil + // the lock was remove by others, revert it back + o.logger.Info("lock not found after try sync for shard DDL info, revert it back", zap.String("lock", l.ID), zap.Stringer("info", info)) + lock = l } // check whether the lock has resolved. @@ -516,12 +516,12 @@ func (o *Optimist) handleLock(info optimism.Info, tts []optimism.TargetTable, sk return nil } - op := optimism.NewOperation(lockID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false) + op := optimism.NewOperation(lock.ID, lock.Task, info.Source, info.UpSchema, info.UpTable, newDDLs, cfStage, false) rev, succ, err := optimism.PutOperation(o.cli, skipDone, op) if err != nil { return err } - o.logger.Info("put shard DDL lock operation", zap.String("lock", lockID), + o.logger.Info("put shard DDL lock operation", zap.String("lock", lock.ID), zap.Stringer("operation", op), zap.Bool("already exist", !succ), zap.Int64("revision", rev)) return nil } diff --git a/pkg/shardddl/optimism/keeper.go b/pkg/shardddl/optimism/keeper.go index 9d9228f3a9..c2d1120182 100644 --- a/pkg/shardddl/optimism/keeper.go +++ b/pkg/shardddl/optimism/keeper.go @@ -35,7 +35,7 @@ func NewLockKeeper() *LockKeeper { } // TrySync tries to sync the lock. -func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, error) { +func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (*Lock, []string, error) { var ( lockID = genDDLLockID(info) l *Lock @@ -51,7 +51,7 @@ func (lk *LockKeeper) TrySync(info Info, tts []TargetTable) (string, []string, e } newDDLs, err := l.TrySync(info.Source, info.UpSchema, info.UpTable, info.DDLs, info.TableInfoAfter, tts) - return lockID, newDDLs, err + return l, newDDLs, err } // RemoveLock removes a lock. diff --git a/pkg/shardddl/optimism/keeper_test.go b/pkg/shardddl/optimism/keeper_test.go index 0b73b561b3..ccfb7fae4c 100644 --- a/pkg/shardddl/optimism/keeper_test.go +++ b/pkg/shardddl/optimism/keeper_test.go @@ -56,8 +56,10 @@ func (t *testKeeper) TestLockKeeper(c *C) { ) // lock with 2 sources. - lockID1, newDDLs, err := lk.TrySync(i11, tts1) + l1, newDDLs, err := lk.TrySync(i11, tts1) c.Assert(err, IsNil) + c.Assert(l1, NotNil) + lockID1 := l1.ID c.Assert(lockID1, Equals, "task1-`foo`.`bar`") c.Assert(newDDLs, DeepEquals, DDLs) lock1 := lk.FindLock(lockID1) @@ -68,8 +70,10 @@ func (t *testKeeper) TestLockKeeper(c *C) { c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) - lockID1, newDDLs, err = lk.TrySync(i12, tts1) + l1, newDDLs, err = lk.TrySync(i12, tts1) c.Assert(err, IsNil) + c.Assert(l1, NotNil) + lockID1 = l1.ID c.Assert(lockID1, Equals, "task1-`foo`.`bar`") c.Assert(newDDLs, DeepEquals, DDLs) lock1 = lk.FindLock(lockID1) @@ -80,8 +84,10 @@ func (t *testKeeper) TestLockKeeper(c *C) { c.Assert(remain, Equals, 0) // lock with only 1 source. - lockID2, newDDLs, err := lk.TrySync(i21, tts2) + l2, newDDLs, err := lk.TrySync(i21, tts2) c.Assert(err, IsNil) + c.Assert(l2, NotNil) + lockID2 := l2.ID c.Assert(lockID2, Equals, "task2-`foo`.`bar`") c.Assert(newDDLs, DeepEquals, DDLs) lock2 := lk.FindLock(lockID2) @@ -149,14 +155,18 @@ func (t *testKeeper) TestLockKeeperMultipleTarget(c *C) { ) // lock for target1. - lockID1, newDDLs, err := lk.TrySync(i11, tts1) + l1, newDDLs, err := lk.TrySync(i11, tts1) c.Assert(err, IsNil) + c.Assert(l1, NotNil) + lockID1 := l1.ID c.Assert(lockID1, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`bar`") c.Assert(newDDLs, DeepEquals, DDLs) // lock for target2. - lockID2, newDDLs, err := lk.TrySync(i21, tts2) + l2, newDDLs, err := lk.TrySync(i21, tts2) c.Assert(err, IsNil) + c.Assert(l2, NotNil) + lockID2 := l2.ID c.Assert(lockID2, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`rab`") c.Assert(newDDLs, DeepEquals, DDLs) @@ -177,12 +187,16 @@ func (t *testKeeper) TestLockKeeperMultipleTarget(c *C) { c.Assert(remain, Equals, 1) // sync for two locks. - lockID1, newDDLs, err = lk.TrySync(i12, tts1) + l1, newDDLs, err = lk.TrySync(i12, tts1) c.Assert(err, IsNil) + c.Assert(l1, NotNil) + lockID1 = l1.ID c.Assert(lockID1, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`bar`") c.Assert(newDDLs, DeepEquals, DDLs) - lockID2, newDDLs, err = lk.TrySync(i22, tts2) + l2, newDDLs, err = lk.TrySync(i22, tts2) c.Assert(err, IsNil) + c.Assert(l2, NotNil) + lockID2 = l2.ID c.Assert(lockID2, DeepEquals, "test-lock-keeper-multiple-target-`foo`.`rab`") c.Assert(newDDLs, DeepEquals, DDLs)