Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
shardddl: add infoOpLock for pessimist shardddl (#1257) (#1263)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: GMHDBJD <[email protected]>
  • Loading branch information
ti-srebot and GMHDBJD authored Nov 2, 2020
1 parent dd8c11a commit a8a59e6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
47 changes: 43 additions & 4 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/shardddl/pessimism"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/failpoint"
)

var (
Expand All @@ -52,6 +53,8 @@ type Pessimist struct {

// taskSources is used to get all sources related to the given task.
taskSources func(task string) []string

infoOpMu sync.Mutex
}

// NewPessimist creates a new Pessimist instance.
Expand Down Expand Up @@ -464,17 +467,18 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
return
}
p.logger.Info("receive a shard DDL info", zap.Stringer("info", info))

p.infoOpMu.Lock()
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
if err != nil {
// if the lock becomes synced and `done` for the `exec`/`skip` operation has been received,
// but the `done` operations have not been deleted yet,
// then the DM-worker should not put any new DDL info until the old operations have been deleted.
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
// currently, only DDL mismatch will cause error
metrics.ReportDDLError(info.Task, metrics.InfoErrSyncLock)
p.infoOpMu.Unlock()
continue
} else if !synced {
p.logger.Info("the shard DDL lock has not synced", zap.String("lock", lockID), zap.Int("remain", remain))
p.infoOpMu.Unlock()
continue
}
p.logger.Info("the shard DDL lock has synced", zap.String("lock", lockID))
Expand All @@ -483,8 +487,8 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
if err != nil {
p.logger.Error("fail to handle the shard DDL lock", zap.String("lock", lockID), log.ShortError(err))
metrics.ReportDDLError(info.Task, metrics.InfoErrHandleLock)
continue
}
p.infoOpMu.Unlock()
}
}
}
Expand All @@ -505,14 +509,17 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
continue
}

p.infoOpMu.Lock()
lock := p.lk.FindLock(op.ID)
if lock == nil {
p.logger.Warn("no lock for the shard DDL lock operation exist", zap.Stringer("operation", op))
p.infoOpMu.Unlock()
continue
} else if synced, _ := lock.IsSynced(); !synced {
// this should not happen in normal case.
p.logger.Warn("the lock for the shard DDL lock operation has not synced", zap.Stringer("operation", op))
metrics.ReportDDLError(op.Task, metrics.OpErrLockUnSynced)
p.infoOpMu.Unlock()
continue
}

Expand All @@ -527,13 +534,15 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
}
p.logger.Info("the lock info for the shard DDL lock operation has been cleared", zap.Stringer("operation", op))
p.infoOpMu.Unlock()
continue
}

// one of the non-owner dm-worker instances has done the operation,
// but we still need to wait for more `done` from the other non-owner dm-worker instances.
if op.Source != lock.Owner {
p.logger.Info("the shard DDL lock operation of a non-owner has done", zap.Stringer("operation", op), zap.String("owner", lock.Owner))
p.infoOpMu.Unlock()
continue
}

Expand All @@ -544,6 +553,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
p.logger.Error("fail to put skip shard DDL lock operations for non-owner", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLError(op.Task, metrics.OpErrPutNonOwnerOp)
}
p.infoOpMu.Unlock()
}
}
}
Expand Down Expand Up @@ -631,6 +641,35 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
if err != nil {
return err
}

failpoint.Inject("SleepWhenRemoveLock", func(val failpoint.Value) {
t := val.(int)
log.L().Info("wait new ddl info putted into etcd",
zap.String("failpoint", "SleepWhenRemoveLock"),
zap.Int("max wait second", t))

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
timer := time.NewTimer(time.Duration(t) * time.Second)
defer timer.Stop()
OUTER:
for {
select {
case <-timer.C:
log.L().Info("failed to wait new DDL info", zap.Int("wait second", t))
break OUTER
case <-ticker.C:
// manually check etcd
infos, _, err := pessimism.GetAllInfo(p.cli)
if err == nil {
if _, ok := infos[lock.Task]; ok {
log.L().Info("found new DDL info")
break OUTER
}
}
}
}
})
p.lk.RemoveLock(lock.ID)
metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
return nil
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ function check_log_contain_with_retry() {
log2=$3
fi
rc=0
for ((k=1;k<11;k++)); do
for ((k=1;k<31;k++)); do
if [[ ! -f $log1 ]]; then
sleep 2
echo "check log contain failed $k-th time (file not exist), retry later"
Expand Down
13 changes: 11 additions & 2 deletions tests/shardddl3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,11 @@ function DM_RemoveLock_CASE() {
check_log_contain_with_retry "wait new ddl info putted into etcd" $WORK_DIR/master/log/dm-master.log
run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;"

check_log_contain_with_retry "fail to delete shard DDL infos and lock operations" $WORK_DIR/master/log/dm-master.log
if [[ "$1" = "pessimistic" ]]; then
check_log_contain_with_retry "found new DDL info" $WORK_DIR/master/log/dm-master.log
else
check_log_contain_with_retry "fail to delete shard DDL infos and lock operations" $WORK_DIR/master/log/dm-master.log
fi

run_sql_source1 "alter table ${shardddl1}.${tb1} change a a bigint default 10;"
run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;"
Expand All @@ -700,13 +704,18 @@ function DM_RemoveLock_CASE() {
function DM_RemoveLock() {
ps aux | grep dm-master |awk '{print $2}'|xargs kill || true
check_port_offline $MASTER_PORT1 20
export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(10)"
export GO_FAILPOINTS="github.com/pingcap/dm/dm/master/shardddl/SleepWhenRemoveLock=return(30)"
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member -w" \
"bound" 2

run_case RemoveLock "double-source-pessimistic" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb2} (a int, b varchar(10));\"" \
"clean_table" "pessimistic"
run_case RemoveLock "double-source-optimistic" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
run_sql_source2 \"create table ${shardddl1}.${tb1} (a int, b varchar(10));\"; \
Expand Down

0 comments on commit a8a59e6

Please sign in to comment.