Orderly Consumer will not reconsume the message #650

Closed
kerthcet opened this issue Apr 23, 2021 · 8 comments · Fixed by #693
Labels: bug, discuss

Comments

@kerthcet

kerthcet commented Apr 23, 2021

We are trying to use the orderly consumer, but when we return SuspendCurrentQueueAMoment, the message is never reconsumed.

Another problem: when we then send another message that returns ConsumeSuccess, all the messages, including the blocked ones, are committed at once, which leads to message loss.

How we initialize the consumer:

	c.consumer, err = rocketmq.NewPushConsumer(
		consumer.WithGroupName(c.GroupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{config.ENV.MQConnectionString})),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
		consumer.WithConsumerOrder(true),
		consumer.WithMaxReconsumeTimes(5),
	)
@kerthcet
Author

Well, I found that if I fail to consume a message and return SuspendCurrentQueueAMoment, the next pull only returns the latest messages. But if I restart the container, I receive messages from the earliest offset, and then I get the previously failed messages.

@qianlongzt
Contributor

This may be related to bug #618.

@kerthcet
Author

kerthcet commented May 5, 2021

> This may be related to bug #618.

Yep, I checked the code, but I still have a question. Could pq.msgCache.Put(msg.QueueOffset, msg) make cachedMsgCount inconsistent, given that cachedMsgCount is meant to equal msgCache.Size()?

If a message is not in msgCache but is in consumingMsgOrderlyTreeMap, it is put back into msgCache, but cachedMsgCount is not increased.

@qianlongzt
Contributor

type processQueue struct {
	cachedMsgCount             int64
	cachedMsgSize              int64

	msgCache                   *treemap.Map
	consumingMsgOrderlyTreeMap *treemap.Map
}

I think cachedMsgCount means the number of pulled messages, which now equals msgCache.Size() + consumingMsgOrderlyTreeMap.Size(). It is used by:

if pq.cachedMsgCount > pc.option.PullThresholdForQueue {

The first problem is #615: if orderly consumption fails, cachedMsgCount keeps increasing and the consumer can't consume any more.
I sent a PR to fix it (#616), but that introduced another problem (#618, messages lost on failure).
So PR #619 makes both cachedMsgCount and the messages correct.
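
The counting rule described above can be illustrated with a minimal sketch. This is not the library's code: `queueSketch`, `put`, `take`, `putBack`, `commit` and `invariantHolds` are hypothetical names, plain Go maps stand in for the `treemap.Map` fields, and the sketch assumes that a put-back removes the message from the consuming map.

```go
package main

import (
	"fmt"
	"sort"
)

// queueSketch is a hypothetical, simplified stand-in for processQueue.
// Plain maps keyed by queue offset replace the treemap.Map fields.
type queueSketch struct {
	cachedMsgCount int64
	msgCache       map[int64]string // pulled, not yet handed to the consumer
	consuming      map[int64]string // handed to the consumer, not yet committed
}

func newQueueSketch() *queueSketch {
	return &queueSketch{
		msgCache:  map[int64]string{},
		consuming: map[int64]string{},
	}
}

// put models a pull: a new message enters msgCache and raises the counter.
func (q *queueSketch) put(offset int64, body string) {
	if _, exists := q.msgCache[offset]; exists {
		return
	}
	q.msgCache[offset] = body
	q.cachedMsgCount++
}

// take moves the lowest-offset message from msgCache to consuming.
// The counter does not change: the message is still held locally.
func (q *queueSketch) take() (int64, bool) {
	if len(q.msgCache) == 0 {
		return 0, false
	}
	offsets := make([]int64, 0, len(q.msgCache))
	for o := range q.msgCache {
		offsets = append(offsets, o)
	}
	sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] })
	lowest := offsets[0]
	q.consuming[lowest] = q.msgCache[lowest]
	delete(q.msgCache, lowest)
	return lowest, true
}

// putBack models a failed orderly consume: the message returns to msgCache.
// The counter is untouched because the message was counted when it was pulled.
func (q *queueSketch) putBack(offset int64) {
	body, ok := q.consuming[offset]
	if !ok {
		return
	}
	delete(q.consuming, offset)
	q.msgCache[offset] = body
}

// commit models a successful consume: consuming is cleared and the counter drops.
func (q *queueSketch) commit() {
	q.cachedMsgCount -= int64(len(q.consuming))
	q.consuming = map[int64]string{}
}

// invariantHolds checks cachedMsgCount == len(msgCache) + len(consuming).
func (q *queueSketch) invariantHolds() bool {
	return q.cachedMsgCount == int64(len(q.msgCache)+len(q.consuming))
}

func main() {
	q := newQueueSketch()
	q.put(0, "a")
	q.put(1, "b")
	off, _ := q.take()
	q.putBack(off)
	fmt.Println(q.cachedMsgCount, q.invariantHolds())
}
```

Under these assumptions, incrementing the counter inside `putBack` would count the same message twice, and the pull threshold check would eventually block further pulls.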

@kerthcet
Author

kerthcet commented May 5, 2021

As you said, cachedMsgCount = msgCache.Size() + consumingMsgOrderlyTreeMap.Size(), so if you put one message back into msgCache, cachedMsgCount should be incremented at the same time. Or do you mean that the total should not change, because the message ends up in both msgCache and consumingMsgOrderlyTreeMap, so there is no need to increase cachedMsgCount?

@kerthcet
Author

kerthcet commented May 5, 2021

I reviewed the code. I think the problem is that the function takeMessages only gets new messages via pq.msgCache.Min(), and those newly taken messages are then committed successfully, which leads to the missing messages. Am I right?
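
The hypothesis in this comment can be modeled with a small, self-contained sketch. It is not the client's implementation: `orderlyQueue`, `takeMin`, `fail`, `succeed` and `run` are hypothetical names, and the commit rule (advance to the highest consumed offset plus one) is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// orderlyQueue is a hypothetical model of one queue on the consumer side.
type orderlyQueue struct {
	msgCache  map[int64]bool // pulled, waiting to be taken
	consuming map[int64]bool // taken, not yet committed
	committed int64          // next offset that would be delivered after a restart
}

func newOrderlyQueue(offsets ...int64) *orderlyQueue {
	q := &orderlyQueue{msgCache: map[int64]bool{}, consuming: map[int64]bool{}}
	for _, o := range offsets {
		q.msgCache[o] = true
	}
	return q
}

// takeMin hands out the lowest offset left in msgCache, like a Min() lookup.
func (q *orderlyQueue) takeMin() (int64, bool) {
	if len(q.msgCache) == 0 {
		return 0, false
	}
	offsets := make([]int64, 0, len(q.msgCache))
	for o := range q.msgCache {
		offsets = append(offsets, o)
	}
	sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] })
	lowest := offsets[0]
	delete(q.msgCache, lowest)
	q.consuming[lowest] = true
	return lowest, true
}

// fail models returning SuspendCurrentQueueAMoment. With putBack the message
// goes back to msgCache; without it the message stays stuck in consuming.
func (q *orderlyQueue) fail(offset int64, putBack bool) {
	if putBack {
		delete(q.consuming, offset)
		q.msgCache[offset] = true
	}
}

// succeed models ConsumeSuccess: everything in consuming is committed at once.
func (q *orderlyQueue) succeed() {
	for o := range q.consuming {
		if o+1 > q.committed {
			q.committed = o + 1
		}
		delete(q.consuming, o)
	}
}

// run fails offset 0 once, then consumes whatever takeMin returns next.
func run(putBack bool) (secondTake, committed int64) {
	q := newOrderlyQueue(0, 1)
	first, _ := q.takeMin()
	q.fail(first, putBack)
	second, _ := q.takeMin()
	q.succeed()
	return second, q.committed
}

func main() {
	fmt.Println(run(false)) // failed message is skipped and committed past
	fmt.Println(run(true))  // failed message is taken again
}
```

In this model, without the put-back the second take returns offset 1 and the commit moves past offset 0, which matches the reported symptom that blocked messages are committed together with the next successful one.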

@wenfengwang added the bug and discuss labels on Jul 1, 2021