From da16a99c47408723f6b9e95ec558ee55187fedd4 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Tue, 2 Mar 2021 21:04:16 +0800 Subject: [PATCH 01/20] run ok --- distsql/distsql.go | 2 +- kv/mpp.go | 2 +- store/copr/mpp.go | 52 ++++++++++++++++++++++++++++++++--- store/tikv/tikvrpc/tikvrpc.go | 11 ++++++++ 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 9c8d8e4e63384..83236ce4dc5f4 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -30,7 +30,7 @@ import ( // DispatchMPPTasks dispathes all tasks and returns an iterator. func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) { - resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, tasks) + resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks) if resp == nil { err := errors.New("client returns nil response") return nil, err diff --git a/kv/mpp.go b/kv/mpp.go index f275346e2d477..7436746941198 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -64,7 +64,7 @@ type MPPClient interface { ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error) // DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. - DispatchMPPTasks(context.Context, []*MPPDispatchRequest) Response + DispatchMPPTasks(context.Context, *Variables, []*MPPDispatchRequest) Response } // MPPBuildTasksRequest request the stores allocation for a mpp plan fragment. 
diff --git a/store/copr/mpp.go b/store/copr/mpp.go index da008937f65cf..1a422609a801b 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -128,6 +128,8 @@ type mppIterator struct { wg sync.WaitGroup closed uint32 + + vars *kv.Variables } func (m *mppIterator) run(ctx context.Context) { @@ -231,6 +233,25 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.establishMPPConns(bo, req, taskMeta) } +// TODO: retry once failed? +func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, req *kv.MPPDispatchRequest, meta *mpp.TaskMeta) { + killReq := &mpp.CancelTaskRequest{ + Meta: meta, + } + + wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{}) + wrappedReq.StoreTp = kv.TiFlash + + _, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong) + + logutil.BgLogger().Info("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", meta.GetAddress())) + + if err != nil { + m.sendError(err) + return + } +} + func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) { connReq := &mpp.EstablishMPPConnectionRequest{ SenderMeta: taskMeta, @@ -248,6 +269,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong) if err != nil { + m.cancelMppTask(bo, req, taskMeta) m.sendError(err) return } @@ -260,13 +282,21 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR return } - // TODO: cancel the whole process when some error happens for { err := m.handleMPPStreamResponse(bo, resp, req) if err != nil { + m.cancelMppTask(bo, req, taskMeta) + m.sendError(err) + return + } + + if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { + m.cancelMppTask(bo, req, taskMeta) + err = 
tikv.ErrQueryInterrupted m.sendError(err) return } + resp, err = stream.Recv() if err != nil { if errors.Cause(err) == io.EOF { @@ -280,9 +310,19 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR logutil.BgLogger().Info("stream unknown error", zap.Error(err)) } } +<<<<<<< HEAD m.sendToRespCh(&mppResponse{ err: tikv.ErrTiFlashServerTimeout, }) +======= + // TODO + if resp != nil && resp.Error != nil { + m.sendToRespCh(&mppResponse{ + err: errors.New(resp.Error.Msg), + }) + } + m.cancelMppTask(bo, req, taskMeta) +>>>>>>> run ok return } } @@ -336,7 +376,11 @@ func (m *mppIterator) nextImpl(ctx context.Context) (resp *mppResponse, ok bool, case resp, ok = <-m.respChan: return case <-ticker.C: - //TODO: kill query + if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { + err = tikv.ErrQueryInterrupted + exit = true + return + } case <-m.finishCh: exit = true return @@ -371,7 +415,7 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) { } // DispatchMPPTasks dispatches all the mpp task and waits for the reponses. -func (c *MPPClient) DispatchMPPTasks(ctx context.Context, dispatchReqs []*kv.MPPDispatchRequest) kv.Response { +func (c *MPPClient) DispatchMPPTasks(ctx context.Context, vars *kv.Variables, dispatchReqs []*kv.MPPDispatchRequest) kv.Response { iter := &mppIterator{ store: c.store, tasks: dispatchReqs, @@ -379,10 +423,10 @@ func (c *MPPClient) DispatchMPPTasks(ctx context.Context, dispatchReqs []*kv.MPP rpcCancel: tikv.NewRPCanceller(), respChan: make(chan *mppResponse, 4096), startTs: dispatchReqs[0].StartTs, + vars: vars, } ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, iter.rpcCancel) - // TODO: Process the case of query cancellation. 
go iter.run(ctx) return iter } diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index c695f8ecb3827..0cd3a2d93ed66 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -73,6 +73,7 @@ const ( CmdBatchCop CmdMPPTask CmdMPPConn + CmdMPPCancel CmdMvccGetByKey CmdType = 1024 + iota CmdMvccGetByStartTs @@ -147,6 +148,8 @@ func (t CmdType) String() string { return "DispatchMPPTask" case CmdMPPConn: return "EstablishMPPConnection" + case CmdMPPCancel: + return "CancelMPPTask" case CmdMvccGetByKey: return "MvccGetByKey" case CmdMvccGetByStartTs: @@ -339,6 +342,11 @@ func (req *Request) EstablishMPPConn() *mpp.EstablishMPPConnectionRequest { return req.Req.(*mpp.EstablishMPPConnectionRequest) } +// CancelMPPTask returns canceling task in request +func (req *Request) CancelMPPTask() *mpp.CancelTaskRequest { + return req.Req.(*mpp.CancelTaskRequest) +} + // MvccGetByKey returns MvccGetByKeyRequest in request. func (req *Request) MvccGetByKey() *kvrpcpb.MvccGetByKeyRequest { return req.Req.(*kvrpcpb.MvccGetByKeyRequest) @@ -871,6 +879,9 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp = &MPPStreamResponse{ Tikv_EstablishMPPConnectionClient: streamClient, } + case CmdMPPCancel: + //// TODO(fzh): context? 
+ resp.Resp, err = client.CancelMPPTask(context.TODO(), req.CancelMPPTask()) case CmdCopStream: var streamClient tikvpb.Tikv_CoprocessorStreamClient streamClient, err = client.CoprocessorStream(ctx, req.Cop()) From 8559a5b3b58f9f5ba7837679aa2b12d82fc269c3 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Tue, 2 Mar 2021 21:27:47 +0800 Subject: [PATCH 02/20] update comments --- store/tikv/tikvrpc/tikvrpc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 0cd3a2d93ed66..251083ce052c4 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -880,8 +880,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp Tikv_EstablishMPPConnectionClient: streamClient, } case CmdMPPCancel: - //// TODO(fzh): context? - resp.Resp, err = client.CancelMPPTask(context.TODO(), req.CancelMPPTask()) + // it cannot use the ctx with cancel(), otherwise this cmd will fail. + resp.Resp, err = client.CancelMPPTask(context.Background(), req.CancelMPPTask()) case CmdCopStream: var streamClient tikvpb.Tikv_CoprocessorStreamClient streamClient, err = client.CoprocessorStream(ctx, req.Cop()) From 608fd1ad19b0c62b639a16e0254a1b90dbc0ed9a Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 3 Mar 2021 11:05:27 +0800 Subject: [PATCH 03/20] send cancel cmd to all tiflash stores --- store/copr/mpp.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 1a422609a801b..11b3924c820e5 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -234,7 +234,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, } // TODO: retry once failed? 
-func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, req *kv.MPPDispatchRequest, meta *mpp.TaskMeta) { +func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, meta *mpp.TaskMeta) { killReq := &mpp.CancelTaskRequest{ Meta: meta, } @@ -242,13 +242,10 @@ func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, req *kv.MPPDispatchReque wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{}) wrappedReq.StoreTp = kv.TiFlash - _, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong) - - logutil.BgLogger().Info("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", meta.GetAddress())) - - if err != nil { - m.sendError(err) - return + // send cancel cmd to all TiFlash stores + for _, addr := range m.store.GetRegionCache().GetTiFlashStoreAddrs() { + m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) + logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) } } @@ -269,7 +266,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong) if err != nil { - m.cancelMppTask(bo, req, taskMeta) + m.cancelMppTask(bo, taskMeta) m.sendError(err) return } @@ -285,13 +282,13 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR for { err := m.handleMPPStreamResponse(bo, resp, req) if err != nil { - m.cancelMppTask(bo, req, taskMeta) + m.cancelMppTask(bo, taskMeta) m.sendError(err) return } if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - m.cancelMppTask(bo, req, taskMeta) + m.cancelMppTask(bo, taskMeta) err = tikv.ErrQueryInterrupted m.sendError(err) return @@ -321,8 +318,12 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req 
*kv.MPPDispatchR err: errors.New(resp.Error.Msg), }) } +<<<<<<< HEAD m.cancelMppTask(bo, req, taskMeta) >>>>>>> run ok +======= + m.cancelMppTask(bo, taskMeta) +>>>>>>> send cancle cmd to all tiflash stores return } } From dfef5e751e3b94324ecd0b5ac1cbed60bc18ab09 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 3 Mar 2021 11:26:59 +0800 Subject: [PATCH 04/20] updated --- store/copr/mpp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 11b3924c820e5..c6981d06ccafd 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -244,7 +244,7 @@ func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, meta *mpp.TaskMeta) { // send cancel cmd to all TiFlash stores for _, addr := range m.store.GetRegionCache().GetTiFlashStoreAddrs() { - m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) + _, _ = m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) } } From 8a49f3b747fac8af548c16384fc36337f50e9e46 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 3 Mar 2021 11:54:07 +0800 Subject: [PATCH 05/20] log error if sending failed --- store/copr/mpp.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index c6981d06ccafd..8618e75e07a43 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -244,8 +244,11 @@ func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, meta *mpp.TaskMeta) { // send cancel cmd to all TiFlash stores for _, addr := range m.store.GetRegionCache().GetTiFlashStoreAddrs() { - _, _ = m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) + _, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", 
meta.GetStartTs()), zap.String(" on addr ", addr)) + if err != nil { + logutil.BgLogger().Error("cancel task error: ", zap.Error(err)) + } } } From ff2794e0cf1faaf9d2181038efdfa8fc949fd0e8 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Mon, 8 Mar 2021 17:47:29 +0800 Subject: [PATCH 06/20] update comments --- store/copr/mpp.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 8618e75e07a43..9537f731ec4e5 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -233,7 +233,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.establishMPPConns(bo, req, taskMeta) } -// TODO: retry once failed? +// NOTE: We do not retry here, because retry is helpless when errors result from TiFlash or Network. If errors occur, the execution will stop after some minutes. func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, meta *mpp.TaskMeta) { killReq := &mpp.CancelTaskRequest{ Meta: meta, @@ -247,7 +247,7 @@ func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, meta *mpp.TaskMeta) { _, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) if err != nil { - logutil.BgLogger().Error("cancel task error: ", zap.Error(err)) + logutil.BgLogger().Error("cancel task error: ", zap.Error(err), zap.Uint64(" for query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) } } } @@ -310,23 +310,11 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR logutil.BgLogger().Info("stream unknown error", zap.Error(err)) } } -<<<<<<< HEAD m.sendToRespCh(&mppResponse{ err: tikv.ErrTiFlashServerTimeout, }) -======= - // TODO - if resp != nil && resp.Error != nil { - m.sendToRespCh(&mppResponse{ - err: errors.New(resp.Error.Msg), - }) - } -<<<<<<< HEAD - m.cancelMppTask(bo, req, taskMeta) ->>>>>>> run 
ok -======= + m.cancelMppTask(bo, taskMeta) ->>>>>>> send cancle cmd to all tiflash stores return } } From 56ba82ccb737821b06cdef2f77769a517743255f Mon Sep 17 00:00:00 2001 From: fzhedu Date: Mon, 8 Mar 2021 19:09:15 +0800 Subject: [PATCH 07/20] disable send task when being killed --- store/copr/mpp.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 9537f731ec4e5..c4ed00086326e 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -134,6 +134,9 @@ type mppIterator struct { func (m *mppIterator) run(ctx context.Context) { for _, task := range m.tasks { + if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { + break + } m.wg.Add(1) bo := tikv.NewBackoffer(ctx, copNextMaxBackoff) go m.handleDispatchReq(ctx, bo, task) From 9b7dc03ed609714041017a4f6d242c9d015a6d15 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Tue, 9 Mar 2021 14:57:02 +0800 Subject: [PATCH 08/20] update function name --- store/copr/mpp.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/store/copr/mpp.go b/store/copr/mpp.go index c4ed00086326e..d08a542827d81 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -237,7 +237,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, } // NOTE: We do not retry here, because retry is helpless when errors result from TiFlash or Network. If errors occur, the execution will stop after some minutes. 
-func (m *mppIterator) cancelMppTask(bo *tikv.Backoffer, meta *mpp.TaskMeta) { +func (m *mppIterator) cancelMppTasks(bo *tikv.Backoffer, meta *mpp.TaskMeta) { killReq := &mpp.CancelTaskRequest{ Meta: meta, } @@ -272,7 +272,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong) if err != nil { - m.cancelMppTask(bo, taskMeta) + m.cancelMppTasks(bo, taskMeta) m.sendError(err) return } @@ -286,15 +286,22 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR } for { + if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { + m.cancelMppTasks(bo, taskMeta) + err = tikv.ErrQueryInterrupted + m.sendError(err) + return + } + err := m.handleMPPStreamResponse(bo, resp, req) if err != nil { - m.cancelMppTask(bo, taskMeta) + m.cancelMppTasks(bo, taskMeta) m.sendError(err) return } if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - m.cancelMppTask(bo, taskMeta) + m.cancelMppTasks(bo, taskMeta) err = tikv.ErrQueryInterrupted m.sendError(err) return @@ -317,7 +324,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR err: tikv.ErrTiFlashServerTimeout, }) - m.cancelMppTask(bo, taskMeta) + m.cancelMppTasks(bo, taskMeta) return } } From a8001cdd4b7de8691ecf79b2149efeab7fd953f0 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Tue, 9 Mar 2021 22:07:34 +0800 Subject: [PATCH 09/20] add test for kill mpp when mocking grpc hang --- executor/tiflash_test.go | 39 ++++++++++++++++++- go.mod | 2 + go.sum | 2 + kv/mpp.go | 10 +++++ store/copr/mpp.go | 21 +++++----- .../unistore/cophandler/cop_handler.go | 2 + .../mockstore/unistore/cophandler/mpp_exec.go | 3 +- store/mockstore/unistore/rpc.go | 19 +++++++-- 8 files changed, 83 insertions(+), 15 deletions(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go 
index 1d628375a9c82..724c0cad8afca 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -15,18 +15,23 @@ package executor_test import ( "fmt" - . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util/testkit" + "sync" + "sync/atomic" + "time" ) type tiflashTestSuite struct { @@ -271,3 +276,35 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) { failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") failpoint.Disable("github.com/pingcap/tidb/executor/checkUseMPP") } + +func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { + var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang" + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int not null primary key, b int not null)") + tk.MustExec("alter table t set tiflash replica 1") + tk.MustExec("insert into t values(1,0)") + tk.MustExec("insert into t values(2,0)") + tk.MustExec("insert into t values(3,0)") + tk.MustExec("insert into t values(4,0)") + tb := testGetTableByName(c, tk.Se, "test", "t") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("set @@session.tidb_allow_mpp=ON") + atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0) + c.Assert(failpoint.Enable(hang, `return(true)`), IsNil) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + err := tk.QueryToErr("select 
count(*) from t as t1 , t where t1.a = t.a") + c.Assert(err, NotNil) + c.Assert(int(terror.ToSQLError(errors.Cause(err).(*terror.Error)).Code), Equals, int(executor.ErrQueryInterrupted.Code())) + }() + time.Sleep(1 * time.Second) + atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1) + wg.Wait() + c.Assert(failpoint.Disable(hang), IsNil) +} diff --git a/go.mod b/go.mod index b42ff2ea6b384..27e93408e372b 100644 --- a/go.mod +++ b/go.mod @@ -86,3 +86,5 @@ require ( ) go 1.13 + +replace github.com/ngaut/unistore => github.com/fzhedu/unistore v0.0.0-20210309135726-19020bbc7eca diff --git a/go.sum b/go.sum index 7c152071e31fd..3f152800f8fae 100644 --- a/go.sum +++ b/go.sum @@ -141,6 +141,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk= github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw= +github.com/fzhedu/unistore v0.0.0-20210309135726-19020bbc7eca h1:X2ufF95JLwfbn6Rxve9k1clkXt6eefjEcPTv50LtbA4= +github.com/fzhedu/unistore v0.0.0-20210309135726-19020bbc7eca/go.mod h1:flGPOgMj1Bon2QTIfFFl+WB7XSjRl26CIXnXx7TtptA= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/gzip v0.0.1/go.mod h1:fGBJBCdt6qCZuCAOwWuFhBB4OOq9EFqlo5dEaFhhu5w= github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= diff --git a/kv/mpp.go b/kv/mpp.go index 7436746941198..205f1c5218087 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -45,6 +45,15 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta { return meta } +type MppTaskStates uint8 + +const ( + MppTaskReady MppTaskStates = iota + MppTaskRunning + MppTaskCancelled + MppTaskDone +) + // MPPDispatchRequest stands for a dispatching task. 
type MPPDispatchRequest struct { Data []byte // data encodes the dag coprocessor request. @@ -55,6 +64,7 @@ type MPPDispatchRequest struct { SchemaVar int64 StartTs uint64 ID int64 // identify a single task + State MppTaskStates } // MPPClient accepts and processes mpp requests. diff --git a/store/copr/mpp.go b/store/copr/mpp.go index d08a542827d81..8fdab26522c6a 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -137,6 +137,7 @@ func (m *mppIterator) run(ctx context.Context) { if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { break } + task.State = kv.MppTaskRunning m.wg.Add(1) bo := tikv.NewBackoffer(ctx, copNextMaxBackoff) go m.handleDispatchReq(ctx, bo, task) @@ -245,8 +246,17 @@ func (m *mppIterator) cancelMppTasks(bo *tikv.Backoffer, meta *mpp.TaskMeta) { wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{}) wrappedReq.StoreTp = kv.TiFlash - // send cancel cmd to all TiFlash stores - for _, addr := range m.store.GetRegionCache().GetTiFlashStoreAddrs() { + usedStoreAddrs := make(map[string]bool) + for _, task := range m.tasks { + // get the store address of running tasks + if task.State != kv.MppTaskReady && !usedStoreAddrs[task.Meta.GetAddress()] { + usedStoreAddrs[task.Meta.GetAddress()] = true + } + task.State = kv.MppTaskCancelled + } + + // send cancel cmd to all stores where tasks run + for addr := range usedStoreAddrs { _, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) if err != nil { @@ -286,13 +296,6 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR } for { - if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - m.cancelMppTasks(bo, taskMeta) - err = tikv.ErrQueryInterrupted - m.sendError(err) - return - } - err := m.handleMPPStreamResponse(bo, resp, 
req) if err != nil { m.cancelMppTasks(bo, taskMeta) diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index 685804f12314e..c119a49db16e2 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -15,6 +15,7 @@ package cophandler import ( "bytes" + "context" "fmt" "time" @@ -46,6 +47,7 @@ type MPPCtx struct { RPCClient client.Client StoreAddr string TaskHandler *MPPTaskHandler + Ctx context.Context } // HandleCopRequest handles coprocessor request. diff --git a/store/mockstore/unistore/cophandler/mpp_exec.go b/store/mockstore/unistore/cophandler/mpp_exec.go index f586230a4ea53..8da2ed1085f1d 100644 --- a/store/mockstore/unistore/cophandler/mpp_exec.go +++ b/store/mockstore/unistore/cophandler/mpp_exec.go @@ -14,7 +14,6 @@ package cophandler import ( - "context" "io" "math" "sync" @@ -258,7 +257,7 @@ func (e *exchRecvExec) next() (*chunk.Chunk, error) { func (e *exchRecvExec) EstablishConnAndReceiveData(h *MPPTaskHandler, meta *mpp.TaskMeta) ([]*mpp.MPPDataPacket, error) { req := &mpp.EstablishMPPConnectionRequest{ReceiverMeta: h.Meta, SenderMeta: meta} rpcReq := tikvrpc.NewRequest(tikvrpc.CmdMPPConn, req, kvrpcpb.Context{}) - rpcResp, err := h.RPCClient.SendRequest(context.Background(), meta.Address, rpcReq, 3600*time.Second) + rpcResp, err := h.RPCClient.SendRequest(e.mppCtx.Ctx, meta.Address, rpcReq, 3600*time.Second) if err != nil { return nil, errors.Trace(err) } diff --git a/store/mockstore/unistore/rpc.go b/store/mockstore/unistore/rpc.go index 52bdc5e34a513..72f36eb239abe 100644 --- a/store/mockstore/unistore/rpc.go +++ b/store/mockstore/unistore/rpc.go @@ -248,6 +248,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } }) resp.Resp, err = c.handleDispatchMPPTask(ctx, req.DispatchMPPTask(), storeID) + case tikvrpc.CmdMPPCancel: case tikvrpc.CmdMvccGetByKey: resp.Resp, err = 
c.usSvr.MvccGetByKey(ctx, req.MvccGetByKey()) case tikvrpc.CmdMvccGetByStartTs: @@ -297,7 +298,7 @@ func (c *RPCClient) handleEstablishMPPConnection(ctx context.Context, r *mpp.Est if err != nil { return nil, err } - var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0, targetTask: r.ReceiverMeta} + var mockClient = mockMPPConnectionClient{mppResponses: mockServer.mppResponses, idx: 0, ctx: ctx, targetTask: r.ReceiverMeta} streamResp := &tikvrpc.MPPStreamResponse{Tikv_EstablishMPPConnectionClient: &mockClient} _, cancel := context.WithCancel(ctx) streamResp.Lease.Cancel = cancel @@ -472,8 +473,8 @@ type mockMPPConnectionClient struct { mockClientStream mppResponses []*mpp.MPPDataPacket idx int - - targetTask *mpp.TaskMeta + ctx context.Context + targetTask *mpp.TaskMeta } func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) { @@ -487,6 +488,18 @@ func (mock *mockMPPConnectionClient) Recv() (*mpp.MPPDataPacket, error) { failpoint.Return(nil, context.Canceled) } }) + failpoint.Inject("mppRecvHang", func(val failpoint.Value) { + for val.(bool) { + select { + case <-mock.ctx.Done(): + { + failpoint.Return(nil, context.Canceled) + } + default: + time.Sleep(1 * time.Second) + } + } + }) return nil, io.EOF } From 8b000561c78bb27501e955861a2761d13a96e7ea Mon Sep 17 00:00:00 2001 From: fzhedu Date: Tue, 9 Mar 2021 22:14:08 +0800 Subject: [PATCH 10/20] update imports --- executor/tiflash_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 724c0cad8afca..defbe4d673a9b 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -15,6 +15,10 @@ package executor_test import ( "fmt" + "sync" + "sync/atomic" + "time" + . 
"github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -29,9 +33,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util/testkit" - "sync" - "sync/atomic" - "time" ) type tiflashTestSuite struct { From 8d0be99b1184554200fd333f3eb0a50b226984b8 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Tue, 9 Mar 2021 22:19:05 +0800 Subject: [PATCH 11/20] update comments --- kv/mpp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kv/mpp.go b/kv/mpp.go index 205f1c5218087..df38894d0c5d1 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -45,12 +45,17 @@ func (t *MPPTask) ToPB() *mpp.TaskMeta { return meta } +//MppTaskStates denotes the state of mpp tasks type MppTaskStates uint8 const ( + // MppTaskReady means the task is ready MppTaskReady MppTaskStates = iota + // MppTaskRunning means the task is running MppTaskRunning + // MppTaskCancelled means the task is cancelled MppTaskCancelled + // MppTaskDone means the task is done MppTaskDone ) From 41331db55d11e082e4fef9674495f0251e943872 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 10 Mar 2021 09:34:37 +0800 Subject: [PATCH 12/20] clean go.mod --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index 27e93408e372b..5b4e112bef439 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,6 @@ require ( gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.3.0 // indirect - honnef.co/go/tools v0.1.2 // indirect sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) From 7f3c3569a7093ad25c65a717ef1e0b39ff63152f Mon Sep 17 00:00:00 2001 From: fzhedu Date: Thu, 11 Mar 2021 00:43:47 +0800 Subject: [PATCH 13/20] updated --- store/copr/mpp.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/store/copr/mpp.go 
b/store/copr/mpp.go index 8fdab26522c6a..3414c0ac4f760 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -237,7 +237,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.establishMPPConns(bo, req, taskMeta) } -// NOTE: We do not retry here, because retry is helpless when errors result from TiFlash or Network. If errors occur, the execution will stop after some minutes. +// NOTE: We do not retry here, because retry is helpless when errors result from TiFlash or Network. If errors occur, the execution on TiFlash will finally stop after some minutes. func (m *mppIterator) cancelMppTasks(bo *tikv.Backoffer, meta *mpp.TaskMeta) { killReq := &mpp.CancelTaskRequest{ Meta: meta, @@ -249,7 +249,7 @@ func (m *mppIterator) cancelMppTasks(bo *tikv.Backoffer, meta *mpp.TaskMeta) { usedStoreAddrs := make(map[string]bool) for _, task := range m.tasks { // get the store address of running tasks - if task.State != kv.MppTaskReady && !usedStoreAddrs[task.Meta.GetAddress()] { + if task.State == kv.MppTaskRunning && !usedStoreAddrs[task.Meta.GetAddress()] { usedStoreAddrs[task.Meta.GetAddress()] = true } task.State = kv.MppTaskCancelled @@ -282,8 +282,8 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong) if err != nil { - m.cancelMppTasks(bo, taskMeta) m.sendError(err) + m.cancelMppTasks(bo, taskMeta) return } @@ -298,15 +298,15 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR for { err := m.handleMPPStreamResponse(bo, resp, req) if err != nil { - m.cancelMppTasks(bo, taskMeta) m.sendError(err) + m.cancelMppTasks(bo, taskMeta) return } if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - m.cancelMppTasks(bo, taskMeta) err = tikv.ErrQueryInterrupted m.sendError(err) + m.cancelMppTasks(bo, taskMeta) return } From 
7308409d181ec11ede8e3ffee9e89f0794d7c1c5 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Thu, 11 Mar 2021 16:14:06 +0800 Subject: [PATCH 14/20] use cancel to avoid goroutine leak, ensure cancelMppTasks is called just once --- executor/mpp_gather.go | 1 + store/copr/mpp.go | 77 +++++++++++++++++++---------------- store/tikv/tikvrpc/tikvrpc.go | 2 +- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index f29346a62278f..a8628330e16ef 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -82,6 +82,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M Timeout: 10, SchemaVar: e.is.SchemaMetaVersion(), StartTs: e.startTS, + State: kv.MppTaskReady, } e.mppReqs = append(e.mppReqs, req) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 3414c0ac4f760..6e7b2d7043d56 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -123,21 +123,31 @@ type mppIterator struct { respChan chan *mppResponse - rpcCancel *tikv.RPCCanceller + cancelFunc context.CancelFunc wg sync.WaitGroup closed uint32 vars *kv.Variables + + mu sync.Mutex } func (m *mppIterator) run(ctx context.Context) { for _, task := range m.tasks { - if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { + if atomic.LoadUint32(&m.closed) == 1 { + break + } + m.mu.Lock() + switch task.State { + case kv.MppTaskReady: + task.State = kv.MppTaskRunning + m.mu.Unlock() + default: + m.mu.Unlock() break } - task.State = kv.MppTaskRunning m.wg.Add(1) bo := tikv.NewBackoffer(ctx, copNextMaxBackoff) go m.handleDispatchReq(ctx, bo, task) @@ -148,6 +158,7 @@ func (m *mppIterator) run(ctx context.Context) { func (m *mppIterator) sendError(err error) { m.sendToRespCh(&mppResponse{err: err}) + m.cancelMppTasks() } func (m *mppIterator) sendToRespCh(resp *mppResponse) (exit bool) { @@ -238,9 +249,17 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, } // NOTE: 
We do not retry here, because retry is helpless when errors result from TiFlash or Network. If errors occur, the execution on TiFlash will finally stop after some minutes. -func (m *mppIterator) cancelMppTasks(bo *tikv.Backoffer, meta *mpp.TaskMeta) { +// This function is exclusively called, and only the first call succeeds sending tasks and setting all tasks as cancelled, while others will not work. +func (m *mppIterator) cancelMppTasks() { + m.mu.Lock() + defer m.mu.Unlock() killReq := &mpp.CancelTaskRequest{ - Meta: meta, + Meta: &mpp.TaskMeta{ + StartTs: m.startTs, + TaskId: 0, + PartitionId: 0, + Address: "", + }, } wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{}) @@ -251,16 +270,18 @@ func (m *mppIterator) cancelMppTasks(bo *tikv.Backoffer, meta *mpp.TaskMeta) { // get the store address of running tasks if task.State == kv.MppTaskRunning && !usedStoreAddrs[task.Meta.GetAddress()] { usedStoreAddrs[task.Meta.GetAddress()] = true + } else if task.State == kv.MppTaskCancelled { + return } task.State = kv.MppTaskCancelled } // send cancel cmd to all stores where tasks run for addr := range usedStoreAddrs { - _, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) - logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) + _, err := m.store.GetTiKVClient().SendRequest(context.Background(), addr, wrappedReq, tikv.ReadTimeoutUltraLong) + logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", m.startTs), zap.String(" on addr ", addr)) if err != nil { - logutil.BgLogger().Error("cancel task error: ", zap.Error(err), zap.Uint64(" for query id ", meta.GetStartTs()), zap.String(" on addr ", addr)) + logutil.BgLogger().Error("cancel task error: ", zap.Error(err), zap.Uint64(" for query id ", m.startTs), zap.String(" on addr ", addr)) } } } @@ -283,7 +304,6 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req 
*kv.MPPDispatchR if err != nil { m.sendError(err) - m.cancelMppTasks(bo, taskMeta) return } @@ -299,14 +319,6 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR err := m.handleMPPStreamResponse(bo, resp, req) if err != nil { m.sendError(err) - m.cancelMppTasks(bo, taskMeta) - return - } - - if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - err = tikv.ErrQueryInterrupted - m.sendError(err) - m.cancelMppTasks(bo, taskMeta) return } @@ -323,11 +335,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR logutil.BgLogger().Info("stream unknown error", zap.Error(err)) } } - m.sendToRespCh(&mppResponse{ - err: tikv.ErrTiFlashServerTimeout, - }) - - m.cancelMppTasks(bo, taskMeta) + m.sendError(tikv.ErrTiFlashServerTimeout) return } } @@ -338,7 +346,7 @@ func (m *mppIterator) Close() error { if atomic.CompareAndSwapUint32(&m.closed, 0, 1) { close(m.finishCh) } - m.rpcCancel.CancelAll() + m.cancelFunc() m.wg.Wait() return nil } @@ -419,19 +427,18 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) { return resp, nil } -// DispatchMPPTasks dispatches all the mpp task and waits for the reponses. +// DispatchMPPTasks dispatches all the mpp task and waits for the responses. 
func (c *MPPClient) DispatchMPPTasks(ctx context.Context, vars *kv.Variables, dispatchReqs []*kv.MPPDispatchRequest) kv.Response { + ctxChild, cancelFunc := context.WithCancel(ctx) iter := &mppIterator{ - store: c.store, - tasks: dispatchReqs, - finishCh: make(chan struct{}), - rpcCancel: tikv.NewRPCanceller(), - respChan: make(chan *mppResponse, 4096), - startTs: dispatchReqs[0].StartTs, - vars: vars, - } - ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, iter.rpcCancel) - - go iter.run(ctx) + store: c.store, + tasks: dispatchReqs, + finishCh: make(chan struct{}), + cancelFunc: cancelFunc, + respChan: make(chan *mppResponse, 4096), + startTs: dispatchReqs[0].StartTs, + vars: vars, + } + go iter.run(ctxChild) return iter } diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 251083ce052c4..4975719c1238a 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -881,7 +881,7 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp } case CmdMPPCancel: // it cannot use the ctx with cancel(), otherwise this cmd will fail. 
- resp.Resp, err = client.CancelMPPTask(context.Background(), req.CancelMPPTask()) + resp.Resp, err = client.CancelMPPTask(ctx, req.CancelMPPTask()) case CmdCopStream: var streamClient tikvpb.Tikv_CoprocessorStreamClient streamClient, err = client.CoprocessorStream(ctx, req.Cop()) From 2deb8eae0296fa7f1b6c6039072565c7623a3598 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Thu, 11 Mar 2021 16:47:49 +0800 Subject: [PATCH 15/20] update go.mod --- go.mod | 5 ++--- go.sum | 11 +++++------ 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 5b4e112bef439..5a53c7b55cb36 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef - github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb + github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/opentracing/basictracer-go v1.0.0 github.com/opentracing/opentracing-go v1.1.0 @@ -80,10 +80,9 @@ require ( gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.3.0 // indirect + honnef.co/go/tools v0.1.3 // indirect sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) go 1.13 - -replace github.com/ngaut/unistore => github.com/fzhedu/unistore v0.0.0-20210309135726-19020bbc7eca diff --git a/go.sum b/go.sum index 3f152800f8fae..863be9702b2d8 100644 --- a/go.sum +++ b/go.sum @@ -141,8 +141,6 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk= github.com/fsouza/fake-gcs-server 
v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw= -github.com/fzhedu/unistore v0.0.0-20210309135726-19020bbc7eca h1:X2ufF95JLwfbn6Rxve9k1clkXt6eefjEcPTv50LtbA4= -github.com/fzhedu/unistore v0.0.0-20210309135726-19020bbc7eca/go.mod h1:flGPOgMj1Bon2QTIfFFl+WB7XSjRl26CIXnXx7TtptA= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/gzip v0.0.1/go.mod h1:fGBJBCdt6qCZuCAOwWuFhBB4OOq9EFqlo5dEaFhhu5w= github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= @@ -355,8 +353,8 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 h1:7KAv7KMGTTqSmYZtNdc github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8n5lC/jjNEhwcjbRDpl5NT7n2h+4UNcI= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= -github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb h1:2rGvEhflp/uK1l1rNUmoHA4CiHpbddHGxg52H71Fke8= -github.com/ngaut/unistore v0.0.0-20210304095907-0ebafaf44efb/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= +github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87 h1:lVRrhmqIT2zMbmoalrgxQLwWzFd3VtFaaWy0fnMwPro= +github.com/ngaut/unistore v0.0.0-20210310131351-7ad6a204de87/go.mod h1:ZR3NH+HzqfiYetwdoAivApnIy8iefPZHTMLfrFNm8g4= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -847,9 +845,10 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= -honnef.co/go/tools v0.1.2 h1:SMdYLJl312RXuxXziCCHhRsp/tvct9cGKey0yv95tZM= -honnef.co/go/tools v0.1.2/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= From 50b6b88bcf858dc85e01fe24173f5a4578f7a458 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Thu, 11 Mar 2021 20:01:13 +0800 Subject: [PATCH 16/20] add test for avoiding goroutine leak --- executor/tiflash_test.go | 45 ++++++++++++++++++++++++++++++++++++++++ store/copr/mpp.go | 21 ++++++++++++------- 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index defbe4d673a9b..17374865c946c 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -309,3 +309,48 @@ func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { wg.Wait() c.Assert(failpoint.Disable(hang), IsNil) } + +// all goroutines exit if one goroutine hangs but another return errors +func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { + // mock non-root tasks return error + var mockTaskError = "github.com/pingcap/tidb/store/copr/mppExitFromErrors" + // mock root tasks hang + var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang" + // check 
all goroutines done + var exitDone = "github.com/pingcap/tidb/store/copr/mppExitsDone" + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int not null primary key, b int not null)") + tk.MustExec("alter table t set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "t") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t values(1,0)") + tk.MustExec("insert into t values(2,0)") + tk.MustExec("insert into t values(3,0)") + + tk.MustExec("create table t1(a int not null primary key, b int not null)") + tk.MustExec("alter table t1 set tiflash replica 1") + tb = testGetTableByName(c, tk.Se, "test", "t1") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t1 values(1,0)") + tk.MustExec("insert into t1 values(2,0)") + tk.MustExec("insert into t1 values(3,0)") + atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0) + tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") + tk.MustExec("set @@session.tidb_allow_mpp=ON") + c.Assert(failpoint.Enable(exitDone, `return(true)`), IsNil) + c.Assert(failpoint.Enable(mockTaskError, `return(true)`), IsNil) + c.Assert(failpoint.Enable(hang, `return(true)`), IsNil) + + // generate 2 root tasks, one will hang and another will return errors + err = tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a") + c.Assert(err, NotNil) + // check that all goroutine exit normally. 
+ c.Assert(atomic.LoadUint32(&tk.Se.GetSessionVars().Killed), Equals, uint32(20)) + c.Assert(failpoint.Disable(exitDone), IsNil) + c.Assert(failpoint.Disable(mockTaskError), IsNil) + c.Assert(failpoint.Disable(hang), IsNil) +} diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 6e7b2d7043d56..4ff972570122e 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -240,7 +241,13 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.sendError(errors.New(realResp.Error.Msg)) return } - + failpoint.Inject("mppExitFromErrors", func(val failpoint.Value) { + if val.(bool) && !req.IsRoot { + time.Sleep(1 * time.Second) + m.sendError(tikv.ErrTiFlashServerTimeout) + return + } + }) if !req.IsRoot { return } @@ -254,12 +261,7 @@ func (m *mppIterator) cancelMppTasks() { m.mu.Lock() defer m.mu.Unlock() killReq := &mpp.CancelTaskRequest{ - Meta: &mpp.TaskMeta{ - StartTs: m.startTs, - TaskId: 0, - PartitionId: 0, - Address: "", - }, + Meta: &mpp.TaskMeta{StartTs: m.startTs}, } wrappedReq := tikvrpc.NewRequest(tikvrpc.CmdMPPCancel, killReq, kvrpcpb.Context{}) @@ -348,6 +350,11 @@ func (m *mppIterator) Close() error { } m.cancelFunc() m.wg.Wait() + failpoint.Inject("mppExitsDone", func(val failpoint.Value) { + if val.(bool) { + atomic.StoreUint32(m.vars.Killed, 20) + } + }) return nil } From ff4026d1ea692ad2a813d297740ca87f31454eda Mon Sep 17 00:00:00 2001 From: fzhedu Date: Thu, 11 Mar 2021 21:29:02 +0800 Subject: [PATCH 17/20] update test --- executor/tiflash_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 17374865c946c..6339859a07f25 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -329,7 +329,7 @@ func (s 
*tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { tk.MustExec("insert into t values(1,0)") tk.MustExec("insert into t values(2,0)") tk.MustExec("insert into t values(3,0)") - + tk.MustExec("drop table if exists t1") tk.MustExec("create table t1(a int not null primary key, b int not null)") tk.MustExec("alter table t1 set tiflash replica 1") tb = testGetTableByName(c, tk.Se, "test", "t1") From bba1eb9d75761dbc25948cfe4ecb052c3f8ffea6 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Fri, 12 Mar 2021 16:01:29 +0800 Subject: [PATCH 18/20] update test --- executor/tiflash_test.go | 21 +++++++++++---------- store/copr/mpp.go | 7 +------ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 6339859a07f25..16ce2fa9f5ebc 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testleak" ) type tiflashTestSuite struct { @@ -42,6 +43,7 @@ type tiflashTestSuite struct { } func (s *tiflashTestSuite) SetUpSuite(c *C) { + testleak.BeforeTest() var err error s.store, err = mockstore.NewMockStore( mockstore.WithClusterInspector(func(c cluster.Cluster) { @@ -71,6 +73,7 @@ func (s *tiflashTestSuite) SetUpSuite(c *C) { } func (s *tiflashTestSuite) TestReadPartitionTable(c *C) { + defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -98,6 +101,7 @@ func (s *tiflashTestSuite) TestReadPartitionTable(c *C) { } func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) { + defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -133,6 +137,7 @@ func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) { } func (s *tiflashTestSuite) TestMppExecution(c *C) { + defer 
testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -191,6 +196,7 @@ func (s *tiflashTestSuite) TestMppExecution(c *C) { } func (s *tiflashTestSuite) TestPartitionTable(c *C) { + defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -279,6 +285,7 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) { } func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { + defer testleak.AfterTest(c)() var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang" tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -312,12 +319,11 @@ func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { // all goroutines exit if one goroutine hangs but another return errors func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { + defer testleak.AfterTest(c)() // mock non-root tasks return error - var mockTaskError = "github.com/pingcap/tidb/store/copr/mppExitFromErrors" + var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError" // mock root tasks hang var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang" - // check all goroutines done - var exitDone = "github.com/pingcap/tidb/store/copr/mppExitsDone" tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -338,19 +344,14 @@ func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { tk.MustExec("insert into t1 values(1,0)") tk.MustExec("insert into t1 values(2,0)") tk.MustExec("insert into t1 values(3,0)") - atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 0) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") tk.MustExec("set @@session.tidb_allow_mpp=ON") - c.Assert(failpoint.Enable(exitDone, `return(true)`), IsNil) - c.Assert(failpoint.Enable(mockTaskError, `return(true)`), IsNil) + c.Assert(failpoint.Enable(mppNonRootTaskError, 
`return(true)`), IsNil) c.Assert(failpoint.Enable(hang, `return(true)`), IsNil) // generate 2 root tasks, one will hang and another will return errors err = tk.QueryToErr("select count(*) from t as t1 , t where t1.a = t.a") c.Assert(err, NotNil) - // check that all goroutine exit normally. - c.Assert(atomic.LoadUint32(&tk.Se.GetSessionVars().Killed), Equals, uint32(20)) - c.Assert(failpoint.Disable(exitDone), IsNil) - c.Assert(failpoint.Disable(mockTaskError), IsNil) + c.Assert(failpoint.Disable(mppNonRootTaskError), IsNil) c.Assert(failpoint.Disable(hang), IsNil) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 4ff972570122e..8a6bd3c108b38 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -241,7 +241,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.sendError(errors.New(realResp.Error.Msg)) return } - failpoint.Inject("mppExitFromErrors", func(val failpoint.Value) { + failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) { if val.(bool) && !req.IsRoot { time.Sleep(1 * time.Second) m.sendError(tikv.ErrTiFlashServerTimeout) @@ -350,11 +350,6 @@ func (m *mppIterator) Close() error { } m.cancelFunc() m.wg.Wait() - failpoint.Inject("mppExitsDone", func(val failpoint.Value) { - if val.(bool) { - atomic.StoreUint32(m.vars.Killed, 20) - } - }) return nil } From 0c82290eeab93e4975429b21208c04a3b53bb88f Mon Sep 17 00:00:00 2001 From: fzhedu Date: Fri, 12 Mar 2021 16:40:36 +0800 Subject: [PATCH 19/20] remove unnecessary leak test --- executor/tiflash_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 16ce2fa9f5ebc..20679644dabb6 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -73,7 +73,6 @@ func (s *tiflashTestSuite) SetUpSuite(c *C) { } func (s *tiflashTestSuite) TestReadPartitionTable(c *C) { - defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if 
exists t") @@ -101,7 +100,6 @@ func (s *tiflashTestSuite) TestReadPartitionTable(c *C) { } func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) { - defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -137,7 +135,6 @@ func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) { } func (s *tiflashTestSuite) TestMppExecution(c *C) { - defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -196,7 +193,6 @@ func (s *tiflashTestSuite) TestMppExecution(c *C) { } func (s *tiflashTestSuite) TestPartitionTable(c *C) { - defer testleak.AfterTest(c)() tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -285,7 +281,6 @@ func (s *tiflashTestSuite) TestPartitionTable(c *C) { } func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { - defer testleak.AfterTest(c)() var hang = "github.com/pingcap/tidb/store/mockstore/unistore/mppRecvHang" tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") From b28f348492c3235719e4816b8db779e6a908b217 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Fri, 12 Mar 2021 17:56:58 +0800 Subject: [PATCH 20/20] work around goroutine leak --- executor/tiflash_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 20679644dabb6..1a5d7804cca0e 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util/testkit" - "github.com/pingcap/tidb/util/testleak" ) type tiflashTestSuite struct { @@ -43,7 +42,6 @@ type tiflashTestSuite struct { } func (s *tiflashTestSuite) SetUpSuite(c *C) { - testleak.BeforeTest() var err error s.store, err = mockstore.NewMockStore( mockstore.WithClusterInspector(func(c cluster.Cluster) { @@ -314,7 +312,6 @@ 
func (s *tiflashTestSuite) TestCancelMppTasks(c *C) { // all goroutines exit if one goroutine hangs but another return errors func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { - defer testleak.AfterTest(c)() // mock non-root tasks return error var mppNonRootTaskError = "github.com/pingcap/tidb/store/copr/mppNonRootTaskError" // mock root tasks hang