Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix unit test etcd cluster race usage
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu committed Mar 26, 2021
1 parent 2efb0cf commit 05b2c20
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 33 deletions.
74 changes: 41 additions & 33 deletions dm/master/shardddl/optimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/mock"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
Expand Down Expand Up @@ -146,14 +148,20 @@ func (t *testOptimist) TestOptimistSourceTables(c *C) {
}

func (t *testOptimist) TestOptimist(c *C) {
t.testOptimist(c, noRestart)
t.testOptimist(c, restartOnly)
t.testOptimist(c, restartNewInstance)
cluster := integration.NewClusterV3(tt, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(tt)

cli := cluster.RandClient()
t.testOptimist(c, cli, noRestart)
t.testOptimist(c, cli, restartOnly)
t.testOptimist(c, cli, restartNewInstance)
t.testSortInfos(c)
}

func (t *testOptimist) testOptimist(c *C, restart int) {
defer clearOptimistTestSourceInfoOperation(c)
func (t *testOptimist) testOptimist(c *C, cli *clientv3.Client, restart int) {
defer func() {
c.Assert(optimism.ClearTestInfoOperationSchema(cli), IsNil)
}()

var (
backOff = 30
Expand All @@ -166,11 +174,11 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
switch restart {
case restartOnly:
o.Close()
c.Assert(o.Start(ctx, etcdTestCli), IsNil)
c.Assert(o.Start(ctx, cli), IsNil)
case restartNewInstance:
o.Close()
o = NewOptimist(&logger)
c.Assert(o.Start(ctx, etcdTestCli), IsNil)
c.Assert(o.Start(ctx, cli), IsNil)
}
}

Expand Down Expand Up @@ -208,24 +216,24 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
st32.AddTable("foo-2", "bar-3", downSchema, downTable)

// put source tables first.
_, err := optimism.PutSourceTables(etcdTestCli, st1)
_, err := optimism.PutSourceTables(cli, st1)
c.Assert(err, IsNil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// CASE 1: start without any previous shard DDL info.
c.Assert(o.Start(ctx, etcdTestCli), IsNil)
c.Assert(o.Start(ctx, cli), IsNil)
c.Assert(o.Locks(), HasLen, 0)
o.Close()
o.Close() // close multiple times.

// CASE 2: start again without any previous shard DDL info.
c.Assert(o.Start(ctx, etcdTestCli), IsNil)
c.Assert(o.Start(ctx, cli), IsNil)
c.Assert(o.Locks(), HasLen, 0)

// PUT i11, will create a lock but not synced.
rev1, err := optimism.PutInfo(etcdTestCli, i11)
rev1, err := optimism.PutInfo(cli, i11)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
return len(o.Locks()) == 1
Expand Down Expand Up @@ -257,7 +265,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
opCh := make(chan optimism.Operation, 10)
errCh := make(chan error, 10)
ctx2, cancel2 := context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i11.Task, i11.Source, i11.UpSchema, i11.UpTable, rev1, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i11.Task, i11.Source, i11.UpSchema, i11.UpTable, rev1, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -274,7 +282,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// mark op11 as done.
op11c := op11
op11c.Done = true
_, putted, err := optimism.PutOperation(etcdTestCli, false, op11c, 0)
_, putted, err := optimism.PutOperation(cli, false, op11c, 0)
c.Assert(err, IsNil)
c.Assert(putted, IsTrue)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
Expand All @@ -288,7 +296,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(o.ShowLocks("", []string{}), DeepEquals, expectedLock)

// PUT i12, the lock will be synced.
rev2, err := optimism.PutInfo(etcdTestCli, i12)
rev2, err := optimism.PutInfo(cli, i12)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
synced, _ = o.Locks()[lockID].IsSynced()
Expand All @@ -315,7 +323,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -332,7 +340,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// mark op12 as done, the lock should be resolved.
op12c := op12
op12c.Done = true
_, putted, err = optimism.PutOperation(etcdTestCli, false, op12c, 0)
_, putted, err = optimism.PutOperation(cli, false, op12c, 0)
c.Assert(err, IsNil)
c.Assert(putted, IsTrue)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
Expand All @@ -343,15 +351,15 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(o.ShowLocks("", nil), HasLen, 0)

// no shard DDL info or lock operation exists.
ifm, _, err := optimism.GetAllInfo(etcdTestCli)
ifm, _, err := optimism.GetAllInfo(cli)
c.Assert(err, IsNil)
c.Assert(ifm, HasLen, 0)
opm, _, err := optimism.GetAllOperations(etcdTestCli)
opm, _, err := optimism.GetAllOperations(cli)
c.Assert(err, IsNil)
c.Assert(opm, HasLen, 0)

// put another table info.
rev1, err = optimism.PutInfo(etcdTestCli, i21)
rev1, err = optimism.PutInfo(cli, i21)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
return len(o.Locks()) == 1
Expand All @@ -365,7 +373,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i21.Task, i21.Source, i21.UpSchema, i21.UpTable, rev1, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i21.Task, i21.Source, i21.UpSchema, i21.UpTable, rev1, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -387,7 +395,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(remain, Equals, 1)

// put table info for a new table (to simulate `CREATE TABLE`).
rev3, err := optimism.PutSourceTablesInfo(etcdTestCli, st32, i23)
rev3, err := optimism.PutSourceTablesInfo(cli, st32, i23)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
ready := o.Locks()[lockID].Ready()
Expand Down Expand Up @@ -432,7 +440,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i23.Task, i23.Source, i23.UpSchema, i23.UpTable, rev3, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i23.Task, i23.Source, i23.UpSchema, i23.UpTable, rev3, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -446,13 +454,13 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(len(errCh), Equals, 0)

// delete i12 for a table (to simulate `DROP TABLE`), the lock should become synced again.
rev2, err = optimism.PutInfo(etcdTestCli, i12) // put i12 first to trigger DELETE for i12.
rev2, err = optimism.PutInfo(cli, i12) // put i12 first to trigger DELETE for i12.
c.Assert(err, IsNil)
// wait until operation for i12 ready.
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i12.Task, i12.Source, i12.UpSchema, i12.UpTable, rev2, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -462,7 +470,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(len(opCh), Equals, 1)
c.Assert(len(errCh), Equals, 0)

_, err = optimism.PutSourceTablesDeleteInfo(etcdTestCli, st31, i12)
_, err = optimism.PutSourceTablesDeleteInfo(cli, st31, i12)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
synced, _ = o.Locks()[lockID].IsSynced()
Expand Down Expand Up @@ -493,7 +501,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// mark op21 as done.
op21c := op21
op21c.Done = true
_, putted, err = optimism.PutOperation(etcdTestCli, false, op21c, 0)
_, putted, err = optimism.PutOperation(cli, false, op21c, 0)
c.Assert(err, IsNil)
c.Assert(putted, IsTrue)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
Expand All @@ -513,7 +521,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// mark op23 as done.
op23c := op23
op23c.Done = true
_, putted, err = optimism.PutOperation(etcdTestCli, false, op23c, 0)
_, putted, err = optimism.PutOperation(cli, false, op23c, 0)
c.Assert(err, IsNil)
c.Assert(putted, IsTrue)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
Expand All @@ -523,7 +531,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(o.Locks(), HasLen, 0)

// PUT i31, will create a lock but not synced (to test `DROP COLUMN`)
rev1, err = optimism.PutInfo(etcdTestCli, i31)
rev1, err = optimism.PutInfo(cli, i31)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
return len(o.Locks()) == 1
Expand Down Expand Up @@ -555,7 +563,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i31.Task, i31.Source, i31.UpSchema, i31.UpTable, rev1, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i31.Task, i31.Source, i31.UpSchema, i31.UpTable, rev1, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -572,7 +580,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// mark op31 as done.
op31c := op31
op31c.Done = true
_, putted, err = optimism.PutOperation(etcdTestCli, false, op31c, 0)
_, putted, err = optimism.PutOperation(cli, false, op31c, 0)
c.Assert(err, IsNil)
c.Assert(putted, IsTrue)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
Expand All @@ -581,7 +589,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
c.Assert(o.ShowLocks("", []string{}), DeepEquals, expectedLock)

// PUT i33, the lock will be synced.
rev3, err = optimism.PutInfo(etcdTestCli, i33)
rev3, err = optimism.PutInfo(cli, i33)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
synced, _ = o.Locks()[lockID].IsSynced()
Expand All @@ -608,7 +616,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
opCh = make(chan optimism.Operation, 10)
errCh = make(chan error, 10)
ctx2, cancel2 = context.WithCancel(ctx)
go optimism.WatchOperationPut(ctx2, etcdTestCli, i33.Task, i33.Source, i33.UpSchema, i33.UpTable, rev3, opCh, errCh)
go optimism.WatchOperationPut(ctx2, cli, i33.Task, i33.Source, i33.UpSchema, i33.UpTable, rev3, opCh, errCh)
utils.WaitSomething(10, watchTimeout, func() bool {
return len(opCh) != 0
})
Expand All @@ -625,7 +633,7 @@ func (t *testOptimist) testOptimist(c *C, restart int) {
// mark op33 as done, the lock should be resolved.
op33c := op33
op33c.Done = true
_, putted, err = optimism.PutOperation(etcdTestCli, false, op33c, 0)
_, putted, err = optimism.PutOperation(cli, false, op33c, 0)
c.Assert(err, IsNil)
c.Assert(putted, IsTrue)
c.Assert(utils.WaitSomething(backOff, waitTime, func() bool {
Expand Down
2 changes: 2 additions & 0 deletions dm/master/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
var (
etcdTestCli *clientv3.Client
etcdErrCompacted = v3rpc.ErrCompacted
tt *testing.T
)

const (
Expand All @@ -54,6 +55,7 @@ func TestShardDDL(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tt = t

mockCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer mockCluster.Terminate(t)
Expand Down

0 comments on commit 05b2c20

Please sign in to comment.