Skip to content

Commit

Permalink
fix: select one message queue of different broker when retry to send (#…
Browse files Browse the repository at this point in the history
…1014)

Co-authored-by: dengzhiwen1 <[email protected]>
  • Loading branch information
cserwen and dengzhiwen1 authored Jun 28, 2023
1 parent a15c777 commit 533de03
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 24 deletions.
2 changes: 2 additions & 0 deletions internal/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
ResFlushDiskTimeout = int16(10)
ResSlaveNotAvailable = int16(11)
ResFlushSlaveTimeout = int16(12)
ResServiceNotAvailable = int16(14)
ResNoPermission = int16(16)
ResTopicNotExist = int16(17)
ResPullNotFound = int16(19)
ResPullRetryImmediately = int16(20)
Expand Down
55 changes: 47 additions & 8 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte {
return buffer.Bytes()
}

func needRetryCode(code int16) bool {
switch code {
case internal.ResTopicNotExist:
return true
case internal.ResServiceNotAvailable:
return true
case internal.ResError:
return true
case internal.ResNoPermission:
return true
default:
return false
}
}

func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
correlationId := uuid.New().String()
requestClientId := p.client.ClientID()
Expand Down Expand Up @@ -301,19 +316,31 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,

var (
err error
mq *primitive.MessageQueue
)

var (
producerCtx *primitive.ProducerCtx
ok bool
)
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
var lastBrokerName string
if mq != nil {
lastBrokerName = mq.BrokerName
}
mq := p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
continue
}

if lastBrokerName != "" {
rlog.Warning("start retrying to send, ", map[string]interface{}{
"lastBroker": lastBrokerName,
"newBroker": mq.BrokerName,
})
}

addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found", mq.Topic)
Expand All @@ -333,6 +360,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
err = _err
continue
}

if needRetryCode(res.Code) && retryCount < retryTime-1 {
continue
}
return p.client.ProcessSendResponse(mq.BrokerName, res, resp, msg)
}
return err
Expand All @@ -359,7 +390,7 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,

func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {

mq := p.selectMessageQueue(msg)
mq := p.selectMessageQueue(msg, "")
if mq == nil {
return errors.Errorf("the topic=%s route info not found", msg.Topic)
}
Expand Down Expand Up @@ -416,8 +447,13 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
retryTime := 1 + p.options.RetryTimes

var err error
var mq *primitive.MessageQueue
for retryCount := 0; retryCount < retryTime; retryCount++ {
mq := p.selectMessageQueue(msg)
var lastBrokerName string
if mq != nil {
lastBrokerName = mq.BrokerName
}
mq = p.selectMessageQueue(msg, lastBrokerName)
if mq == nil {
err = fmt.Errorf("the topic=%s route info not found", msg.Topic)
continue
Expand Down Expand Up @@ -554,13 +590,16 @@ func (p *defaultProducer) tryToFindTopicPublishInfo(topic string) *internal.Topi
return result
}

func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.MessageQueue {
topic := msg.Topic
result := p.tryToFindTopicPublishInfo(topic)
if result == nil {
func (p *defaultProducer) selectMessageQueue(msg *primitive.Message, lastBrokerName string) *primitive.MessageQueue {
result := p.tryToFindTopicPublishInfo(msg.Topic)
if result == nil || len(result.MqList) == 0 {
rlog.Warning("topic route info is nil or empty", map[string]interface{}{
rlog.LogKeyTopic: msg.Topic,
"result": result,
})
return nil
}
return p.options.Selector.Select(msg, result.MqList)
return p.options.Selector.Select(msg, result.MqList, lastBrokerName)
}

func (p *defaultProducer) PublishTopicList() []string {
Expand Down
12 changes: 10 additions & 2 deletions producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func TestSync(t *testing.T) {

mockB4Send(p)

client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
cmd := &remote.RemotingCommand{
Code: internal.ResSuccess,
}

client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(cmd, nil)
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
resp.Status = expectedResp.Status
Expand Down Expand Up @@ -309,7 +313,11 @@ func TestSyncWithNamespace(t *testing.T) {

mockB4Send(p)

client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
cmd := &remote.RemotingCommand{
Code: internal.ResSuccess,
}

client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(cmd, nil)
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
resp.Status = expectedResp.Status
Expand Down
36 changes: 28 additions & 8 deletions producer/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type QueueSelector interface {
Select(*primitive.Message, []*primitive.MessageQueue) *primitive.MessageQueue
Select(msg *primitive.Message, mqs []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue
}

// manualQueueSelector use the queue manually set in the provided Message's QueueID field as the queue to send.
Expand All @@ -37,7 +37,7 @@ func NewManualQueueSelector() QueueSelector {
return new(manualQueueSelector)
}

func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
func (manualQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
return message.Queue
}

Expand All @@ -53,7 +53,7 @@ func NewRandomQueueSelector() QueueSelector {
return s
}

func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
func (r *randomQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
r.mux.Lock()
i := r.rander.Intn(len(queues))
r.mux.Unlock()
Expand All @@ -74,19 +74,39 @@ func NewRoundRobinQueueSelector() QueueSelector {
return s
}

func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
func (r *roundRobinQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
t := message.Topic
var idx *uint32

r.Lock()
defer r.Unlock()
if lastBrokerName != "" {
for i := 0; i < len(queues); i++ {
idx, exist := r.indexer[t]
if !exist {
var v uint32 = 0
idx = &v
r.indexer[t] = idx
}
*idx++
qIndex := *idx % uint32(len(queues))
if queues[qIndex].BrokerName != lastBrokerName {
return queues[qIndex]
}
}
}
return r.selectOneMessageQueue(t, queues)
}

func (r *roundRobinQueueSelector) selectOneMessageQueue(t string, queues []*primitive.MessageQueue) *primitive.MessageQueue {
var idx *uint32

idx, exist := r.indexer[t]
if !exist {
var v uint32 = 0
idx = &v
r.indexer[t] = idx
}
*idx++
r.Unlock()

qIndex := *idx % uint32(len(queues))
return queues[qIndex]
Expand All @@ -103,10 +123,10 @@ func NewHashQueueSelector() QueueSelector {
}

// hashQueueSelector choose the queue by hash if message having sharding key, otherwise choose queue by random instead.
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue) *primitive.MessageQueue {
func (h *hashQueueSelector) Select(message *primitive.Message, queues []*primitive.MessageQueue, lastBrokerName string) *primitive.MessageQueue {
key := message.GetShardingKey()
if len(key) == 0 {
return h.random.Select(message, queues)
return h.random.Select(message, queues, lastBrokerName)
}

hasher := fnv.New32a()
Expand Down
37 changes: 31 additions & 6 deletions producer/selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestRoundRobin(t *testing.T) {
queues := make([]*primitive.MessageQueue, 10)
queues := make([]*primitive.MessageQueue, 0)
for i := 0; i < 10; i++ {
queues = append(queues, &primitive.MessageQueue{
QueueId: i,
Expand All @@ -41,18 +41,43 @@ func TestRoundRobin(t *testing.T) {
Topic: "rr",
}
for i := 0; i < 100; i++ {
q := s.Select(m, queues)
q := s.Select(m, queues, "")
expected := (i + 1) % len(queues)
assert.Equal(t, queues[expected], q, "i: %d", i)

qrr := s.Select(mrr, queues)
qrr := s.Select(mrr, queues, "")
expected = (i + 1) % len(queues)
assert.Equal(t, queues[expected], qrr, "i: %d", i)
}
}

func TestRoundRobinRetry(t *testing.T) {
queues := make([]*primitive.MessageQueue, 0)
brokerA := "brokerA"
brokerB := "brokerB"
for i := 0; i < 5; i++ {
queues = append(queues, &primitive.MessageQueue{
QueueId: i,
BrokerName: brokerA,
})
queues = append(queues, &primitive.MessageQueue{
QueueId: i,
BrokerName: brokerB,
})
}
s := NewRoundRobinQueueSelector()

m := &primitive.Message{
Topic: "test",
}
for i := 0; i < 100; i++ {
q := s.Select(m, queues, brokerA)
assert.Equal(t, brokerB, q.BrokerName)
}
}

func TestHashQueueSelector(t *testing.T) {
queues := make([]*primitive.MessageQueue, 10)
queues := make([]*primitive.MessageQueue, 0)
for i := 0; i < 10; i++ {
queues = append(queues, &primitive.MessageQueue{
QueueId: i,
Expand All @@ -66,13 +91,13 @@ func TestHashQueueSelector(t *testing.T) {
Body: []byte("one message"),
}
m1.WithShardingKey("same_key")
q1 := s.Select(m1, queues)
q1 := s.Select(m1, queues, "")

m2 := &primitive.Message{
Topic: "test",
Body: []byte("another message"),
}
m2.WithShardingKey("same_key")
q2 := s.Select(m2, queues)
q2 := s.Select(m2, queues, "")
assert.Equal(t, *q1, *q2)
}

0 comments on commit 533de03

Please sign in to comment.