store/tikv: recycle idle connection in tikv client (#10616) (#10632)
tiancaiamao authored and zz-jason committed Jun 4, 2019
1 parent 9286afe commit 9501014
Showing 2 changed files with 65 additions and 10 deletions.
73 changes: 63 additions & 10 deletions store/tikv/client.go
@@ -83,6 +83,11 @@ type connArray struct {
 	batchCommandsCh chan *batchCommandsEntry
 	batchCommandsClients []*batchCommandsClient
 	tikvTransportLayerLoad uint64
+
+	// Notify rpcClient to check the idle flag
+	idleNotify *uint32
+	idle bool
+	idleDetect *time.Timer
 }
 
 type batchCommandsClient struct {
@@ -132,7 +137,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 		// When `conn.Close()` is called, `client.Recv()` will return an error.
 		resp, err := c.client.Recv()
 		if err != nil {
-
 			now := time.Now()
 			for { // try to re-create the streaming in the loop.
 				if c.isStopped() {
@@ -198,8 +202,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
 	}
 }
 
-func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) {
+func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
 	cfg := config.GetGlobalConfig()
+
 	a := &connArray{
 		index: 0,
 		v: make([]*grpc.ClientConn, maxSize),
@@ -208,6 +213,9 @@ func newConnArray(maxSize uint, addr string, security config.Security) (*connArr
 		batchCommandsCh: make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize),
 		batchCommandsClients: make([]*batchCommandsClient, 0, maxSize),
 		tikvTransportLayerLoad: 0,
+
+		idleNotify: idleNotify,
+		idleDetect: time.NewTimer(idleTimeout),
 	}
 	if err := a.Init(addr, security); err != nil {
 		return nil, err
Expand Down Expand Up @@ -332,15 +340,29 @@ func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}

const idleTimeout = 3 * time.Minute

// fetchAllPendingRequests fetches all pending requests from the channel.
func fetchAllPendingRequests(
ch chan *batchCommandsEntry,
func (a *connArray) fetchAllPendingRequests(
maxBatchSize int,
entries *[]*batchCommandsEntry,
requests *[]*tikvpb.BatchCommandsRequest_Request,
) {
// Block on the first element.
headEntry := <-ch
var headEntry *batchCommandsEntry
select {
case headEntry = <-a.batchCommandsCh:
if !a.idleDetect.Stop() {
<-a.idleDetect.C
}
a.idleDetect.Reset(idleTimeout)
case <-a.idleDetect.C:
a.idleDetect.Reset(idleTimeout)
a.idle = true
atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
// This connArray to be recycled
return
}
if headEntry == nil {
return
}
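
The select introduced in this hunk is the standard Go pattern for detecting an idle channel: block on the work channel and an idle timer together, and when the work channel wins, stop and drain the timer before reusing it, otherwise a stale expiry would trigger the idle branch immediately. The following is a minimal, self-contained sketch of that pattern under illustrative names (worker, fetchOne, requests); it is a simplified stand-in, not the TiDB code.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// idleTimeout is shortened for this demo; the commit above uses 3 minutes.
const idleTimeout = 100 * time.Millisecond

// worker mirrors the shape of connArray's idle detection: a work channel, an
// idle timer, and a shared flag that the owner polls. Names are illustrative.
type worker struct {
	requests   chan int
	idle       bool
	idleNotify *uint32
	idleDetect *time.Timer
}

// fetchOne blocks for the next request; if nothing arrives within idleTimeout
// it marks the worker idle and asks the owner (via the shared flag) to recycle it.
func (w *worker) fetchOne() (int, bool) {
	select {
	case req := <-w.requests:
		// Reuse the timer: Stop reports false if it already fired, in which
		// case the stale expiry must be drained before Reset, or a later
		// <-w.idleDetect.C would return immediately.
		if !w.idleDetect.Stop() {
			<-w.idleDetect.C
		}
		w.idleDetect.Reset(idleTimeout)
		return req, true
	case <-w.idleDetect.C:
		w.idleDetect.Reset(idleTimeout)
		w.idle = true
		atomic.CompareAndSwapUint32(w.idleNotify, 0, 1)
		return 0, false
	}
}

func main() {
	var notify uint32
	w := &worker{
		requests:   make(chan int, 1),
		idleNotify: &notify,
		idleDetect: time.NewTimer(idleTimeout),
	}
	w.requests <- 42
	if req, ok := w.fetchOne(); ok {
		fmt.Println("got request", req)
	}
	if _, ok := w.fetchOne(); !ok {
		fmt.Println("went idle, notify flag =", atomic.LoadUint32(&notify))
	}
}
```

Running the sketch prints the received request, then reports the idle flag after roughly 100ms of silence.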
@@ -350,7 +372,7 @@ func fetchAllPendingRequests(
 	// This loop is for trying best to collect more requests.
 	for len(*entries) < maxBatchSize {
 		select {
-		case entry := <-ch:
+		case entry := <-a.batchCommandsCh:
 			if entry == nil {
 				return
 			}
@@ -435,7 +457,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) {
 		requestIDs = requestIDs[:0]
 
 		metrics.TiKVPendingBatchRequests.Set(float64(len(a.batchCommandsCh)))
-		fetchAllPendingRequests(a.batchCommandsCh, int(cfg.MaxBatchSize), &entries, &requests)
+		a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)
 
 		if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
 			tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
@@ -513,13 +535,15 @@ func removeCanceledRequests(
 // TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV.
 // Since we use shared client connection to communicate to the same TiKV, it's possible
 // that there are too many concurrent requests which overload the service of TiKV.
-// TODO: Implement background cleanup. It adds a background goroutine to periodically check
-// whether there is any connection is idle and then close and remove these idle connections.
 type rpcClient struct {
 	sync.RWMutex
 	isClosed bool
 	conns map[string]*connArray
 	security config.Security
+
+	// Implement background cleanup.
+	// Periodically check whether there is any connection that is idle and then close and remove these idle connections.
+	idleNotify uint32
 }
 
 func newRPCClient(security config.Security) *rpcClient {
@@ -554,7 +578,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
 	if !ok {
 		var err error
 		connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
-		array, err = newConnArray(connCount, addr, c.security)
+		array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
 		if err != nil {
 			return nil, err
 		}
@@ -619,6 +643,10 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
 		metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID).Observe(time.Since(start).Seconds())
 	}()
 
+	if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
+		c.recycleIdleConnArray()
+	}
+
 	connArray, err := c.getConnArray(addr)
 	if err != nil {
 		return nil, errors.Trace(err)
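
The CompareAndSwapUint32 check added to SendRequest in this hunk is the consumer side of the flag set in fetchAllPendingRequests: whichever request happens to arrive next swaps the flag back to 0 and pays the cost of recycling, so concurrent senders do not all run the cleanup for one notification. A hedged, standalone illustration of why a CAS (rather than a separate load followed by a store) yields exactly one winner:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var idleNotify uint32 = 1 // pretend a connection already flagged itself idle

	var winners int32
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // eight concurrent "SendRequest"-style callers
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Only the goroutine that swaps 1 -> 0 performs the cleanup; a
			// separate Load followed by Store could let several callers in.
			if atomic.CompareAndSwapUint32(&idleNotify, 1, 0) {
				atomic.AddInt32(&winners, 1) // stand-in for recycleIdleConnArray()
			}
		}()
	}
	wg.Wait()
	fmt.Println("winners:", winners) // always exactly 1
}
```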
@@ -679,3 +707,28 @@ func (c *rpcClient) Close() error {
 	c.closeConns()
 	return nil
 }
+
+func (c *rpcClient) recycleIdleConnArray() {
+	var addrs []string
+	c.RLock()
+	for _, conn := range c.conns {
+		if conn.idle {
+			addrs = append(addrs, conn.target)
+		}
+	}
+	c.RUnlock()
+
+	for _, addr := range addrs {
+		c.Lock()
+		conn, ok := c.conns[addr]
+		if ok {
+			delete(c.conns, addr)
+			logutil.Logger(context.Background()).Info("recycle idle connection",
+				zap.String("target", addr))
+		}
+		c.Unlock()
+		if conn != nil {
+			conn.Close()
+		}
+	}
+}
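
The new recycleIdleConnArray follows a common lock discipline: collect the idle targets under the read lock, look each address up again and remove it under the write lock, and close the connection only after the lock is released so a slow shutdown cannot stall other senders. The sketch below mirrors that shape with illustrative pool and conn types (not TiDB's):

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct {
	target string
	idle   bool
}

func (c *conn) Close() { fmt.Println("closed", c.target) }

type pool struct {
	sync.RWMutex
	conns map[string]*conn
}

// recycleIdle removes and closes idle connections, holding the write lock only
// while mutating the map and never while closing a connection.
func (p *pool) recycleIdle() {
	// Pass 1: collect candidates under the cheaper read lock.
	var addrs []string
	p.RLock()
	for addr, c := range p.conns {
		if c.idle {
			addrs = append(addrs, addr)
		}
	}
	p.RUnlock()

	// Pass 2: re-check and delete each candidate under the write lock,
	// then close it outside the lock.
	for _, addr := range addrs {
		p.Lock()
		c, ok := p.conns[addr]
		if ok {
			delete(p.conns, addr)
		}
		p.Unlock()
		if c != nil {
			c.Close()
		}
	}
}

func main() {
	p := &pool{conns: map[string]*conn{
		"tikv-1:20160": {target: "tikv-1:20160", idle: true},
		"tikv-2:20160": {target: "tikv-2:20160"},
	}}
	p.recycleIdle()
	fmt.Println("remaining connections:", len(p.conns))
}
```

Doing the Close outside the critical section keeps the write lock short; only the map mutation needs exclusion.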
2 changes: 2 additions & 0 deletions store/tikv/client_test.go
@@ -46,6 +46,8 @@ func (s *testClientSuite) TestConn(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(conn2.Get(), Not(Equals), conn1.Get())
 
+	client.recycleIdleConnArray()
+
 	client.Close()
 	conn3, err := client.getConnArray(addr)
 	c.Assert(err, NotNil)