From 77e91d13279d34d8d40f516a77382e4f37d05681 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Mon, 1 Apr 2019 16:51:24 +0800 Subject: [PATCH] executor: `kill tidb [session id]` can't stop executors and release resources quickly (#9844) --- server/conn.go | 10 ++++++++++ server/driver_tidb.go | 6 +++--- server/server.go | 21 +++++++++++---------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/server/conn.go b/server/conn.go index dba6ef82a0ffd..f2fa4264eddc8 100644 --- a/server/conn.go +++ b/server/conn.go @@ -112,6 +112,7 @@ type clientConn struct { mu struct { sync.RWMutex cancelFunc context.CancelFunc + resultSets []ResultSet } } @@ -1047,6 +1048,15 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err)).Inc() return errors.Trace(err) } + cc.mu.Lock() + cc.mu.resultSets = rs + status := atomic.LoadInt32(&cc.status) + if status == connStatusShutdown || status == connStatusWaitShutdown { + cc.mu.Unlock() + killConn(cc) + return errors.New("killed by another connection") + } + cc.mu.Unlock() if rs != nil { if len(rs) == 1 { err = cc.writeResultset(ctx, rs[0], false, 0, 0) diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 46e9f37f4a5e8..315387a8a1ee3 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -17,6 +17,7 @@ import ( "context" "crypto/tls" "fmt" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -354,7 +355,7 @@ type tidbResultSet struct { recordSet sqlexec.RecordSet columns []*ColumnInfo rows []chunk.Row - closed bool + closed int32 } func (trs *tidbResultSet) NewRecordBatch() *chunk.RecordBatch { @@ -377,10 +378,9 @@ func (trs *tidbResultSet) GetFetchedRows() []chunk.Row { } func (trs *tidbResultSet) Close() error { - if trs.closed { + if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) { return nil } - trs.closed = true return trs.recordSet.Close() } diff --git a/server/server.go b/server/server.go index f76b7d8504ff2..1881ab7cf2bb6 100644 --- a/server/server.go +++ b/server/server.go @@ -508,19 +508,25 @@ func (s *Server) Kill(connectionID uint64, query bool) { return } - killConn(conn, query) -} - -func killConn(conn *clientConn, query bool) { if !query { // Mark the client connection status as WaitShutdown, when the goroutine detect // this, it will end the dispatch loop and exit. atomic.StoreInt32(&conn.status, connStatusWaitShutdown) } + killConn(conn) +} +func killConn(conn *clientConn) { conn.mu.RLock() + resultSets := conn.mu.resultSets cancelFunc := conn.mu.cancelFunc conn.mu.RUnlock() + for _, resultSet := range resultSets { + // resultSet.Close() is reentrant so it's safe to kill a same connID multiple times + if err := resultSet.Close(); err != nil { + logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err)) + } + } if cancelFunc != nil { cancelFunc() } @@ -535,12 +541,7 @@ func (s *Server) KillAllConnections() { for _, conn := range s.clients { atomic.StoreInt32(&conn.status, connStatusShutdown) terror.Log(errors.Trace(conn.closeWithoutLock())) - conn.mu.RLock() - cancelFunc := conn.mu.cancelFunc - conn.mu.RUnlock() - if cancelFunc != nil { - cancelFunc() - } + killConn(conn) } }