Skip to content

Commit

Permalink
executor: kill tidb [session id] can't stop executors and release r…
Browse files Browse the repository at this point in the history
…esources quickly (pingcap#9844)
  • Loading branch information
qw4990 committed Apr 1, 2019
1 parent fe7aefa commit b5688a1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
10 changes: 10 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type clientConn struct {
mu struct {
sync.RWMutex
cancelFunc context.CancelFunc
resultSets []ResultSet
}
}

Expand Down Expand Up @@ -883,6 +884,15 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err)).Inc()
return errors.Trace(err)
}
cc.mu.Lock()
cc.mu.resultSets = rs
status := atomic.LoadInt32(&cc.status)
if status == connStatusShutdown || status == connStatusWaitShutdown {
cc.mu.Unlock()
killConn(cc)
return errors.New("killed by another connection")
}
cc.mu.Unlock()
if rs != nil {
if len(rs) == 1 {
err = cc.writeResultset(ctx, rs[0], false, 0, 0)
Expand Down
7 changes: 3 additions & 4 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server
import (
"crypto/tls"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"golang.org/x/net/context"
"sync/atomic"
)

// TiDBDriver implements IDriver.
Expand Down Expand Up @@ -336,7 +336,7 @@ type tidbResultSet struct {
recordSet sqlexec.RecordSet
columns []*ColumnInfo
rows []chunk.Row
closed bool
closed int32
}

func (trs *tidbResultSet) NewChunk() *chunk.Chunk {
Expand All @@ -359,10 +359,9 @@ func (trs *tidbResultSet) GetFetchedRows() []chunk.Row {
}

func (trs *tidbResultSet) Close() error {
if trs.closed {
if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) {
return nil
}
trs.closed = true
return trs.recordSet.Close()
}

Expand Down
21 changes: 11 additions & 10 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,19 +351,25 @@ func (s *Server) Kill(connectionID uint64, query bool) {
return
}

killConn(conn, query)
}

func killConn(conn *clientConn, query bool) {
if !query {
// Mark the client connection status as WaitShutdown, when the goroutine detect
// this, it will end the dispatch loop and exit.
atomic.StoreInt32(&conn.status, connStatusWaitShutdown)
}
killConn(conn)
}

func killConn(conn *clientConn) {
conn.mu.RLock()
resultSets := conn.mu.resultSets
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()
for _, resultSet := range resultSets {
// resultSet.Close() is reentrant so it's safe to kill a same connID multiple times
if err := resultSet.Close(); err != nil {
logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err))
}
}
if cancelFunc != nil {
cancelFunc()
}
Expand All @@ -378,12 +384,7 @@ func (s *Server) KillAllConnections() {
for _, conn := range s.clients {
atomic.StoreInt32(&conn.status, connStatusShutdown)
terror.Log(errors.Trace(conn.closeWithoutLock()))
conn.mu.RLock()
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()
if cancelFunc != nil {
cancelFunc()
}
killConn(conn)
}
}

Expand Down

0 comments on commit b5688a1

Please sign in to comment.