Skip to content

Commit

Permalink
[ISSUE #996] invoke user callback and return error info when async pr…
Browse files Browse the repository at this point in the history
…ocess send resp error (#997)
  • Loading branch information
ferrirW authored Feb 13, 2023
1 parent a2cf953 commit c8d06a6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
3 changes: 1 addition & 2 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
case ResSuccess:
status = primitive.SendOK
default:
status = primitive.SendUnknownError
return errors.New(cmd.Remark)
return errors.New(fmt.Sprintf("CODE: %d, DESC: %s", cmd.Code, cmd.Remark))
}

msgIDs := make([]string, 0)
Expand Down
11 changes: 8 additions & 3 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,18 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
cancel()
if err != nil {
h(ctx, nil, err)
}

resp := primitive.NewSendResult()
err = p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
if err != nil {
h(ctx, nil, err)
} else {
p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
h(ctx, resp, nil)
return
}

h(ctx, resp, nil)
})

if err != nil {
Expand Down

0 comments on commit c8d06a6

Please sign in to comment.