From c8d06a661a022097445cc7979290733a2cc86804 Mon Sep 17 00:00:00 2001 From: wangfan <42178996+ferrirW@users.noreply.github.com> Date: Mon, 13 Feb 2023 14:07:43 +0800 Subject: [PATCH] [ISSUE #996] invoke user callback and return error info when async process send resp error (#997) --- internal/client.go | 3 +-- producer/producer.go | 11 ++++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/client.go b/internal/client.go index 408e942c..d3c65a91 100644 --- a/internal/client.go +++ b/internal/client.go @@ -702,8 +702,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC case ResSuccess: status = primitive.SendOK default: - status = primitive.SendUnknownError - return errors.New(cmd.Remark) + return errors.New(fmt.Sprintf("CODE: %d, DESC: %s", cmd.Code, cmd.Remark)) } msgIDs := make([]string, 0) diff --git a/producer/producer.go b/producer/producer.go index fa2a8328..c8415e9f 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -373,13 +373,18 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, ctx, cancel := context.WithTimeout(ctx, 3*time.Second) err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) { cancel() + if err != nil { + h(ctx, nil, err) + } + resp := primitive.NewSendResult() + err = p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg) if err != nil { h(ctx, nil, err) - } else { - p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg) - h(ctx, resp, nil) + return } + + h(ctx, resp, nil) }) if err != nil {