From 506e50e31d8c4a73f645f97bb043a30db5996856 Mon Sep 17 00:00:00 2001 From: fengberlin Date: Tue, 29 Jun 2021 11:57:06 +0800 Subject: [PATCH] fix: call cancel() as soon as possible to release the resources associatd with context --- internal/client.go | 12 +++++++++--- internal/remote/remote_client_test.go | 6 ++++-- internal/route.go | 4 +++- internal/trace.go | 3 ++- producer/producer.go | 4 +++- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/client.go b/internal/client.go index bf0437ad..6e665eaa 100644 --- a/internal/client.go +++ b/internal/client.go @@ -448,7 +448,9 @@ func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote if c.close { return nil, ErrServiceState } - ctx, _ = context.WithTimeout(ctx, timeoutMillis) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeoutMillis) + defer cancel() return c.remoteClient.InvokeSync(ctx, addr, request) } @@ -524,14 +526,16 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() { } cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode()) - ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) response, err := c.remoteClient.InvokeSync(ctx, addr, cmd) if err != nil { + cancel() rlog.Warning("send heart beat to broker error", map[string]interface{}{ rlog.LogKeyUnderlayError: err, }) return true } + cancel() if response.Code == ResSuccess { c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version)) rlog.Debug("send heart beat to broker success", map[string]interface{}{ @@ -633,7 +637,9 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC // PullMessage with sync func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) { cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil) - ctx, _ = context.WithTimeout(ctx, 30*time.Second) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 30*time.Second) + defer cancel() res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd) if err != nil { return nil, err diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go index 5b69fd66..f1606742 100644 --- a/internal/remote/remote_client_test.go +++ b/internal/remote/remote_client_test.go @@ -77,7 +77,8 @@ func TestResponseFutureTimeout(t *testing.T) { } func TestResponseFutureWaitResponse(t *testing.T) { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(1000)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000)) + defer cancel() future := NewResponseFuture(ctx, 10, nil) if _, err := future.waitResponse(); err != utils.ErrRequestTimeout { t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v", @@ -289,7 +290,8 @@ func TestInvokeAsyncTimeout(t *testing.T) { clientSend.Add(1) go func() { clientSend.Wait() - ctx, _ := context.WithTimeout(context.Background(), time.Duration(10*time.Second)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10*time.Second)) + defer cancel() err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand, func(r *ResponseFuture) { assert.NotNil(t, r.Err) diff --git a/internal/route.go b/internal/route.go index 09b6e53a..e1bc1ff5 100644 --- a/internal/route.go +++ b/internal/route.go @@ -351,12 +351,14 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, for i := 0; i < s.Size(); i++ { rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil) - ctx, _ := context.WithTimeout(context.Background(), requestTimeout) + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc) if err == nil { + cancel() break } + cancel() } if err != nil { rlog.Error("connect to namesrv failed.", map[string]interface{}{ diff --git a/internal/trace.go b/internal/trace.go index a7fee679..cef16349 100644 --- a/internal/trace.go +++ b/internal/trace.go @@ -458,7 +458,8 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat } var req = td.buildSendRequest(mq, msg) - ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) { resp := primitive.NewSendResult() if e != nil { diff --git a/producer/producer.go b/producer/producer.go index 8ebb660f..f3b5afe3 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -241,7 +241,9 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, return errors.Errorf("topic=%s route info not found", mq.Topic) } - ctx, _ = context.WithTimeout(ctx, 3*time.Second) + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 3*time.Second) + defer cancel() return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) { resp := primitive.NewSendResult() if err != nil {