Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: fix new shard DDL handle when the previous one has not complete #725

Merged
merged 10 commits into from
Jun 12, 2020
10 changes: 3 additions & 7 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,9 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
if err != nil {
// TODO: add & update metrics.
// FIXME: the following case is not supported automatically now, try to support it later.
// - the lock become synced, and `done` for `exec` operation received.
// - put `skip` operation for non-owners and the lock is still not resolved.
// - another new DDL from the old owner received and TrySync again with an error returned.
// after the old lock resolved, the new DDL from the old owner will NOT be handled again,
// then the lock will be block because the Pessimist thinks missing DDL from some sources.
// now, we need to `pause-task` and `resume-task` to let DM-workers put DDL again to trigger the process.
// if the lock become synced, and `done` for `exec`/`skip` operation received,
// but the `done` operations have not been deleted,
// then the DM-worker should not put any new DDL info until the old operation has been deleted.
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
continue
} else if !synced {
Expand Down
20 changes: 20 additions & 0 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ func DoOpsInOneTxnWithRetry(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3
return resp, resp.Header.Revision, nil
}

// DoOpsInOneCmpsTxnWithRetry do multiple etcd operations in one txn and with comparisons.
func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())
ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) {
resp, err := cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
if err != nil {
return nil, err
}
return resp, nil
})

if err != nil {
return nil, 0, err
}
resp := ret.(*clientv3.TxnResponse)
return resp, resp.Header.Revision, nil
}

// IsRetryableError check whether error is retryable error for etcd to build again
func IsRetryableError(err error) bool {
switch errors.Cause(err) {
Expand Down
41 changes: 41 additions & 0 deletions pkg/shardddl/pessimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/utils"
)

// Info represents the shard DDL information.
Expand Down Expand Up @@ -95,6 +96,46 @@ func PutInfo(cli *clientv3.Client, info Info) (int64, error) {
return resp.Header.Revision, nil
}

// PutInfoIfOpNotDone puts the shard DDL info into etcd if the operation not exists or not `done`.
func PutInfoIfOpNotDone(cli *clientv3.Client, info Info) (rev int64, putted bool, err error) {
infoValue, err := info.toJSON()
if err != nil {
return 0, false, err
}
infoKey := common.ShardDDLPessimismInfoKeyAdapter.Encode(info.Task, info.Source)
opKey := common.ShardDDLPessimismOperationKeyAdapter.Encode(info.Task, info.Source)
infoPut := clientv3.OpPut(infoKey, infoValue)
opGet := clientv3.OpGet(opKey)

// try to PUT info if the operation not exist.
resp, rev, err := etcdutil.DoOpsInOneCmpsTxnWithRetry(cli, []clientv3.Cmp{clientv3util.KeyMissing(opKey)},
[]clientv3.Op{infoPut}, []clientv3.Op{opGet})
if err != nil {
return 0, false, err
} else if resp.Succeeded {
return rev, resp.Succeeded, nil
}

opsResp := resp.Responses[0].GetResponseRange()
opBefore, err := operationFromJSON(string(opsResp.Kvs[0].Value))
if err != nil {
return 0, false, err
} else if opBefore.Done {
// the operation with `done` exist before, abort the PUT.
return rev, false, nil
} else if utils.CompareShardingDDLs(opBefore.DDLs, info.DDLs) {
// TODO: try to handle put the same `done` DDL later.
}

// NOTE: try to PUT info if the operation still not done.
opNotDone := clientv3.Compare(clientv3.Value(opKey), "=", string(opsResp.Kvs[0].Value))
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
resp, rev, err = etcdutil.DoOpsInOneCmpsTxnWithRetry(cli, []clientv3.Cmp{opNotDone}, []clientv3.Op{infoPut}, []clientv3.Op{})
if err != nil {
return 0, false, err
}
return rev, resp.Succeeded, nil
}

// GetAllInfo gets all shard DDL info in etcd currently.
// k/k/v: task-name -> source-ID -> DDL info.
// This function should often be called by DM-master.
Expand Down
55 changes: 55 additions & 0 deletions pkg/shardddl/pessimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ package pessimism

import (
"context"
"fmt"
"sync"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"

Expand Down Expand Up @@ -153,3 +155,56 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
c.Assert(ifm[task2], HasLen, 1)
c.Assert(ifm[task2][source1], DeepEquals, i21)
}

func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) {
defer clearTestInfoOperation(c)

var (
source = "mysql-replica-1"
task = "test-put-info-if-no-op"
schema = "foo"
table = "bar"
DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table))
info = NewInfo(task, source, schema, table, DDLs)
op = NewOperation(ID, task, source, DDLs, false, false)
)

// put info success because no operation exist.
rev1, putted, err := PutInfoIfOpNotDone(etcdTestCli, info)
c.Assert(err, IsNil)
c.Assert(rev1, Greater, int64(0))
c.Assert(putted, IsTrue)

// put a non-done operation.
rev2, putted, err := PutOperations(etcdTestCli, false, op)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)
c.Assert(putted, IsTrue)

// still can put info.
rev3, putted, err := PutInfoIfOpNotDone(etcdTestCli, info)
c.Assert(err, IsNil)
c.Assert(rev3, Greater, rev2)
c.Assert(putted, IsTrue)

// change op to `done` and put it.
op.Done = true
rev4, putted, err := PutOperations(etcdTestCli, false, op)
c.Assert(err, IsNil)
c.Assert(rev4, Greater, rev3)
c.Assert(putted, IsTrue)

// can't put info anymore.
rev5, putted, err := PutInfoIfOpNotDone(etcdTestCli, info)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
c.Assert(putted, IsFalse)

// try put anther info, but still can't put it.
info.DDLs = []string{"ALTER TABLE bar ADD COLUMN c2 INT"}
rev6, putted, err := PutInfoIfOpNotDone(etcdTestCli, info)
c.Assert(err, IsNil)
c.Assert(rev6, Equals, rev5)
c.Assert(putted, IsFalse)
}
35 changes: 30 additions & 5 deletions pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type Operation struct {
DDLs []string `json:"ddls"` // DDL statements
Exec bool `json:"exec"` // execute or skip the DDL statements
Done bool `json:"done"` // whether the `Exec` operation has done

// only used to report to the caller of the watcher, do not marsh it.
// if it's true, it means the Operation has been deleted in etcd.
IsDeleted bool `json:"-"`
}

// NewOperation creates a new Operation instance.
Expand Down Expand Up @@ -202,8 +206,22 @@ func GetInfosOperationsByTask(cli *clientv3.Client, task string) ([]Info, []Oper
// This function can be called by DM-worker and DM-master.
// TODO(csuzhangxc): report error and do some retry.
func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error) {
watchOperation(ctx, cli, mvccpb.PUT, task, source, revision, outCh, errCh)
}

// WatchOperationDelete watches DELETE operations for DDL lock operation.
// If want to watch all operations, pass empty string for `task` and `source`.
// This function is often called by DM-worker.
func WatchOperationDelete(ctx context.Context, cli *clientv3.Client, task, source string, revision int64, outCh chan<- Operation, errCh chan<- error) {
watchOperation(ctx, cli, mvccpb.DELETE, task, source, revision, outCh, errCh)
}

// watchOperation watches PUT or DELETE operations for DDL lock operation.
func watchOperation(ctx context.Context, cli *clientv3.Client, watchType mvccpb.Event_EventType,
task, source string, revision int64,
outCh chan<- Operation, errCh chan<- error) {
ch := cli.Watch(ctx, common.ShardDDLPessimismOperationKeyAdapter.Encode(task, source),
clientv3.WithPrefix(), clientv3.WithRev(revision))
clientv3.WithPrefix(), clientv3.WithRev(revision), clientv3.WithPrevKV())

for {
select {
Expand All @@ -219,18 +237,25 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client, task, source s
}

for _, ev := range resp.Events {
if ev.Type != mvccpb.PUT {
continue
var (
op Operation
err error
)

if ev.Type == mvccpb.PUT && watchType == mvccpb.PUT {
op, err = operationFromJSON(string(ev.Kv.Value))
} else if ev.Type == mvccpb.DELETE && watchType == mvccpb.DELETE {
op, err = operationFromJSON(string(ev.PrevKv.Value))
op.IsDeleted = true
}

op, err := operationFromJSON(string(ev.Kv.Value))
if err != nil {
select {
case errCh <- err:
case <-ctx.Done():
return
}
} else {
} else if op.Task != "" { // valid operation got.
select {
case outCh <- op:
case <-ctx.Done():
Expand Down
17 changes: 17 additions & 0 deletions pkg/shardddl/pessimism/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ func (t *testForEtcd) TestOperationEtcd(c *C) {
c.Assert(err, IsNil)
c.Assert(rev6, Greater, rev5)

// start watch with an older revision for the deleted op11.
wch = make(chan Operation, 10)
ech = make(chan error, 10)
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
WatchOperationDelete(ctx, etcdTestCli, op11.Task, op11.Source, rev5, wch, ech)
cancel()
close(wch)
close(ech)

// watch should got the previous deleted operation.
c.Assert(len(wch), Equals, 1)
c.Assert(len(ech), Equals, 0)
op11d := <-wch
c.Assert(op11d.IsDeleted, IsTrue)
op11d.IsDeleted = false // reset to false
c.Assert(op11d, DeepEquals, op11)

// get again, op11 should be deleted.
opm, _, err = GetAllOperations(etcdTestCli)
c.Assert(err, IsNil)
Expand Down
33 changes: 31 additions & 2 deletions syncer/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,37 @@ func (p *Pessimist) ConstructInfo(schema, table string, DDLs []string) pessimism
}

// PutInfo puts the shard DDL info into etcd and returns the revision.
func (p *Pessimist) PutInfo(info pessimism.Info) (int64, error) {
rev, err := pessimism.PutInfo(p.cli, info)
func (p *Pessimist) PutInfo(ctx context.Context, info pessimism.Info) (int64, error) {
// put info only no previous operation exists or not done.
rev, putted, err := pessimism.PutInfoIfOpNotDone(p.cli, info)
if err != nil {
return 0, err
} else if putted {
p.mu.Lock()
p.pendingInfo = &info
p.mu.Unlock()

return rev, nil
}

p.logger.Warn("the previous shard DDL operation still exists, waiting for it to be deleted", zap.Stringer("info", info))

// wait for the operation to be deleted.
ctx2, cancel2 := context.WithCancel(ctx)
defer cancel2()
ch := make(chan pessimism.Operation, 1)
errCh := make(chan error, 1)
go pessimism.WatchOperationDelete(ctx2, p.cli, info.Task, info.Source, rev, ch, errCh)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved

select {
case <-ch: // deleted.
case err := <-errCh:
return 0, err
case <-ctx.Done():
return 0, ctx.Err()
}

rev, err = pessimism.PutInfo(p.cli, info)
if err != nil {
return 0, err
}
Expand Down
18 changes: 16 additions & 2 deletions syncer/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package shardddl
import (
"context"
"testing"
"time"

. "github.com/pingcap/check"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (t *testPessimist) TestPessimist(c *C) {
c.Assert(p.PendingOperation(), IsNil)

// put shard DDL info.
rev1, err := p.PutInfo(info)
rev1, err := p.PutInfo(ctx, info)
c.Assert(err, IsNil)
c.Assert(rev1, Greater, int64(0))

Expand Down Expand Up @@ -123,8 +124,21 @@ func (t *testPessimist) TestPessimist(c *C) {
c.Assert(p.PendingInfo(), IsNil)
c.Assert(p.PendingOperation(), IsNil)

// try to put info again, but timeout because a `done` operation exist in etcd.
ctx2, cancel2 := context.WithTimeout(ctx, time.Second)
defer cancel2()
_, err = p.PutInfo(ctx2, info)
c.Assert(err, Equals, context.DeadlineExceeded)

// start a goroutine to delete the `done` operation in background, then we can put info again.
go func() {
time.Sleep(500 * time.Millisecond) // wait `PutInfo` to start watch the deletion of the operation.
_, err2 := pessimism.DeleteOperations(etcdTestCli, op)
c.Assert(err2, IsNil)
}()

// put info again, but do not complete the flow.
_, err = p.PutInfo(info)
_, err = p.PutInfo(ctx, info)
c.Assert(err, IsNil)
c.Assert(p.PendingInfo(), NotNil)

Expand Down
4 changes: 2 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1898,12 +1898,12 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

// construct & send shard DDL info into etcd, DM-master will handle it.
shardInfo := s.pessimist.ConstructInfo(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, needHandleDDLs)
rev, err2 := s.pessimist.PutInfo(shardInfo)
rev, err2 := s.pessimist.PutInfo(ec.tctx.Ctx, shardInfo)
if err2 != nil {
return err2
}
shardLockResolving.WithLabelValues(s.cfg.Name).Set(1) // block and wait DDL lock to be synced
s.tctx.L().Info("putted shard DDL info", zap.Stringer("info", shardInfo))
s.tctx.L().Info("putted shard DDL info", zap.Stringer("info", shardInfo), zap.Int64("revision", rev))

shardOp, err2 := s.pessimist.GetOperation(ec.tctx.Ctx, shardInfo, rev+1)
shardLockResolving.WithLabelValues(s.cfg.Name).Set(0)
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/run_sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
# parameter 1: sql command
# parameter 2: port
# parameter 3: password
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/run_sql_file
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
# parameter 1: sql file
# parameter 2: host
# parameter 3: port
Expand Down
10 changes: 10 additions & 0 deletions tests/ha_cases/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ function test_pause_task() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task $name"\
"\"result\": true" 3

# pause twice, just used to test pause by the way
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task $name"\
"\"result\": true" 3

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $name"\
Expand All @@ -427,6 +432,11 @@ function test_pause_task() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task $name"\
"\"result\": true" 3

# resume twice, just used to test resume by the way
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task $name"\
"\"result\": true" 3

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $name"\
Expand Down
Loading