diff --git a/pkg/client/rocketmq/producer.go b/pkg/client/rocketmq/producer.go index cf44fec0f0..cc8f3e019c 100644 --- a/pkg/client/rocketmq/producer.go +++ b/pkg/client/rocketmq/producer.go @@ -146,3 +146,28 @@ func (pc *Producer) SendWithMsg(ctx context.Context, msg *primitive.Message) err } return nil } + +// SendWithResult rocket mq 发送消息,可以自定义选择 tag 及返回结果 +func (pc *Producer) SendWithResult(ctx context.Context, msg []byte, tag string) (*primitive.SendResult, error) { + m := primitive.NewMessage(pc.Topic, msg) + if tag != "" { + m.WithTag(tag) + } + + res, err := pc.SendSync(ctx, m) + if err != nil { + xlog.Jupiter().Error("send message error", xlog.Any("msg", string(msg))) + return res, err + } + return res, nil +} + +// SendMsg... 自定义消息格式 +func (pc *Producer) SendMsg(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error) { + res, err := pc.SendSync(ctx, msg) + if err != nil { + xlog.Jupiter().Error("send message error", xlog.Any("msg", msg)) + return res, err + } + return res, nil +}