From 3eb9052ca3108d835e80b24a802a46014b22fb04 Mon Sep 17 00:00:00 2001 From: zhangyang21 Date: Thu, 20 May 2021 10:58:04 +0800 Subject: [PATCH] Fix the usage bug of namespace Signed-off-by: zhangyang21 --- producer/producer.go | 29 ++++++++++------- producer/producer_test.go | 66 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 12 deletions(-) diff --git a/producer/producer.go b/producer/producer.go index 910bb23b..8ebb660f 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -143,6 +143,8 @@ func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Messa return nil, err } + p.messagesWithNamespace(msgs...) + msg := p.encodeBatch(msgs...) resp := primitive.NewSendResult() @@ -179,10 +181,6 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message, err error ) - if p.options.Namespace != "" { - msg.Topic = p.options.Namespace + "%" + msg.Topic - } - var producerCtx *primitive.ProducerCtx for retryCount := 0; retryCount < retryTime; retryCount++ { mq := p.selectMessageQueue(msg) @@ -217,6 +215,8 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, return err } + p.messagesWithNamespace(msgs...) + msg := p.encodeBatch(msgs...) if p.interceptor != nil { @@ -230,9 +230,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context, } func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error { - if p.options.Namespace != "" { - msg.Topic = p.options.Namespace + "%" + msg.Topic - } + mq := p.selectMessageQueue(msg) if mq == nil { return errors.Errorf("the topic=%s route info not found", msg.Topic) @@ -260,6 +258,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes return err } + p.messagesWithNamespace(msgs...) + msg := p.encodeBatch(msgs...) if p.interceptor != nil { @@ -275,10 +275,6 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error { retryTime := 1 + p.options.RetryTimes - if p.options.Namespace != "" { - msg.Topic = p.options.Namespace + "%" + msg.Topic - } - var err error for retryCount := 0; retryCount < retryTime; retryCount++ { mq := p.selectMessageQueue(msg) @@ -302,6 +298,17 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message return err } +func (p *defaultProducer) messagesWithNamespace(msgs ...*primitive.Message) { + + if p.options.Namespace == "" { + return + } + + for _, msg := range msgs { + msg.Topic = p.options.Namespace + "%" + msg.Topic + } +} + func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool { if msg.Compress { return true diff --git a/producer/producer_test.go b/producer/producer_test.go index d1b88c3f..12cefb85 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -30,7 +30,8 @@ import ( ) const ( - topic = "TopicTest" + topic = "TopicTest" + namespaceTopic = "Test%TopicTest" ) func TestShutdown(t *testing.T) { @@ -83,6 +84,16 @@ func mockB4Send(p *defaultProducer) { }, }, }) + p.publishInfo.Store(namespaceTopic, &internal.TopicPublishInfo{ + HaveTopicRouterInfo: true, + MqList: []*primitive.MessageQueue{ + { + Topic: namespaceTopic, + BrokerName: "aa", + QueueId: 0, + }, + }, + }) p.options.Namesrv.AddBroker(&internal.TopicRouteData{ BrokerDataList: []*internal.BrokerData{ { @@ -245,3 +256,56 @@ func TestOneway(t *testing.T) { err = p.SendOneWay(ctx, msg) assert.Nil(t, err) } + +func TestSyncWithNamespace(t *testing.T) { + p, _ := NewDefaultProducer( + WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), + WithRetry(2), + WithQueueSelector(NewManualQueueSelector()), + WithNamespace("Test"), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := internal.NewMockRMQClient(ctrl) + p.client = client + + client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() + client.EXPECT().Start().Return() + err := p.Start() + assert.Nil(t, err) + + ctx := context.Background() + msg := &primitive.Message{ + Topic: topic, + Body: []byte("this is a message body"), + Queue: &primitive.MessageQueue{ + Topic: namespaceTopic, + BrokerName: "aa", + QueueId: 0, + }, + } + msg.WithProperty("key", "value") + + expectedResp := &primitive.SendResult{ + Status: primitive.SendOK, + MsgID: "111", + QueueOffset: 0, + OffsetMsgID: "0", + } + + mockB4Send(p) + + client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { + resp.Status = expectedResp.Status + resp.MsgID = expectedResp.MsgID + resp.QueueOffset = expectedResp.QueueOffset + resp.OffsetMsgID = expectedResp.OffsetMsgID + }) + resp, err := p.SendSync(ctx, msg) + assert.Nil(t, err) + assert.Equal(t, expectedResp, resp) + assert.Equal(t, namespaceTopic, msg.Topic) +}