Skip to content

Commit

Permalink
Fix the usage bug of namespace
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyang21 <[email protected]>
  • Loading branch information
Git-Yang committed May 20, 2021
1 parent dfa26d1 commit 4f1c53c
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 12 deletions.
38 changes: 27 additions & 11 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (p *defaultProducer) SendSync(ctx context.Context, msgs ...*primitive.Messa
return nil, err
}

p.messagesWithNamespace(msgs...)

msg := p.encodeBatch(msgs...)

resp := primitive.NewSendResult()
Expand Down Expand Up @@ -179,10 +181,6 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
err error
)

if p.options.Namespace != "" {
msg.Topic = p.options.Namespace + "%" + msg.Topic
}

var producerCtx *primitive.ProducerCtx
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
Expand Down Expand Up @@ -217,6 +215,8 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
return err
}

p.messagesWithNamespace(msgs...)

msg := p.encodeBatch(msgs...)

if p.interceptor != nil {
Expand All @@ -230,9 +230,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
}

func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
if p.options.Namespace != "" {
msg.Topic = p.options.Namespace + "%" + msg.Topic
}

mq := p.selectMessageQueue(msg)
if mq == nil {
return errors.Errorf("the topic=%s route info not found", msg.Topic)
Expand Down Expand Up @@ -260,6 +258,8 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes
return err
}

p.messagesWithNamespace(msgs...)

msg := p.encodeBatch(msgs...)

if p.interceptor != nil {
Expand All @@ -275,10 +275,6 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msgs ...*primitive.Mes
func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
retryTime := 1 + p.options.RetryTimes

if p.options.Namespace != "" {
msg.Topic = p.options.Namespace + "%" + msg.Topic
}

var err error
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
Expand All @@ -302,6 +298,26 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
return err
}

func (p *defaultProducer) withNamespace(resource string) string {

if p.options.Namespace != "" {
return p.options.Namespace + "%" + resource
}

return resource
}

func (p *defaultProducer) messagesWithNamespace(msgs ...*primitive.Message) {

if p.options.Namespace == "" {
return
}

for _, msg := range msgs {
msg.Topic = p.withNamespace(msg.Topic)
}
}

func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
if msg.Compress {
return true
Expand Down
66 changes: 65 additions & 1 deletion producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (
)

const (
topic = "TopicTest"
topic = "TopicTest"
namespaceTopic = "Test%TopicTest"
)

func TestShutdown(t *testing.T) {
Expand Down Expand Up @@ -83,6 +84,16 @@ func mockB4Send(p *defaultProducer) {
},
},
})
p.publishInfo.Store(namespaceTopic, &internal.TopicPublishInfo{
HaveTopicRouterInfo: true,
MqList: []*primitive.MessageQueue{
{
Topic: namespaceTopic,
BrokerName: "aa",
QueueId: 0,
},
},
})
p.options.Namesrv.AddBroker(&internal.TopicRouteData{
BrokerDataList: []*internal.BrokerData{
{
Expand Down Expand Up @@ -245,3 +256,56 @@ func TestOneway(t *testing.T) {
err = p.SendOneWay(ctx, msg)
assert.Nil(t, err)
}

func TestSyncWithNamespace(t *testing.T) {
p, _ := NewDefaultProducer(
WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
WithRetry(2),
WithQueueSelector(NewManualQueueSelector()),
WithNamespace("Test"),
)

ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := internal.NewMockRMQClient(ctrl)
p.client = client

client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)

ctx := context.Background()
msg := &primitive.Message{
Topic: topic,
Body: []byte("this is a message body"),
Queue: &primitive.MessageQueue{
Topic: namespaceTopic,
BrokerName: "aa",
QueueId: 0,
},
}
msg.WithProperty("key", "value")

expectedResp := &primitive.SendResult{
Status: primitive.SendOK,
MsgID: "111",
QueueOffset: 0,
OffsetMsgID: "0",
}

mockB4Send(p)

client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
resp.Status = expectedResp.Status
resp.MsgID = expectedResp.MsgID
resp.QueueOffset = expectedResp.QueueOffset
resp.OffsetMsgID = expectedResp.OffsetMsgID
})
resp, err := p.SendSync(ctx, msg)
assert.Nil(t, err)
assert.Equal(t, expectedResp, resp)
assert.Equal(t, namespaceTopic, msg.Topic)
}

0 comments on commit 4f1c53c

Please sign in to comment.