diff --git a/errors/errors.go b/errors/errors.go index 793fcda1..43b49ca9 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -47,4 +47,5 @@ var ( ErrMessageEmpty = errors.New("message is nil") ErrNotRunning = errors.New("producer not started") ErrPullConsumer = errors.New("pull consumer has not supported") + ErrMultipleTopics = errors.New("the topic of the messages in one batch should be the same") ) diff --git a/producer/producer.go b/producer/producer.go index 2ab24459..226eedb7 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -103,6 +103,14 @@ func (p *defaultProducer) checkMsg(msgs ...*primitive.Message) error { if len(msgs[0].Topic) == 0 { return errors2.ErrTopicEmpty } + + topic := msgs[0].Topic + for _, msg := range msgs { + if msg.Topic != topic { + return errors2.ErrMultipleTopics + } + } + return nil } diff --git a/producer/producer_test.go b/producer/producer_test.go index b6ec84d1..a7c15c1a 100644 --- a/producer/producer_test.go +++ b/producer/producer_test.go @@ -310,3 +310,37 @@ func TestSyncWithNamespace(t *testing.T) { assert.Equal(t, expectedResp, resp) assert.Equal(t, namespaceTopic, msg.Topic) } + +func TestBatchSendDifferentTopics(t *testing.T) { + p, _ := NewDefaultProducer( + WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), + WithRetry(2), + WithQueueSelector(NewManualQueueSelector()), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := internal.NewMockRMQClient(ctrl) + p.client = client + + client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() + client.EXPECT().Start().Return() + err := p.Start() + assert.Nil(t, err) + + ctx := context.Background() + msgToA := &primitive.Message{ + Topic: "topic-A", + Body: []byte("this is a message body"), + } + + msgToB := &primitive.Message{ + Topic: "topic-B", + Body: []byte("this is a message body"), + } + + resp, err := p.SendSync(ctx, []*primitive.Message{msgToA, msgToB}...) + assert.Nil(t, resp) + assert.NotNil(t, err) + assert.Equal(t, err, errors.ErrMultipleTopics) +}