From 754816e80bf92b4c6ccfe384119e4d1b3157b5db Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 16 Jul 2019 07:26:58 +0800 Subject: [PATCH] =?UTF-8?q?store/tikv:=20move=20batch=20client=20code=20to?= =?UTF-8?q?=20file=20client=5Fbatch.go=20(#1=E2=80=A6=20(#11228)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- store/tikv/client.go | 448 +---------------------------------- store/tikv/client_batch.go | 472 +++++++++++++++++++++++++++++++++++++ store/tikv/client_test.go | 4 +- 3 files changed, 484 insertions(+), 440 deletions(-) create mode 100644 store/tikv/client_batch.go diff --git a/store/tikv/client.go b/store/tikv/client.go index 03a487d9321f7..2ec83f9e6d61d 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -25,7 +25,6 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/debugpb" "github.com/pingcap/kvproto/pkg/tikvpb" @@ -34,8 +33,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/logutil" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -75,192 +72,24 @@ type connArray struct { v []*grpc.ClientConn // streamTimeout binds with a background goroutine to process coprocessor streaming timeout. streamTimeout chan *tikvrpc.Lease - - // batchCommandsCh used for batch commands. - batchCommandsCh chan *batchCommandsEntry - batchCommandsClients []*batchCommandsClient - tikvTransportLayerLoad uint64 - closed chan struct{} - - // Notify rpcClient to check the idle flag - idleNotify *uint32 - idle bool - idleDetect *time.Timer - - pendingRequests prometheus.Gauge -} - -type batchCommandsClient struct { - // The target host. - target string - - conn *grpc.ClientConn - client tikvpb.Tikv_BatchCommandsClient - batched sync.Map - idAlloc uint64 - tikvTransportLayerLoad *uint64 - - // closed indicates the batch client is closed explicitly or not. - closed int32 - // clientLock protects client when re-create the streaming. - clientLock sync.Mutex -} - -func (c *batchCommandsClient) isStopped() bool { - return atomic.LoadInt32(&c.closed) != 0 -} - -func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) { - // Use the lock to protect the stream client won't be replaced by RecvLoop, - // and new added request won't be removed by `failPendingRequests`. - c.clientLock.Lock() - defer c.clientLock.Unlock() - for i, requestID := range request.RequestIds { - c.batched.Store(requestID, entries[i]) - } - if err := c.client.Send(request); err != nil { - logutil.Logger(context.Background()).Error( - "batch commands send error", - zap.String("target", c.target), - zap.Error(err), - ) - c.failPendingRequests(err) - } -} - -func (c *batchCommandsClient) recv() (*tikvpb.BatchCommandsResponse, error) { - failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) (*tikvpb.BatchCommandsResponse, error) { - return nil, errors.New("injected error in batchRecvLoop") - }) - // When `conn.Close()` is called, `client.Recv()` will return an error. - return c.client.Recv() -} - -// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels. 
-func (c *batchCommandsClient) failPendingRequests(err error) { - failpoint.Inject("panicInFailPendingRequests", nil) - c.batched.Range(func(key, value interface{}) bool { - id, _ := key.(uint64) - entry, _ := value.(*batchCommandsEntry) - entry.err = err - c.batched.Delete(id) - close(entry.res) - return true - }) -} - -func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { - // Hold the lock to forbid batchSendLoop using the old client. - c.clientLock.Lock() - defer c.clientLock.Unlock() - c.failPendingRequests(err) // fail all pending requests. - - // Re-establish a application layer stream. TCP layer is handled by gRPC. - tikvClient := tikvpb.NewTikvClient(c.conn) - streamClient, err := tikvClient.BatchCommands(context.TODO()) - if err == nil { - logutil.Logger(context.Background()).Info( - "batchRecvLoop re-create streaming success", - zap.String("target", c.target), - ) - c.client = streamClient - return true - } - logutil.Logger(context.Background()).Error( - "batchRecvLoop re-create streaming fail", - zap.String("target", c.target), - zap.Error(err), - ) - return false -} - -func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { - defer func() { - if r := recover(); r != nil { - metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc() - logutil.Logger(context.Background()).Error("batchRecvLoop", - zap.Reflect("r", r), - zap.Stack("stack")) - logutil.Logger(context.Background()).Info("restart batchRecvLoop") - go c.batchRecvLoop(cfg) - } - }() - - for { - resp, err := c.recv() - if err != nil { - now := time.Now() - for { // try to re-create the streaming in the loop. - if c.isStopped() { - return - } - logutil.Logger(context.Background()).Error( - "batchRecvLoop error when receive", - zap.String("target", c.target), - zap.Error(err), - ) - - if c.reCreateStreamingClient(err) { - break - } - - // TODO: Use a more smart backoff strategy. - time.Sleep(time.Second) - } - metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) - continue - } - - responses := resp.GetResponses() - for i, requestID := range resp.GetRequestIds() { - value, ok := c.batched.Load(requestID) - if !ok { - // There shouldn't be any unknown responses because if the old entries - // are cleaned by `failPendingRequests`, the stream must be re-created - // so that old responses will be never received. - panic("batchRecvLoop receives a unknown response") - } - entry := value.(*batchCommandsEntry) - if atomic.LoadInt32(&entry.canceled) == 0 { - // Put the response only if the request is not canceled. - entry.res <- responses[i] - } - c.batched.Delete(requestID) - } - - tikvTransportLayerLoad := resp.GetTransportLayerLoad() - if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 { - // We need to consider TiKV load only if batch-wait strategy is enabled. - atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad) - } - } + // batchConn is not null when batch is enabled. 
+ *batchConn } func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) { - cfg := config.GetGlobalConfig() - a := &connArray{ index: 0, v: make([]*grpc.ClientConn, maxSize), streamTimeout: make(chan *tikvrpc.Lease, 1024), - - batchCommandsCh: make(chan *batchCommandsEntry, cfg.TiKVClient.MaxBatchSize), - batchCommandsClients: make([]*batchCommandsClient, 0, maxSize), - tikvTransportLayerLoad: 0, - closed: make(chan struct{}), - - idleNotify: idleNotify, - idleDetect: time.NewTimer(idleTimeout), } - if err := a.Init(addr, security); err != nil { + if err := a.Init(addr, security, idleNotify); err != nil { return nil, err } return a, nil } -func (a *connArray) Init(addr string, security config.Security) error { +func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error { a.target = addr - a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target) opt := grpc.WithInsecure() if len(security.ClusterSSLCA) != 0 { @@ -282,6 +111,10 @@ func (a *connArray) Init(addr string, security config.Security) error { } allowBatch := cfg.TiKVClient.MaxBatchSize > 0 + if allowBatch { + a.batchConn = newBatchConn(uint(len(a.v)), cfg.TiKVClient.MaxBatchSize, idleNotify) + a.pendingRequests = metrics.TiKVPendingBatchRequests.WithLabelValues(a.target) + } keepAlive := cfg.TiKVClient.GrpcKeepAliveTime keepAliveTimeout := cfg.TiKVClient.GrpcKeepAliveTimeout for i := range a.v { @@ -345,15 +178,9 @@ func (a *connArray) Get() *grpc.ClientConn { } func (a *connArray) Close() { - // Close all batchRecvLoop. - for _, c := range a.batchCommandsClients { - // After connections are closed, `batchRecvLoop`s will check the flag. - atomic.StoreInt32(&c.closed, 1) + if a.batchConn != nil { + a.batchConn.Close() } - // Don't close(batchCommandsCh) because when Close() is called, someone maybe - // calling SendRequest and writing batchCommandsCh, if we close it here the - // writing goroutine will panic. - close(a.closed) for i, c := range a.v { if c != nil { @@ -365,197 +192,6 @@ func (a *connArray) Close() { close(a.streamTimeout) } -type batchCommandsEntry struct { - req *tikvpb.BatchCommandsRequest_Request - res chan *tikvpb.BatchCommandsResponse_Response - - // canceled indicated the request is canceled or not. - canceled int32 - err error -} - -func (b *batchCommandsEntry) isCanceled() bool { - return atomic.LoadInt32(&b.canceled) == 1 -} - -const idleTimeout = 3 * time.Minute - -// fetchAllPendingRequests fetches all pending requests from the channel. -func (a *connArray) fetchAllPendingRequests( - maxBatchSize int, - entries *[]*batchCommandsEntry, - requests *[]*tikvpb.BatchCommandsRequest_Request, -) { - // Block on the first element. - var headEntry *batchCommandsEntry - select { - case headEntry = <-a.batchCommandsCh: - if !a.idleDetect.Stop() { - <-a.idleDetect.C - } - a.idleDetect.Reset(idleTimeout) - case <-a.idleDetect.C: - a.idleDetect.Reset(idleTimeout) - a.idle = true - atomic.CompareAndSwapUint32(a.idleNotify, 0, 1) - // This connArray to be recycled - return - case <-a.closed: - return - } - if headEntry == nil { - return - } - *entries = append(*entries, headEntry) - *requests = append(*requests, headEntry.req) - - // This loop is for trying best to collect more requests. 
- for len(*entries) < maxBatchSize { - select { - case entry := <-a.batchCommandsCh: - if entry == nil { - return - } - *entries = append(*entries, entry) - *requests = append(*requests, entry.req) - default: - return - } - } -} - -// fetchMorePendingRequests fetches more pending requests from the channel. -func fetchMorePendingRequests( - ch chan *batchCommandsEntry, - maxBatchSize int, - batchWaitSize int, - maxWaitTime time.Duration, - entries *[]*batchCommandsEntry, - requests *[]*tikvpb.BatchCommandsRequest_Request, -) { - waitStart := time.Now() - - // Try to collect `batchWaitSize` requests, or wait `maxWaitTime`. - after := time.NewTimer(maxWaitTime) - for len(*entries) < batchWaitSize { - select { - case entry := <-ch: - if entry == nil { - return - } - *entries = append(*entries, entry) - *requests = append(*requests, entry.req) - case waitEnd := <-after.C: - metrics.TiKVBatchWaitDuration.Observe(float64(waitEnd.Sub(waitStart))) - return - } - } - after.Stop() - - // Do an additional non-block try. Here we test the lengh with `maxBatchSize` instead - // of `batchWaitSize` because trying best to fetch more requests is necessary so that - // we can adjust the `batchWaitSize` dynamically. - for len(*entries) < maxBatchSize { - select { - case entry := <-ch: - if entry == nil { - return - } - *entries = append(*entries, entry) - *requests = append(*requests, entry.req) - default: - metrics.TiKVBatchWaitDuration.Observe(float64(time.Since(waitStart))) - return - } - } -} - -func (a *connArray) batchSendLoop(cfg config.TiKVClient) { - defer func() { - if r := recover(); r != nil { - metrics.PanicCounter.WithLabelValues(metrics.LabelBatchSendLoop).Inc() - logutil.Logger(context.Background()).Error("batchSendLoop", - zap.Reflect("r", r), - zap.Stack("stack")) - logutil.Logger(context.Background()).Info("restart batchSendLoop") - go a.batchSendLoop(cfg) - } - }() - - entries := make([]*batchCommandsEntry, 0, cfg.MaxBatchSize) - requests := make([]*tikvpb.BatchCommandsRequest_Request, 0, cfg.MaxBatchSize) - requestIDs := make([]uint64, 0, cfg.MaxBatchSize) - - var bestBatchWaitSize = cfg.BatchWaitSize - for { - // Choose a connection by round-robbin. - next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v)) - batchCommandsClient := a.batchCommandsClients[next] - - entries = entries[:0] - requests = requests[:0] - requestIDs = requestIDs[:0] - - a.pendingRequests.Set(float64(len(a.batchCommandsCh))) - a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests) - - if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { - tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad) - // If the target TiKV is overload, wait a while to collect more requests. - if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold { - fetchMorePendingRequests( - a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize), - cfg.MaxBatchWaitTime, &entries, &requests, - ) - } - } - length := len(requests) - if uint(length) == 0 { - // The batch command channel is closed. - return - } else if uint(length) < bestBatchWaitSize && bestBatchWaitSize > 1 { - // Waits too long to collect requests, reduce the target batch size. - bestBatchWaitSize -= 1 - } else if uint(length) > bestBatchWaitSize+4 && bestBatchWaitSize < cfg.MaxBatchSize { - bestBatchWaitSize += 1 - } - - length = removeCanceledRequests(&entries, &requests) - if length == 0 { - continue // All requests are canceled. 
- } - maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length)) - for i := 0; i < length; i++ { - requestID := uint64(i) + maxBatchID - uint64(length) - requestIDs = append(requestIDs, requestID) - } - - req := &tikvpb.BatchCommandsRequest{ - Requests: requests, - RequestIds: requestIDs, - } - - batchCommandsClient.send(req, entries) - } -} - -// removeCanceledRequests removes canceled requests before sending. -func removeCanceledRequests( - entries *[]*batchCommandsEntry, - requests *[]*tikvpb.BatchCommandsRequest_Request) int { - validEntries := (*entries)[:0] - validRequets := (*requests)[:0] - for _, e := range *entries { - if !e.isCanceled() { - validEntries = append(validEntries, e) - validRequets = append(validRequets, e.req) - } - } - *entries = validEntries - *requests = validRequets - return len(*entries) -} - // rpcClient is RPC client struct. // TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV. // Since we use shared client connection to communicate to the same TiKV, it's possible @@ -629,43 +265,6 @@ func (c *rpcClient) closeConns() { c.Unlock() } -func sendBatchRequest( - ctx context.Context, - addr string, - connArray *connArray, - req *tikvpb.BatchCommandsRequest_Request, - timeout time.Duration, -) (*tikvrpc.Response, error) { - entry := &batchCommandsEntry{ - req: req, - res: make(chan *tikvpb.BatchCommandsResponse_Response, 1), - canceled: 0, - err: nil, - } - ctx1, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - select { - case connArray.batchCommandsCh <- entry: - case <-ctx1.Done(): - logutil.Logger(context.Background()).Warn("send request is cancelled", - zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) - return nil, errors.Trace(ctx1.Err()) - } - - select { - case res, ok := <-entry.res: - if !ok { - return nil, errors.Trace(entry.err) - } - return tikvrpc.FromBatchCommandsResponse(res), nil - case <-ctx1.Done(): - atomic.StoreInt32(&entry.canceled, 1) - logutil.Logger(context.Background()).Warn("wait response is cancelled", - zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) - return nil, errors.Trace(ctx1.Err()) - } -} - // SendRequest sends a Request to server and receives Response. func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { start := time.Now() @@ -686,7 +285,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 { if batchReq := req.ToBatchCommandsRequest(); batchReq != nil { - return sendBatchRequest(ctx, addr, connArray, batchReq, timeout) + return sendBatchRequest(ctx, addr, connArray.batchConn, batchReq, timeout) } } @@ -739,28 +338,3 @@ func (c *rpcClient) Close() error { c.closeConns() return nil } - -func (c *rpcClient) recycleIdleConnArray() { - var addrs []string - c.RLock() - for _, conn := range c.conns { - if conn.idle { - addrs = append(addrs, conn.target) - } - } - c.RUnlock() - - for _, addr := range addrs { - c.Lock() - conn, ok := c.conns[addr] - if ok { - delete(c.conns, addr) - logutil.Logger(context.Background()).Info("recycle idle connection", - zap.String("target", addr)) - } - c.Unlock() - if conn != nil { - conn.Close() - } - } -} diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go new file mode 100644 index 0000000000000..8098b3f4001a2 --- /dev/null +++ b/store/tikv/client_batch.go @@ -0,0 +1,472 @@ +// Copyright 2019 PingCAP, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package tikv provides TCP connections to kvserver.
+package tikv
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pingcap/errors"
+ "github.com/pingcap/failpoint"
+ "github.com/pingcap/kvproto/pkg/tikvpb"
+ "github.com/pingcap/tidb/config"
+ "github.com/pingcap/tidb/metrics"
+ "github.com/pingcap/tidb/store/tikv/tikvrpc"
+ "github.com/pingcap/tidb/util/logutil"
+ "github.com/prometheus/client_golang/prometheus"
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+)
+
+type batchConn struct {
+ index uint32
+ // batchCommandsCh is used for batch commands.
+ batchCommandsCh chan *batchCommandsEntry
+ batchCommandsClients []*batchCommandsClient
+ tikvTransportLayerLoad uint64
+ closed chan struct{}
+
+ // Notify rpcClient to check the idle flag
+ idleNotify *uint32
+ idle bool
+ idleDetect *time.Timer
+
+ pendingRequests prometheus.Gauge
+}
+
+func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
+ return &batchConn{
+ batchCommandsCh: make(chan *batchCommandsEntry, maxBatchSize),
+ batchCommandsClients: make([]*batchCommandsClient, 0, connCount),
+ tikvTransportLayerLoad: 0,
+ closed: make(chan struct{}),
+
+ idleNotify: idleNotify,
+ idleDetect: time.NewTimer(idleTimeout),
+ }
+}
+
+// fetchAllPendingRequests fetches all pending requests from the channel.
+func (a *batchConn) fetchAllPendingRequests(
+ maxBatchSize int,
+ entries *[]*batchCommandsEntry,
+ requests *[]*tikvpb.BatchCommandsRequest_Request,
+) {
+ // Block on the first element.
+ var headEntry *batchCommandsEntry
+ select {
+ case headEntry = <-a.batchCommandsCh:
+ if !a.idleDetect.Stop() {
+ <-a.idleDetect.C
+ }
+ a.idleDetect.Reset(idleTimeout)
+ case <-a.idleDetect.C:
+ a.idleDetect.Reset(idleTimeout)
+ a.idle = true
+ atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
+ // This batchConn is to be recycled.
+ return
+ case <-a.closed:
+ return
+ }
+ if headEntry == nil {
+ return
+ }
+ *entries = append(*entries, headEntry)
+ *requests = append(*requests, headEntry.req)
+
+ // This loop tries its best to collect more requests.
+ for len(*entries) < maxBatchSize {
+ select {
+ case entry := <-a.batchCommandsCh:
+ if entry == nil {
+ return
+ }
+ *entries = append(*entries, entry)
+ *requests = append(*requests, entry.req)
+ default:
+ return
+ }
+ }
+}
+
+// fetchMorePendingRequests fetches more pending requests from the channel.
+func fetchMorePendingRequests(
+ ch chan *batchCommandsEntry,
+ maxBatchSize int,
+ batchWaitSize int,
+ maxWaitTime time.Duration,
+ entries *[]*batchCommandsEntry,
+ requests *[]*tikvpb.BatchCommandsRequest_Request,
+) {
+ waitStart := time.Now()
+
+ // Try to collect `batchWaitSize` requests, or wait `maxWaitTime`.
+ after := time.NewTimer(maxWaitTime)
+ for len(*entries) < batchWaitSize {
+ select {
+ case entry := <-ch:
+ if entry == nil {
+ return
+ }
+ *entries = append(*entries, entry)
+ *requests = append(*requests, entry.req)
+ case waitEnd := <-after.C:
+ metrics.TiKVBatchWaitDuration.Observe(float64(waitEnd.Sub(waitStart)))
+ return
+ }
+ }
+ after.Stop()
+
+ // Do an additional non-blocking try. Here we test the length with `maxBatchSize` instead
+ // of `batchWaitSize` because trying our best to fetch more requests is necessary so that
+ // we can adjust the `batchWaitSize` dynamically.
+ for len(*entries) < maxBatchSize {
+ select {
+ case entry := <-ch:
+ if entry == nil {
+ return
+ }
+ *entries = append(*entries, entry)
+ *requests = append(*requests, entry.req)
+ default:
+ metrics.TiKVBatchWaitDuration.Observe(float64(time.Since(waitStart)))
+ return
+ }
+ }
+}
+
+type batchCommandsClient struct {
+ // The target host.
+ target string
+
+ conn *grpc.ClientConn
+ client tikvpb.Tikv_BatchCommandsClient
+ batched sync.Map
+ idAlloc uint64
+ tikvTransportLayerLoad *uint64
+
+ // closed indicates whether the batch client is closed explicitly.
+ closed int32
+ // clientLock protects the client when re-creating the streaming.
+ clientLock sync.Mutex
+}
+
+func (c *batchCommandsClient) isStopped() bool {
+ return atomic.LoadInt32(&c.closed) != 0
+}
+
+func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
+ // Use the lock to ensure the stream client won't be replaced by batchRecvLoop,
+ // and newly added requests won't be removed by `failPendingRequests`.
+ c.clientLock.Lock()
+ defer c.clientLock.Unlock()
+ for i, requestID := range request.RequestIds {
+ c.batched.Store(requestID, entries[i])
+ }
+ if err := c.client.Send(request); err != nil {
+ logutil.Logger(context.Background()).Error(
+ "batch commands send error",
+ zap.String("target", c.target),
+ zap.Error(err),
+ )
+ c.failPendingRequests(err)
+ }
+}
+
+func (c *batchCommandsClient) recv() (*tikvpb.BatchCommandsResponse, error) {
+ failpoint.Inject("gotErrorInRecvLoop", func(_ failpoint.Value) (*tikvpb.BatchCommandsResponse, error) {
+ return nil, errors.New("injected error in batchRecvLoop")
+ })
+ // When `conn.Close()` is called, `client.Recv()` will return an error.
+ return c.client.Recv()
+}
+
+// `failPendingRequests` must be called in locked contexts in order to avoid double closing channels.
+func (c *batchCommandsClient) failPendingRequests(err error) {
+ failpoint.Inject("panicInFailPendingRequests", nil)
+ c.batched.Range(func(key, value interface{}) bool {
+ id, _ := key.(uint64)
+ entry, _ := value.(*batchCommandsEntry)
+ entry.err = err
+ c.batched.Delete(id)
+ close(entry.res)
+ return true
+ })
+}
+
+func (c *batchCommandsClient) reCreateStreamingClient(err error) bool {
+ // Hold the lock to forbid batchSendLoop using the old client.
+ c.clientLock.Lock()
+ defer c.clientLock.Unlock()
+ c.failPendingRequests(err) // fail all pending requests.
+
+ // Re-establish an application layer stream. The TCP layer is handled by gRPC.
+ tikvClient := tikvpb.NewTikvClient(c.conn)
+ streamClient, err := tikvClient.BatchCommands(context.TODO())
+ if err == nil {
+ logutil.Logger(context.Background()).Info(
+ "batchRecvLoop re-create streaming success",
+ zap.String("target", c.target),
+ )
+ c.client = streamClient
+ return true
+ }
+ logutil.Logger(context.Background()).Error(
+ "batchRecvLoop re-create streaming fail",
+ zap.String("target", c.target),
+ zap.Error(err),
+ )
+ return false
+}
+
+func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
+ defer func() {
+ if r := recover(); r != nil {
+ metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
+ logutil.Logger(context.Background()).Error("batchRecvLoop",
+ zap.Reflect("r", r),
+ zap.Stack("stack"))
+ logutil.Logger(context.Background()).Info("restart batchRecvLoop")
+ go c.batchRecvLoop(cfg)
+ }
+ }()
+
+ for {
+ resp, err := c.recv()
+ if err != nil {
+ now := time.Now()
+ for { // try to re-create the streaming in the loop.
+ if c.isStopped() {
+ return
+ }
+ logutil.Logger(context.Background()).Error(
+ "batchRecvLoop error when receive",
+ zap.String("target", c.target),
+ zap.Error(err),
+ )
+
+ if c.reCreateStreamingClient(err) {
+ break
+ }
+
+ // TODO: Use a smarter backoff strategy.
+ time.Sleep(time.Second)
+ }
+ metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds())
+ continue
+ }
+
+ responses := resp.GetResponses()
+ for i, requestID := range resp.GetRequestIds() {
+ value, ok := c.batched.Load(requestID)
+ if !ok {
+ // There shouldn't be any unknown responses because if the old entries
+ // are cleaned by `failPendingRequests`, the stream must be re-created
+ // so that old responses will never be received.
+ panic("batchRecvLoop receives a unknown response")
+ }
+ entry := value.(*batchCommandsEntry)
+ if atomic.LoadInt32(&entry.canceled) == 0 {
+ // Put the response only if the request is not canceled.
+ entry.res <- responses[i]
+ }
+ c.batched.Delete(requestID)
+ }
+
+ tikvTransportLayerLoad := resp.GetTransportLayerLoad()
+ if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
+ // We need to consider TiKV load only if batch-wait strategy is enabled.
+ atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad)
+ }
+ }
+}
+
+type batchCommandsEntry struct {
+ req *tikvpb.BatchCommandsRequest_Request
+ res chan *tikvpb.BatchCommandsResponse_Response
+
+ // canceled indicates whether the request is canceled or not.
+ canceled int32
+ err error
+}
+
+func (b *batchCommandsEntry) isCanceled() bool {
+ return atomic.LoadInt32(&b.canceled) == 1
+}
+
+const idleTimeout = 3 * time.Minute
+
+func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
+ defer func() {
+ if r := recover(); r != nil {
+ metrics.PanicCounter.WithLabelValues(metrics.LabelBatchSendLoop).Inc()
+ logutil.Logger(context.Background()).Error("batchSendLoop",
+ zap.Reflect("r", r),
+ zap.Stack("stack"))
+ logutil.Logger(context.Background()).Info("restart batchSendLoop")
+ go a.batchSendLoop(cfg)
+ }
+ }()
+
+ entries := make([]*batchCommandsEntry, 0, cfg.MaxBatchSize)
+ requests := make([]*tikvpb.BatchCommandsRequest_Request, 0, cfg.MaxBatchSize)
+ requestIDs := make([]uint64, 0, cfg.MaxBatchSize)
+
+ var bestBatchWaitSize = cfg.BatchWaitSize
+ for {
+ // Choose a connection by round-robin.
+ next := atomic.AddUint32(&a.index, 1) % uint32(len(a.batchCommandsClients))
+ batchCommandsClient := a.batchCommandsClients[next]
+
+ entries = entries[:0]
+ requests = requests[:0]
+ requestIDs = requestIDs[:0]
+
+ a.pendingRequests.Set(float64(len(a.batchCommandsCh)))
+ a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)
+
+ if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
+ tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
+ // If the target TiKV is overloaded, wait a while to collect more requests.
+ if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold {
+ fetchMorePendingRequests(
+ a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize),
+ cfg.MaxBatchWaitTime, &entries, &requests,
+ )
+ }
+ }
+ length := len(requests)
+ if uint(length) == 0 {
+ // The batch command channel is closed.
+ return
+ } else if uint(length) < bestBatchWaitSize && bestBatchWaitSize > 1 {
+ // Waited too long to collect requests; reduce the target batch size.
+ bestBatchWaitSize -= 1
+ } else if uint(length) > bestBatchWaitSize+4 && bestBatchWaitSize < cfg.MaxBatchSize {
+ bestBatchWaitSize += 1
+ }
+
+ length = removeCanceledRequests(&entries, &requests)
+ if length == 0 {
+ continue // All requests are canceled.
+ }
+ maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
+ for i := 0; i < length; i++ {
+ requestID := uint64(i) + maxBatchID - uint64(length)
+ requestIDs = append(requestIDs, requestID)
+ }
+
+ req := &tikvpb.BatchCommandsRequest{
+ Requests: requests,
+ RequestIds: requestIDs,
+ }
+
+ batchCommandsClient.send(req, entries)
+ }
+}
+
+func (a *batchConn) Close() {
+ // Stop all batchRecvLoop goroutines.
+ for _, c := range a.batchCommandsClients {
+ // After connections are closed, `batchRecvLoop`s will check the flag.
+ atomic.StoreInt32(&c.closed, 1)
+ }
+ // Don't close(batchCommandsCh) because when Close() is called, someone may be
+ // calling SendRequest and writing to batchCommandsCh; if we close it here, the
+ // writing goroutine will panic.
+ close(a.closed)
+}
+
+// removeCanceledRequests removes canceled requests before sending.
+func removeCanceledRequests( + entries *[]*batchCommandsEntry, + requests *[]*tikvpb.BatchCommandsRequest_Request) int { + validEntries := (*entries)[:0] + validRequets := (*requests)[:0] + for _, e := range *entries { + if !e.isCanceled() { + validEntries = append(validEntries, e) + validRequets = append(validRequets, e.req) + } + } + *entries = validEntries + *requests = validRequets + return len(*entries) +} + +func sendBatchRequest( + ctx context.Context, + addr string, + batchConn *batchConn, + req *tikvpb.BatchCommandsRequest_Request, + timeout time.Duration, +) (*tikvrpc.Response, error) { + entry := &batchCommandsEntry{ + req: req, + res: make(chan *tikvpb.BatchCommandsResponse_Response, 1), + canceled: 0, + err: nil, + } + ctx1, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + select { + case batchConn.batchCommandsCh <- entry: + case <-ctx1.Done(): + logutil.Logger(context.Background()).Warn("send request is cancelled", + zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) + return nil, errors.Trace(ctx1.Err()) + } + + select { + case res, ok := <-entry.res: + if !ok { + return nil, errors.Trace(entry.err) + } + return tikvrpc.FromBatchCommandsResponse(res), nil + case <-ctx1.Done(): + atomic.StoreInt32(&entry.canceled, 1) + logutil.Logger(context.Background()).Warn("wait response is cancelled", + zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) + return nil, errors.Trace(ctx1.Err()) + } +} + +func (c *rpcClient) recycleIdleConnArray() { + var addrs []string + c.RLock() + for _, conn := range c.conns { + if conn.idle { + addrs = append(addrs, conn.target) + } + } + c.RUnlock() + + for _, addr := range addrs { + c.Lock() + conn, ok := c.conns[addr] + if ok { + delete(c.conns, addr) + logutil.Logger(context.Background()).Info("recycle idle connection", + zap.String("target", addr)) + } + c.Unlock() + if conn != nil { + conn.Close() + } + } +} diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index 2b692f99e0ec0..081607f26db31 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -53,8 +53,6 @@ func (s *testClientSuite) TestConn(c *C) { c.Assert(err, IsNil) c.Assert(conn2.Get(), Not(Equals), conn1.Get()) - client.recycleIdleConnArray() - client.Close() conn3, err := client.getConnArray(addr) c.Assert(err, NotNil) @@ -88,7 +86,7 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) { req := new(tikvpb.BatchCommandsRequest_Request) - a := &connArray{batchCommandsCh: make(chan *batchCommandsEntry, 1)} + a := newBatchConn(1, 1, nil) ctx, cancel := context.WithCancel(context.TODO()) cancel()
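Note on how the relocated pieces fit together: newBatchConn creates the shared batching state, sendBatchRequest enqueues a batchCommandsEntry on batchCommandsCh, and the batchSendLoop/batchRecvLoop goroutines (started from connArray.Init when batching is enabled) drain the channel and route responses back to the waiting callers. The minimal in-package sketch below is not part of this patch; the test name, address, and 50ms timeout are illustrative assumptions. With no batchSendLoop draining the channel, sendBatchRequest is expected to return a timeout error, much like TestCancelTimeoutRetErr in client_test.go above.

package tikv

import (
    "context"
    "testing"
    "time"

    "github.com/pingcap/kvproto/pkg/tikvpb"
)

// TestBatchConnTimeoutSketch is a hypothetical example, not part of this change.
func TestBatchConnTimeoutSketch(t *testing.T) {
    // One client slot, batch channel capacity 1, no idle notification; idleNotify
    // is never dereferenced here because no batchSendLoop runs in this sketch.
    conn := newBatchConn(1, 1, nil)
    defer conn.Close()

    req := new(tikvpb.BatchCommandsRequest_Request)
    // The entry is queued on conn.batchCommandsCh, but nothing consumes it, so the
    // call should fail with a deadline-exceeded error once the timeout expires.
    if _, err := sendBatchRequest(context.Background(), "127.0.0.1:20160", conn, req, 50*time.Millisecond); err == nil {
        t.Fatal("expected a timeout error from sendBatchRequest")
    }
}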