From be2b4c78a91020a1e73adf2577cf08109e985f87 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Tue, 19 Sep 2023 11:15:11 +0800
Subject: [PATCH] Revert "*: fix batch-client wait too long and add some metrics (#973)" (#984)

This reverts commit adb7db13c3e6f94256831fe0fe80f4095d93d12a.

Signed-off-by: crazycs520
---
 internal/client/client.go       |  2 +-
 internal/client/client_batch.go | 42 +++------------------------------
 metrics/metrics.go              | 26 ++++++--------------
 3 files changed, 11 insertions(+), 59 deletions(-)

diff --git a/internal/client/client.go b/internal/client/client.go
index 3bf9b0ebf0..98d847a909 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -620,7 +620,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 
 	// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
 	// request to TiDB is not high frequency.
-	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch && !connArray.batchConn.isBusy(start.UnixNano()) {
+	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
 		if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
 			defer trace.StartRegion(ctx, req.Type.String()).End()
 			return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)
diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go
index eb58cb0333..b7159847b9 100644
--- a/internal/client/client_batch.go
+++ b/internal/client/client_batch.go
@@ -70,7 +70,6 @@ type batchCommandsEntry struct {
 	// canceled indicated the request is canceled or not.
 	canceled int32
 	err      error
-	start    time.Time
 }
 
 func (b *batchCommandsEntry) isCanceled() bool {
@@ -198,16 +197,9 @@ type batchConn struct {
 	pendingRequests prometheus.Observer
 	batchSize       prometheus.Observer
 
-	index            uint32
-	state            atomic.Int32
-	startHandingTime atomic.Int64
+	index uint32
 }
 
-var (
-	batchConnIdle    = int32(0)
-	batchConnHanding = int32(1)
-)
-
 func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
 	return &batchConn{
 		batchCommandsCh: make(chan *batchCommandsEntry, maxBatchSize),
@@ -224,16 +216,6 @@ func (a *batchConn) isIdle() bool {
 	return atomic.LoadUint32(&a.idle) != 0
 }
 
-func (a *batchConn) isBusy(now int64) bool {
-	if len(a.batchCommandsCh) == cap(a.batchCommandsCh) {
-		return true
-	}
-	if a.state.Load() == batchConnHanding && (now-a.startHandingTime.Load()) > int64(time.Second) {
-		return true
-	}
-	return false
-}
-
 // fetchAllPendingRequests fetches all pending requests from the channel.
 func (a *batchConn) fetchAllPendingRequests(
 	maxBatchSize int,
@@ -329,7 +311,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 
 	bestBatchWaitSize := cfg.BatchWaitSize
 	for {
-		a.state.Store(batchConnIdle)
 		a.reqBuilder.reset()
 
 		start := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))
@@ -341,8 +322,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 			}
 		}
 
-		a.state.Store(batchConnHanding)
-		a.startHandingTime.Store(start.UnixNano())
 		if a.reqBuilder.len() < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
 			// If the target TiKV is overload, wait a while to collect more requests.
 			if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {
@@ -399,14 +378,11 @@ func (a *batchConn) getClientAndSend() {
 	}
 	defer cli.unlockForSend()
 
-	now := time.Now()
-	batchCmdWaitToSendDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("wait-to-send", target)
 	req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
 		cli.batched.Store(id, e)
 		if trace.IsEnabled() {
 			trace.Log(e.ctx, "rpc", "send")
 		}
-		batchCmdWaitToSendDuration.Observe(float64(now.Sub(e.start)))
 	})
 	if req != nil {
 		cli.send("", req)
@@ -531,14 +507,6 @@ func (c *batchCommandsClient) isStopped() bool {
 }
 
 func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
-	start := time.Now()
-	defer func() {
-		if forwardedHost == "" {
-			metrics.TiKVBatchConnSendDuration.WithLabelValues(c.target).Observe(time.Since(start).Seconds())
-		} else {
-			metrics.TiKVBatchConnSendDuration.WithLabelValues(forwardedHost).Observe(time.Since(start).Seconds())
-		}
-	}()
 	err := c.initBatchClient(forwardedHost)
 	if err != nil {
 		logutil.BgLogger().Warn(
@@ -644,7 +612,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 		}
 	}()
 
-	batchCmdGotRespDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("got-resp", c.target)
 	epoch := atomic.LoadUint64(&c.epoch)
 	for {
 		resp, err := streamClient.recv()
@@ -668,7 +635,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 		}
 
 		responses := resp.GetResponses()
-		now := time.Now()
 		for i, requestID := range resp.GetRequestIds() {
 			value, ok := c.batched.Load(requestID)
 			if !ok {
@@ -683,7 +649,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 				trace.Log(entry.ctx, "rpc", "received")
 			}
 			logutil.Eventf(entry.ctx, "receive %T response with other %d batched requests from %s", responses[i].GetCmd(), len(responses), c.target)
-			batchCmdGotRespDuration.Observe(float64(now.Sub(entry.start)))
 			if atomic.LoadInt32(&entry.canceled) == 0 {
 				// Put the response only if the request is not canceled.
 				entry.res <- responses[i]
@@ -808,7 +773,6 @@ func sendBatchRequest(
 	req *tikvpb.BatchCommandsRequest_Request,
 	timeout time.Duration,
 ) (*tikvrpc.Response, error) {
-	start := time.Now()
 	entry := &batchCommandsEntry{
 		ctx:           ctx,
 		req:           req,
@@ -816,11 +780,11 @@
 		forwardedHost: forwardedHost,
 		canceled:      0,
 		err:           nil,
-		start:         start,
 	}
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 
+	start := time.Now()
 	select {
 	case batchConn.batchCommandsCh <- entry:
 	case <-ctx.Done():
@@ -831,7 +795,7 @@
 		return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
 	}
 	waitDuration := time.Since(start)
-	metrics.TiKVBatchCmdDuration.WithLabelValues("send-to-chan", addr).Observe(float64(waitDuration))
+	metrics.TiKVBatchWaitDuration.Observe(float64(waitDuration))
 
 	select {
 	case res, ok := <-entry.res:
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 9328ebcd84..5c72f05b28 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -62,8 +62,7 @@ var (
 	TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
 	TiKVStatusDuration              *prometheus.HistogramVec
 	TiKVStatusCounter               *prometheus.CounterVec
-	TiKVBatchConnSendDuration       *prometheus.HistogramVec
-	TiKVBatchCmdDuration            *prometheus.HistogramVec
+	TiKVBatchWaitDuration           prometheus.Histogram
 	TiKVBatchSendLatency            prometheus.Histogram
 	TiKVBatchWaitOverLoad           prometheus.Counter
 	TiKVBatchPendingRequests        *prometheus.HistogramVec
@@ -334,25 +333,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 			ConstLabels: constLabels,
 		}, []string{LblResult})
 
-	TiKVBatchConnSendDuration = prometheus.NewHistogramVec(
+	TiKVBatchWaitDuration = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace:   namespace,
 			Subsystem:   subsystem,
-			Name:        "batch_conn_send_seconds",
-			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 22), // 0.5ms ~ 1048s
-			Help:        "batch conn send duration",
-			ConstLabels: constLabels,
-		}, []string{LblStore})
-
-	TiKVBatchCmdDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace:   namespace,
-			Subsystem:   subsystem,
-			Name:        "batch_cmd_duration",
-			Buckets:     prometheus.ExponentialBuckets(16, 2, 36), // 16ns ~ 549s
-			Help:        "batch cmd duration, unit is nanosecond",
+			Name:        "batch_wait_duration",
+			Buckets:     prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
+			Help:        "batch wait duration",
 			ConstLabels: constLabels,
-		}, []string{LblType, LblStore})
+		})
 
 	TiKVBatchSendLatency = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
@@ -778,8 +767,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram)
 	prometheus.MustRegister(TiKVStatusDuration)
 	prometheus.MustRegister(TiKVStatusCounter)
-	prometheus.MustRegister(TiKVBatchConnSendDuration)
-	prometheus.MustRegister(TiKVBatchCmdDuration)
+	prometheus.MustRegister(TiKVBatchWaitDuration)
 	prometheus.MustRegister(TiKVBatchSendLatency)
 	prometheus.MustRegister(TiKVBatchRecvLatency)
 	prometheus.MustRegister(TiKVBatchWaitOverLoad)
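
Note: after this revert, the only latency recorded on the batch send path is TiKVBatchWaitDuration, observed once per request around the send into batchConn.batchCommandsCh in sendBatchRequest. The following is a minimal, self-contained Go sketch of that measurement pattern only; the namespace/subsystem values, the struct{} channel, and the variable names are simplified stand-ins for illustration, not the actual client-go code.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// batchWaitDuration mirrors the shape of the restored TiKVBatchWaitDuration:
// an unlabeled histogram of nanoseconds spent waiting to hand a request to
// the batch send loop.
var batchWaitDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "tikv",      // stand-in namespace
	Subsystem: "client_go", // stand-in subsystem
	Name:      "batch_wait_duration",
	Buckets:   prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
	Help:      "batch wait duration",
})

func main() {
	prometheus.MustRegister(batchWaitDuration)

	// Stand-in for batchConn.batchCommandsCh.
	batchCommandsCh := make(chan struct{}, 8)

	// Same measured region as sendBatchRequest after the revert: start the
	// clock right before enqueueing the entry, observe once it is accepted.
	start := time.Now()
	batchCommandsCh <- struct{}{}
	batchWaitDuration.Observe(float64(time.Since(start)))
}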