Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling MQTT messages sent while ditto offline #1794

Conversation

dimabarbul
Copy link
Contributor

Resolves #1767

Current version of the PR contains only integration tests, so it's draft PR.

@thjaeckle thjaeckle changed the title Fix handling messages sent while ditto offline Fix handling MQTT messages sent while ditto offline Nov 6, 2023
@dimabarbul
Copy link
Contributor Author

I have some concerns regarding the PR, but at least it's at a point when the functionality looks working, so I mark it as ready for review.

Solution Description

There are 2 kinds of messages that are lost:

  1. Unlikely RabbitMQ, messages in MQTT are not stored in topics (or queues), they are stored in per-client sessions. So, when client that has existing session connects, it receives all missed messages before it subscribed to any topic. HiveMQ client in this case just auto-acknowledges the messages;
  2. Messages that are sent after the code has subscribed to topic but before it has attached listener to Flowable of publishes. In this case HiveMQ issues a warning "No publish flow registered for MqttStatefulPublish" and acknowledges the message (see related ticket).

The implemented solution extracts consuming messages into dedicated interface - GenericMqttConsumingClient. This interface subscribes to all messages with manual acknowledgement before the connection is established. It is used then as source for per-connection-source messages (including messages from previous session that match corresponding topics, they will be processed) and as source for messages from previous session that the connection is no longer interested in (they will be just acknowledged).

My Concerns about Implementation

  1. rxJava2 error

Sometimes during tests I see following error in console:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:314)
	at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:243)
	at org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.BufferingFlowableWrapper.lambda$new$1(BufferingFlowableWrapper.java:62)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
	at io.reactivex.internal.operators.flowable.FlowableOnErrorNext$OnErrorNextSubscriber.onError(FlowableOnErrorNext.java:90)
	at io.reactivex.internal.subscriptions.EmptySubscription.error(EmptySubscription.java:55)
	at io.reactivex.internal.operators.flowable.FlowableError.subscribeActual(FlowableError.java:40)
	at io.reactivex.Flowable.subscribe(Flowable.java:14935)
	at io.reactivex.Flowable.subscribe(Flowable.java:14882)
	at io.reactivex.internal.operators.flowable.FlowableOnErrorNext$OnErrorNextSubscriber.onError(FlowableOnErrorNext.java:115)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.checkTerminated(FlowableObserveOn.java:197)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:399)
	at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
Caused by: com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3DisconnectException: Client sent DISCONNECT
Exception in thread "RxComputationThreadPool-4" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException: Session expired as connection was closed.
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.subjects.PublishSubject$PublishDisposable.onError(PublishSubject.java:314)
	at io.reactivex.subjects.PublishSubject.onError(PublishSubject.java:243)
	at org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.BufferingFlowableWrapper.lambda$new$1(BufferingFlowableWrapper.java:62)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
	at io.reactivex.internal.operators.flowable.FlowableOnErrorNext$OnErrorNextSubscriber.onError(FlowableOnErrorNext.java:90)
	at io.reactivex.internal.subscriptions.EmptySubscription.error(EmptySubscription.java:55)
	at io.reactivex.internal.operators.flowable.FlowableError.subscribeActual(FlowableError.java:40)
	at io.reactivex.Flowable.subscribe(Flowable.java:14935)
	at io.reactivex.Flowable.subscribe(Flowable.java:14882)

The solution is described at page mentioned in error. I already check if the object is disposed before raising error, so solution seems to be adding global error handler which sounds awkward to me. Also previously the code used rxJava, but did not have the handler, so I guess, I might be using it wrong. Suggestions are welcome.

  1. BufferingFlowableWrapper class

In general I'm not sure if using Subject is OK for rxJava. I've struggled many weeks to create the code that buffers items until it's told to stop, at least, it passes the tests. It might be better to create custom Subject, not sure.

  1. Explicit timings in MqttClientActorIT

I don't like specifying timings explicitly. Not only they are specific to machine where the tests run, but also they disregard duration scale.
I did that because default message timeout (3 seconds) did not work for me for connected/disconnected events. I believe, this is because here we use real MQTT broker and it takes more time than in unit tests (for my machine the events were raised in about 10 seconds).
So, I didn't find good solution. What I could do is to add multiplying the timings by pekko.test.timefactor but that seems more like hack than solution.

  1. Amount of test cases in MqttClientActorIT

Probably, we could minimize number of tests by removing some test methods.

@dimabarbul dimabarbul marked this pull request as ready for review November 17, 2023 05:54
@dimabarbul dimabarbul force-pushed the fix-handling-messages-sent-while-ditto-offline branch from ef20db4 to eaeac18 Compare November 17, 2023 08:49
@dimabarbul
Copy link
Contributor Author

Looks like the changes fix #1768 as well

@thjaeckle
Copy link
Member

@dimabarbul awesome, thanks a lot.
I will review ASAP.

@kalinkostashki @alstanchev could you run the system tests on this feature branch to check if mqtt tests still pass?

Copy link
Member

@thjaeckle thjaeckle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @dimabarbul

Sorry, that it took a little, but not I had the chance to have a look at the PR.
I must say, that is the highest quality contribution to Ditto I ever saw. Thanks a lot for this.
The abstraction for the integration-tests, starting containers (Mongo/Mosquitto) is really great to have and very well done 👍

So I am really glad you fixed the issue(s) and improved testability by doing so.

One class may be "final" (which I commented inline), but the rest: LGTM!

@thjaeckle thjaeckle added the bug label Nov 23, 2023
@thjaeckle thjaeckle added this to the 3.4.2 milestone Nov 23, 2023
@thjaeckle thjaeckle merged commit 2f88899 into eclipse-ditto:master Nov 29, 2023
3 checks passed
@dimabarbul dimabarbul deleted the fix-handling-messages-sent-while-ditto-offline branch December 1, 2023 04:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

MQTT 5 messages sent when Ditto was not connected are not processed
2 participants