diff --git a/client/client.go b/client/client.go index b9535aa504e..d417ebf6eda 100644 --- a/client/client.go +++ b/client/client.go @@ -799,6 +799,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur } func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { + tsoReqPool := c.tsoClient.tsoReqPool req := tsoReqPool.Get().(*tsoRequest) // Set needed fields in the request before using it. req.start = time.Now() @@ -807,6 +808,7 @@ func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoReque req.physical = 0 req.logical = 0 req.dcLocation = dcLocation + req.pool = tsoReqPool return req } diff --git a/client/tso_client.go b/client/tso_client.go index 5f8b12df36f..08aba0d69cc 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -51,16 +51,8 @@ type tsoRequest struct { physical int64 logical int64 dcLocation string -} -var tsoReqPool = sync.Pool{ - New: func() any { - return &tsoRequest{ - done: make(chan error, 1), - physical: 0, - logical: 0, - } - }, + pool *sync.Pool } func (req *tsoRequest) tryDone(err error) { @@ -84,6 +76,8 @@ type tsoClient struct { // tso allocator leader is switched. tsoAllocServingURLSwitchedCallback []func() + // tsoReqPool is the pool to recycle `*tsoRequest`. + tsoReqPool *sync.Pool // tsoDispatcher is used to dispatch different TSO requests to // the corresponding dc-location TSO channel. tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher @@ -104,11 +98,20 @@ func newTSOClient( ) *tsoClient { ctx, cancel := context.WithCancel(ctx) c := &tsoClient{ - ctx: ctx, - cancel: cancel, - option: option, - svcDiscovery: svcDiscovery, - tsoStreamBuilderFactory: factory, + ctx: ctx, + cancel: cancel, + option: option, + svcDiscovery: svcDiscovery, + tsoStreamBuilderFactory: factory, + tsoReqPool: &sync.Pool{ + New: func() any { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + } + }, + }, checkTSDeadlineCh: make(chan struct{}), checkTSODispatcherCh: make(chan struct{}, 1), updateTSOConnectionCtxsCh: make(chan struct{}, 1), diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 88f8ffd61b5..0eb7eb51343 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -130,7 +130,7 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { case err = <-req.done: defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End() err = errors.WithStack(err) - defer tsoReqPool.Put(req) + defer req.pool.Put(req) if err != nil { cmdFailDurationTSO.Observe(time.Since(req.start).Seconds()) return 0, 0, err