Skip to content

Commit

Permalink
[ISSUE #1034] fix: the message may be cleaned when the message has no…
Browse files Browse the repository at this point in the history
…t been consumed
  • Loading branch information
humkum authored Apr 27, 2023
1 parent c77a95d commit 63a5a1f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
26 changes: 18 additions & 8 deletions consumer/process_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,22 +259,32 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
"time": startTime,
rlog.LogKeyUnderlayError: err,
})
pq.mutex.RUnlock()
continue
}
if time.Now().UnixNano()/1e6-st <= int64(pc.option.ConsumeTimeout/time.Millisecond) {
pq.mutex.RUnlock()
return
}
}
pq.mutex.RUnlock()

if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
rlog.Info("send expire msg back. ", map[string]interface{}{
rlog.LogKeyTopic: msg.Topic,
rlog.LogKeyMessageId: msg.MsgId,
"startTime": startTime,
rlog.LogKeyStoreHost: msg.StoreHost,
rlog.LogKeyQueueId: msg.Queue.QueueId,
rlog.LogKeyQueueOffset: msg.QueueOffset,
})
continue
pq.mutex.RUnlock()
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
continue
}
pq.removeMessage(msg)
} else {
pq.mutex.RUnlock()
}
pq.removeMessage(msg)
}
}

Expand Down
21 changes: 20 additions & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,19 @@ func (pc *pushConsumer) consumeMessageConcurrently(pq *processQueue, mq *primiti

consumeRT := time.Now().Sub(beginTime)
if err != nil {
rlog.Warning("consumeMessageCurrently error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
rlog.LogKeyMessages: msgs,
rlog.LogKeyMessageQueue: mq,
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
} else if consumeRT >= pc.option.ConsumeTimeout {
rlog.Warning("consumeMessageCurrently time out", map[string]interface{}{
rlog.LogKeyMessages: msgs,
rlog.LogKeyMessageQueue: mq,
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
} else if result == ConsumeSuccess {
msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
Expand Down Expand Up @@ -1262,7 +1273,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)

pq.lockConsume.Lock()
result, _ := pc.consumeInner(ctx, msgs)
result, err := pc.consumeInner(ctx, msgs)
if err != nil {
rlog.Warning("consumeMessage orderly error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
rlog.LogKeyMessages: msgs,
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup: pc.consumerGroup,
})
}
pq.lockConsume.Unlock()

if result == Rollback || result == SuspendCurrentQueueAMoment {
Expand Down
5 changes: 5 additions & 0 deletions rlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
LogKeyValueChangedTo = "changeTo"
LogKeyPullRequest = "PullRequest"
LogKeyTimeStamp = "timestamp"
LogKeyMessageId = "msgId"
LogKeyStoreHost = "storeHost"
LogKeyQueueId = "queueId"
LogKeyQueueOffset = "queueOffset"
LogKeyMessages = "messages"
)

type Logger interface {
Expand Down

0 comments on commit 63a5a1f

Please sign in to comment.