Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: fix executing shard DDL with mismatched request #336

Merged
merged 11 commits into from
Oct 28, 2019
14 changes: 10 additions & 4 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ import (
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/syncer"
)

var (
retryTimeout = 5 * time.Second
cmuxReadTimeout = 10 * time.Second
fetchDDLInfoRetryTimeout = 5 * time.Second
cmuxReadTimeout = 10 * time.Second
)

// Server handles RPC requests for dm-master
Expand Down Expand Up @@ -1293,7 +1294,7 @@ func (s *Server) fetchWorkerDDLInfo(ctx context.Context) {
select {
case <-ctx.Done():
return
case <-time.After(retryTimeout):
case <-time.After(fetchDDLInfoRetryTimeout):
}
}
doRetry = false // reset
Expand Down Expand Up @@ -1462,9 +1463,13 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner
LockID: lockID,
Exec: true,
TraceGID: traceGID,
DDLs: lock.ddls,
},
}
resp, err := cli.SendRequest(ctx, ownerReq, s.cfg.RPCTimeout)
// use a longer timeout for executing DDL in DM-worker.
// now, we ignore `invalid connection` for `ADD INDEX`, use a longer timout to ensure the DDL lock removed.
ownerTimeout := time.Duration(syncer.MaxDDLConnectionTimeoutMinute)*time.Minute + 30*time.Second
resp, err := cli.SendRequest(ctx, ownerReq, ownerTimeout)
ownerResp := &pb.CommonWorkerResponse{}
if err != nil {
ownerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "")
Expand Down Expand Up @@ -1496,6 +1501,7 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner
LockID: lockID,
Exec: false, // ignore and skip DDL
TraceGID: traceGID,
DDLs: lock.ddls,
},
}
workerRespCh := make(chan *pb.CommonWorkerResponse, len(workers))
Expand Down
21 changes: 12 additions & 9 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,14 @@ func (t *testMaster) TestUnlockDDLLock(c *check.C) {
ctrl := gomock.NewController(c)
defer ctrl.Finish()

var (
sqls = []string{"stmt"}
task = "testA"
schema = "test_db"
table = "test_table"
traceGIDIdx = 1
)

server := testDefaultMasterServer(c)

workers := make([]string, 0, len(server.cfg.DeployMap))
Expand Down Expand Up @@ -897,21 +905,14 @@ func (t *testMaster) TestUnlockDDLLock(c *check.C) {
LockID: lockID,
Exec: exec,
TraceGID: traceGID,
DDLs: sqls,
},
).Return(ret...)

server.workerClients[worker] = newMockRPCClient(mockWorkerClient)
}
}

var (
sqls = []string{"stmt"}
task = "testA"
schema = "test_db"
table = "test_table"
traceGIDIdx = 1
)

prepareDDLLock := func() {
// prepare ddl lock keeper, mainly use code from ddl_lock_test.go
lk := NewLockKeeper()
Expand Down Expand Up @@ -1428,7 +1429,7 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) {
Table: table,
DDLs: ddls,
}, nil)
// This will lead to a select on ctx.Done and time.After(retryTimeout),
// This will lead to a select on ctx.Done and time.After(fetchDDLInfoRetryTimeout),
// so we have enough time to cancel the context.
stream.EXPECT().Recv().Return(nil, io.EOF).MaxTimes(1)
stream.EXPECT().Send(&pb.DDLLockInfo{Task: task, ID: lockID}).Return(nil)
Expand All @@ -1445,6 +1446,7 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) {
LockID: lockID,
Exec: true,
TraceGID: traceGID,
DDLs: ddls,
},
).Return(&pb.CommonWorkerResponse{Result: true}, nil).MaxTimes(1)
mockWorkerClient.EXPECT().ExecuteDDL(
Expand All @@ -1454,6 +1456,7 @@ func (t *testMaster) TestFetchWorkerDDLInfo(c *check.C) {
LockID: lockID,
Exec: false,
TraceGID: traceGID,
DDLs: ddls,
},
).Return(&pb.CommonWorkerResponse{Result: true}, nil).MaxTimes(1)

Expand Down
Loading