Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean = false subscriber offline messages not sent in order #649

Closed
CWilliamL opened this issue Jul 2, 2021 · 13 comments · Fixed by #777
Closed

clean = false subscriber offline messages not sent in order #649

CWilliamL opened this issue Jul 2, 2021 · 13 comments · Fixed by #777
Labels

Comments

@CWilliamL
Copy link

I hosted 2 aedes brokers connected to mongodb as a cluster. I was trying to publish messages continuously (20 messages per second) and receive them in subscriber side. I noticed when I disconnected the subscriber and restart it while the publisher keeps publish messages, the stored messages were mixed with the incoming message.
E.g.
Testing {"Temperature": 19, "Time": "07/02 14:06:09", "count": 88}
Testing {"Temperature": 14, "Time": "07/02 14:06:09", "count": 89}
Testing {"Temperature": 30, "Time": "07/02 14:06:09", "count": 90}
Testing {"Temperature": 14, "Time": "07/02 14:06:09", "count": 91}
Testing {"Temperature": 10, "Time": "07/02 14:06:09", "count": 92}
Testing {"Temperature": 3, "Time": "07/02 14:06:09", "count": 93}
Testing {"Temperature": 5, "Time": "07/02 14:06:14", "count": 192} <---incoming message
Testing {"Temperature": 24, "Time": "07/02 14:06:09", "count": 94}
Testing {"Temperature": 3, "Time": "07/02 14:06:09", "count": 95}
Testing {"Temperature": 29, "Time": "07/02 14:06:09", "count": 96}
Testing {"Temperature": 27, "Time": "07/02 14:06:09", "count": 97}
Testing {"Temperature": 2, "Time": "07/02 14:06:09", "count": 98}
Testing {"Temperature": 15, "Time": "07/02 14:06:09", "count": 99}
Is there a way to fix this problem? Thank you.

@robertsLando
Copy link
Member

With 'stored' messages you mean retained messages or are you using clean flag set to false when connecting to the broker (persistent session)?

@CWilliamL
Copy link
Author

Clean flag set to false. Client can get back the messages that are not got previously.

@robertsLando
Copy link
Member

I cannot find in mqtt specs something that specify how the messages should be processed in order or not while restoring a session, btw cannot you simply use the time in the payload to identify the correct order?

@mcollina do you think this is an error? Should we pause client while sending session messages?

@robertsLando
Copy link
Member

I think this could be easily fixed by moving client.resume at the end of the emptyQueue function

@CWilliamL
Copy link
Author

In my future development the payload may not have the time information so may not be order using it.
Additionally, I tried another mqtt broker and it do not have this issue, so I think it could be fixed.

@CWilliamL
Copy link
Author

I think this could be easily fixed by moving client.resume at the end of the emptyQueue function

Just try to move client.resume() to like below. The result is the same.
function emptyQueue (arg, done) { ... pipline( ... done ) client.resume() }

@robertsLando
Copy link
Member

@CWilliamL You should edit it like this:

function emptyQueue (arg, done) {
  const client = this.client
  const persistence = client.broker.persistence
  const outgoing = persistence.outgoingStream(client)

  pipeline(
    outgoing,
    through(function clearQueue (data, enc, next) {
      const packet = new QoSPacket(data, client)
      // Here we are deliberatly passing only the error
      // This is because there is no destination stream so the "client"
      // Object filled the buffer up to the highWaterMark preventing stored messages
      // being sent
      packet.writeCallback = (error, _client) => next(error)
      persistence.outgoingUpdate(client, packet, emptyQueueFilter)
    }),
    function (err) {
	    client.resume()
		done(err)
	}
  )
}

@CWilliamL
Copy link
Author

@CWilliamL You should edit it like this:

function emptyQueue (arg, done) {
  const client = this.client
  const persistence = client.broker.persistence
  const outgoing = persistence.outgoingStream(client)

  pipeline(
    outgoing,
    through(function clearQueue (data, enc, next) {
      const packet = new QoSPacket(data, client)
      // Here we are deliberatly passing only the error
      // This is because there is no destination stream so the "client"
      // Object filled the buffer up to the highWaterMark preventing stored messages
      // being sent
      packet.writeCallback = (error, _client) => next(error)
      persistence.outgoingUpdate(client, packet, emptyQueueFilter)
    }),
    function (err) {
	    client.resume()
		done(err)
	}
  )
}

Still the same...

@robertsLando
Copy link
Member

Yep, sorry, that pauses the client incoming stream, so this has no effect

@robertsLando
Copy link
Member

robertsLando commented Jul 2, 2021

The other solution would be to move the emptyQueue before the fetchSubs metohd in connectActions

But I dunno if this respects specs. @mcollina thoughts?

@CWilliamL
Copy link
Author

The other solution would be to move the emptyQueue before the fetchSubs metohd in connectActions

But I dunno if this respects specs. @mcollina thoughts?

Just tried. The stored messages are now showed before the incoming message. However, the incoming message during restoring stored message are not shown.
Testing {"Temperature": 11, "Time": "07/02 17:01:52", "count": 190}
Testing {"Temperature": 15, "Time": "07/02 17:01:52", "count": 191}
Testing {"Temperature": 16, "Time": "07/02 17:01:52", "count": 192}
Connected with result code 0
Testing {"Temperature": 9, "Time": "07/02 17:01:52", "count": 201}
Testing {"Temperature": 26, "Time": "07/02 17:01:52", "count": 202}
Testing {"Temperature": 12, "Time": "07/02 17:01:53", "count": 203}

I can only get back those messages when I restart the subscriber.

@seriousme
Copy link
Contributor

When a Server processes a message that has been published to an Ordered Topic, it MUST send PUBLISH packets to
consumers (for the same Topic and QoS) in the order that they were received from any given Client [MQTT-4.6.0-5]
So order must be preserved.

My guess would be that the persistence.outgoingUpdate(client, packet, emptyQueueFilter) hapens before the queue is empty and therefore new messages are not queued anymore but directly send and hence mix with messages that were still enqueued.

Kind regards,
Hans

@seriousme
Copy link
Contributor

Just noticed that the persistence.outgoingUpdate(client, packet, emptyQueueFilter) is part of the stream transform so that cannot be the issue.

Client resume indeed only seems to pause the incoming stream.

I don't have any better suggestions at this point.

Cheers,
Hans

@robertsLando robertsLando changed the title Subscriber side message not in order clean = false subscriber offline messages not sent in order Feb 11, 2022
@robertsLando robertsLando linked a pull request Sep 15, 2022 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants