diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 2336c766f6..56bfde1c40 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -463,13 +463,9 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task)) if err != nil { // TODO: add & update metrics. - // FIXME: the following case is not supported automatically now, try to support it later. - // - the lock become synced, and `done` for `exec` operation received. - // - put `skip` operation for non-owners and the lock is still not resolved. - // - another new DDL from the old owner received and TrySync again with an error returned. - // after the old lock resolved, the new DDL from the old owner will NOT be handled again, - // then the lock will be block because the Pessimist thinks missing DDL from some sources. - // now, we need to `pause-task` and `resume-task` to let DM-workers put DDL again to trigger the process. + // if the lock become synced, and `done` for `exec`/`skip` operation received, + // but the `done` operations have not been deleted, + // then the DM-worker should not put any new DDL info until the old operation has been deleted. p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) continue } else if !synced { diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index 904ec1d211..79110202fe 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -116,6 +116,26 @@ func DoOpsInOneTxnWithRetry(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3 return resp, resp.Header.Revision, nil } +// DoOpsInOneCmpsTxnWithRetry do multiple etcd operations in one txn and with comparisons. +func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (*clientv3.TxnResponse, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout) + defer cancel() + tctx := tcontext.NewContext(ctx, log.L()) + ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) { + resp, err := cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + if err != nil { + return nil, err + } + return resp, nil + }) + + if err != nil { + return nil, 0, err + } + resp := ret.(*clientv3.TxnResponse) + return resp, resp.Header.Revision, nil +} + // IsRetryableError check whether error is retryable error for etcd to build again func IsRetryableError(err error) bool { switch errors.Cause(err) { diff --git a/pkg/shardddl/pessimism/info.go b/pkg/shardddl/pessimism/info.go index e002817641..4c794bb5f1 100644 --- a/pkg/shardddl/pessimism/info.go +++ b/pkg/shardddl/pessimism/info.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/pkg/etcdutil" + "github.com/pingcap/dm/pkg/utils" ) // Info represents the shard DDL information. @@ -95,6 +96,46 @@ func PutInfo(cli *clientv3.Client, info Info) (int64, error) { return resp.Header.Revision, nil } +// PutInfoIfOpNotDone puts the shard DDL info into etcd if the operation not exists or not `done`. +func PutInfoIfOpNotDone(cli *clientv3.Client, info Info) (rev int64, putted bool, err error) { + infoValue, err := info.toJSON() + if err != nil { + return 0, false, err + } + infoKey := common.ShardDDLPessimismInfoKeyAdapter.Encode(info.Task, info.Source) + opKey := common.ShardDDLPessimismOperationKeyAdapter.Encode(info.Task, info.Source) + infoPut := clientv3.OpPut(infoKey, infoValue) + opGet := clientv3.OpGet(opKey) + + // try to PUT info if the operation not exist. + resp, rev, err := etcdutil.DoOpsInOneCmpsTxnWithRetry(cli, []clientv3.Cmp{clientv3util.KeyMissing(opKey)}, + []clientv3.Op{infoPut}, []clientv3.Op{opGet}) + if err != nil { + return 0, false, err + } else if resp.Succeeded { + return rev, resp.Succeeded, nil + } + + opsResp := resp.Responses[0].GetResponseRange() + opBefore, err := operationFromJSON(string(opsResp.Kvs[0].Value)) + if err != nil { + return 0, false, err + } else if opBefore.Done { + // the operation with `done` exist before, abort the PUT. + return rev, false, nil + } else if utils.CompareShardingDDLs(opBefore.DDLs, info.DDLs) { + // TODO: try to handle put the same `done` DDL later. + } + + // NOTE: try to PUT info if the operation still not done. + opNotDone := clientv3.Compare(clientv3.Value(opKey), "=", string(opsResp.Kvs[0].Value)) + resp, rev, err = etcdutil.DoOpsInOneCmpsTxnWithRetry(cli, []clientv3.Cmp{opNotDone}, []clientv3.Op{infoPut}, []clientv3.Op{}) + if err != nil { + return 0, false, err + } + return rev, resp.Succeeded, nil +} + // GetAllInfo gets all shard DDL info in etcd currently. // k/k/v: task-name -> source-ID -> DDL info. // This function should often be called by DM-master. diff --git a/pkg/shardddl/pessimism/info_test.go b/pkg/shardddl/pessimism/info_test.go index 843f4e3f25..2895e4caf4 100644 --- a/pkg/shardddl/pessimism/info_test.go +++ b/pkg/shardddl/pessimism/info_test.go @@ -15,11 +15,13 @@ package pessimism import ( "context" + "fmt" "sync" "testing" "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb-tools/pkg/dbutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/integration" @@ -153,3 +155,56 @@ func (t *testForEtcd) TestInfoEtcd(c *C) { c.Assert(ifm[task2], HasLen, 1) c.Assert(ifm[task2][source1], DeepEquals, i21) } + +func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) { + defer clearTestInfoOperation(c) + + var ( + source = "mysql-replica-1" + task = "test-put-info-if-no-op" + schema = "foo" + table = "bar" + DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} + ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table)) + info = NewInfo(task, source, schema, table, DDLs) + op = NewOperation(ID, task, source, DDLs, false, false) + ) + + // put info success because no operation exist. + rev1, putted, err := PutInfoIfOpNotDone(etcdTestCli, info) + c.Assert(err, IsNil) + c.Assert(rev1, Greater, int64(0)) + c.Assert(putted, IsTrue) + + // put a non-done operation. + rev2, putted, err := PutOperations(etcdTestCli, false, op) + c.Assert(err, IsNil) + c.Assert(rev2, Greater, rev1) + c.Assert(putted, IsTrue) + + // still can put info. + rev3, putted, err := PutInfoIfOpNotDone(etcdTestCli, info) + c.Assert(err, IsNil) + c.Assert(rev3, Greater, rev2) + c.Assert(putted, IsTrue) + + // change op to `done` and put it. + op.Done = true + rev4, putted, err := PutOperations(etcdTestCli, false, op) + c.Assert(err, IsNil) + c.Assert(rev4, Greater, rev3) + c.Assert(putted, IsTrue) + + // can't put info anymore. + rev5, putted, err := PutInfoIfOpNotDone(etcdTestCli, info) + c.Assert(err, IsNil) + c.Assert(rev5, Equals, rev4) + c.Assert(putted, IsFalse) + + // try put anther info, but still can't put it. + info.DDLs = []string{"ALTER TABLE bar ADD COLUMN c2 INT"} + rev6, putted, err := PutInfoIfOpNotDone(etcdTestCli, info) + c.Assert(err, IsNil) + c.Assert(rev6, Equals, rev5) + c.Assert(putted, IsFalse) +} diff --git a/pkg/shardddl/pessimism/operation.go b/pkg/shardddl/pessimism/operation.go index ee360f5db7..8ae96578e4 100644 --- a/pkg/shardddl/pessimism/operation.go +++ b/pkg/shardddl/pessimism/operation.go @@ -35,6 +35,10 @@ type Operation struct { DDLs []string `json:"ddls"` // DDL statements Exec bool `json:"exec"` // execute or skip the DDL statements Done bool `json:"done"` // whether the `Exec` operation has done + + // only used to report to the caller of the watcher, do not marsh it. + // if it's true, it means the Operation has been deleted in etcd. + IsDeleted bool `json:"-"` } // NewOperation creates a new Operation instance. @@ -202,8 +206,22 @@ func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Oper // This function can be called by DM-worker and DM-master. // TODO(csuzhangxc): report error and do some retry. func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error) { + watchOperation(ctx, cli, mvccpb.PUT, task, source, revision, outCh, errCh) +} + +// WatchOperationDelete watches DELETE operations for DDL lock operation. +// If want to watch all operations, pass empty string for `task` and `source`. +// This function is often called by DM-worker. +func WatchOperationDelete(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error) { + watchOperation(ctx, cli, mvccpb.DELETE, task, source, revision, outCh, errCh) +} + +// watchOperation watches PUT or DELETE operations for DDL lock operation. +func watchOperation(ctx context.Context, cli *clientv3.Client, watchType mvccpb.Event_EventType, + task, source string, revision int64, + outCh chan<- Operation, errCh chan<- error) { ch := cli.Watch(ctx, common.ShardDDLPessimismOperationKeyAdapter.Encode(task, source), - clientv3.WithPrefix(), clientv3.WithRev(revision)) + clientv3.WithPrefix(), clientv3.WithRev(revision), clientv3.WithPrevKV()) for { select { @@ -219,18 +237,25 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source s } for _, ev := range resp.Events { - if ev.Type != mvccpb.PUT { - continue + var ( + op Operation + err error + ) + + if ev.Type == mvccpb.PUT && watchType == mvccpb.PUT { + op, err = operationFromJSON(string(ev.Kv.Value)) + } else if ev.Type == mvccpb.DELETE && watchType == mvccpb.DELETE { + op, err = operationFromJSON(string(ev.PrevKv.Value)) + op.IsDeleted = true } - op, err := operationFromJSON(string(ev.Kv.Value)) if err != nil { select { case errCh <- err: case <-ctx.Done(): return } - } else { + } else if op.Task != "" { // valid operation got. select { case outCh <- op: case <-ctx.Done(): diff --git a/pkg/shardddl/pessimism/operation_test.go b/pkg/shardddl/pessimism/operation_test.go index 3112c8bcb5..929b308756 100644 --- a/pkg/shardddl/pessimism/operation_test.go +++ b/pkg/shardddl/pessimism/operation_test.go @@ -122,6 +122,23 @@ func (t *testForEtcd) TestOperationEtcd(c *C) { c.Assert(err, IsNil) c.Assert(rev6, Greater, rev5) + // start watch with an older revision for the deleted op11. + wch = make(chan Operation, 10) + ech = make(chan error, 10) + ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond) + WatchOperationDelete(ctx, etcdTestCli, op11.Task, op11.Source, rev5, wch, ech) + cancel() + close(wch) + close(ech) + + // watch should got the previous deleted operation. + c.Assert(len(wch), Equals, 1) + c.Assert(len(ech), Equals, 0) + op11d := <-wch + c.Assert(op11d.IsDeleted, IsTrue) + op11d.IsDeleted = false // reset to false + c.Assert(op11d, DeepEquals, op11) + // get again, op11 should be deleted. opm, _, err = GetAllOperations(etcdTestCli) c.Assert(err, IsNil) diff --git a/syncer/shardddl/pessimist.go b/syncer/shardddl/pessimist.go index 769994bbe7..6f036f695b 100644 --- a/syncer/shardddl/pessimist.go +++ b/syncer/shardddl/pessimist.go @@ -65,8 +65,37 @@ func (p *Pessimist) ConstructInfo(schema, table string, DDLs []string) pessimism } // PutInfo puts the shard DDL info into etcd and returns the revision. -func (p *Pessimist) PutInfo(info pessimism.Info) (int64, error) { - rev, err := pessimism.PutInfo(p.cli, info) +func (p *Pessimist) PutInfo(ctx context.Context, info pessimism.Info) (int64, error) { + // put info only no previous operation exists or not done. + rev, putted, err := pessimism.PutInfoIfOpNotDone(p.cli, info) + if err != nil { + return 0, err + } else if putted { + p.mu.Lock() + p.pendingInfo = &info + p.mu.Unlock() + + return rev, nil + } + + p.logger.Warn("the previous shard DDL operation still exists, waiting for it to be deleted", zap.Stringer("info", info)) + + // wait for the operation to be deleted. + ctx2, cancel2 := context.WithCancel(ctx) + defer cancel2() + ch := make(chan pessimism.Operation, 1) + errCh := make(chan error, 1) + go pessimism.WatchOperationDelete(ctx2, p.cli, info.Task, info.Source, rev, ch, errCh) + + select { + case <-ch: // deleted. + case err := <-errCh: + return 0, err + case <-ctx.Done(): + return 0, ctx.Err() + } + + rev, err = pessimism.PutInfo(p.cli, info) if err != nil { return 0, err } diff --git a/syncer/shardddl/pessimist_test.go b/syncer/shardddl/pessimist_test.go index 87631a67e2..f5b1962fdd 100644 --- a/syncer/shardddl/pessimist_test.go +++ b/syncer/shardddl/pessimist_test.go @@ -16,6 +16,7 @@ package shardddl import ( "context" "testing" + "time" . "github.com/pingcap/check" "go.etcd.io/etcd/clientv3" @@ -78,7 +79,7 @@ func (t *testPessimist) TestPessimist(c *C) { c.Assert(p.PendingOperation(), IsNil) // put shard DDL info. - rev1, err := p.PutInfo(info) + rev1, err := p.PutInfo(ctx, info) c.Assert(err, IsNil) c.Assert(rev1, Greater, int64(0)) @@ -123,8 +124,21 @@ func (t *testPessimist) TestPessimist(c *C) { c.Assert(p.PendingInfo(), IsNil) c.Assert(p.PendingOperation(), IsNil) + // try to put info again, but timeout because a `done` operation exist in etcd. + ctx2, cancel2 := context.WithTimeout(ctx, time.Second) + defer cancel2() + _, err = p.PutInfo(ctx2, info) + c.Assert(err, Equals, context.DeadlineExceeded) + + // start a goroutine to delete the `done` operation in background, then we can put info again. + go func() { + time.Sleep(500 * time.Millisecond) // wait `PutInfo` to start watch the deletion of the operation. + _, err2 := pessimism.DeleteOperations(etcdTestCli, op) + c.Assert(err2, IsNil) + }() + // put info again, but do not complete the flow. - _, err = p.PutInfo(info) + _, err = p.PutInfo(ctx, info) c.Assert(err, IsNil) c.Assert(p.PendingInfo(), NotNil) diff --git a/syncer/syncer.go b/syncer/syncer.go index 767448c856..217e396964 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1898,12 +1898,12 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // construct & send shard DDL info into etcd, DM-master will handle it. shardInfo := s.pessimist.ConstructInfo(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, needHandleDDLs) - rev, err2 := s.pessimist.PutInfo(shardInfo) + rev, err2 := s.pessimist.PutInfo(ec.tctx.Ctx, shardInfo) if err2 != nil { return err2 } shardLockResolving.WithLabelValues(s.cfg.Name).Set(1) // block and wait DDL lock to be synced - s.tctx.L().Info("putted shard DDL info", zap.Stringer("info", shardInfo)) + s.tctx.L().Info("putted shard DDL info", zap.Stringer("info", shardInfo), zap.Int64("revision", rev)) shardOp, err2 := s.pessimist.GetOperation(ec.tctx.Ctx, shardInfo, rev+1) shardLockResolving.WithLabelValues(s.cfg.Name).Set(0) diff --git a/tests/_utils/run_sql b/tests/_utils/run_sql index b6442d4e04..547d4d50de 100755 --- a/tests/_utils/run_sql +++ b/tests/_utils/run_sql @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash # parameter 1: sql command # parameter 2: port # parameter 3: password diff --git a/tests/_utils/run_sql_file b/tests/_utils/run_sql_file index 15833b2f1e..8af6799d91 100755 --- a/tests/_utils/run_sql_file +++ b/tests/_utils/run_sql_file @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash # parameter 1: sql file # parameter 2: host # parameter 3: port diff --git a/tests/ha_cases/run.sh b/tests/ha_cases/run.sh index dc22f65e2a..ec467e77e6 100755 --- a/tests/ha_cases/run.sh +++ b/tests/ha_cases/run.sh @@ -414,6 +414,11 @@ function test_pause_task() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "pause-task $name"\ "\"result\": true" 3 + + # pause twice, just used to test pause by the way + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "pause-task $name"\ + "\"result\": true" 3 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $name"\ @@ -427,6 +432,11 @@ function test_pause_task() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "resume-task $name"\ "\"result\": true" 3 + + # resume twice, just used to test resume by the way + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "resume-task $name"\ + "\"result\": true" 3 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $name"\ diff --git a/tests/sharding/run.sh b/tests/sharding/run.sh index 274016338e..a4a8e76a0f 100755 --- a/tests/sharding/run.sh +++ b/tests/sharding/run.sh @@ -86,30 +86,6 @@ function run() { "resume-task test"\ "\"result\": true" 3 - # NOTE: the lock may be locked for the next DDL, for details please see the following comments in `master/shardll/pessimist.go`, - # `FIXME: the following case is not supported automatically now, try to support it later` - # so we try to do this `pause-task` and `resume-task` in the case now. - sleep 3 - # pause twice, just used to test pause by the way - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "pause-task test"\ - "\"result\": true" 3 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "pause-task test"\ - "\"result\": true" 3 - # wait really paused - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - "Paused" 2 - - # resume twice, just used to test resume by the way - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "resume-task test"\ - "\"result\": true" 3 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "resume-task test"\ - "\"result\": true" 3 - # TODO: check sharding partition id # use sync_diff_inspector to check data now! echo "check sync diff for the first increment replication"