MQ Message not acknowledged on thrown exception #5234
Comments
/cc @cescoffier |
The same happens not only with AMQP messages but also with the MQTT connector. However, this is not only a technical question or bug. On the functional side, what should the behaviour be for messages that result in errors? Right now there is no ACK, so the message may stay in the queue. When the server restarts, the same message is delivered again, most probably resulting in the same error. It is easy to say that other, newer messages should still be handled, but what should happen to the messages that cause errors? For my server I have the following handler, which resumes on errors:
    @Incoming("command-reader")
    @Outgoing("command-forwarder")
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    public PublisherBuilder<MqttMessage<JsonObject>> process( final AmqpMessage<?> message ) {
        return ReactiveStreams.of( message )
            .filter( Objects::nonNull )
            .map( this::consume )
            .map( this::transform )
            .onErrorResumeWith( e -> {
                // Log the failure and emit nothing for this message.
                if ( e instanceof FailedMessageException ) {
                    exceptionLogger.log( (FailedMessageException) e );
                } else {
                    LOGGER.error( "Message failed:", e );
                }
                return ReactiveStreams.empty();
            } );
    }
Maybe some default behaviour can be applied here to make it more robust, since everyone using queues will eventually run into this. |
The current behavior is the one dictated by Reactive Streams. I agree with the expected behavior, but the change needs to be done carefully to avoid breaking the TCK. |
Any progress on this one? Somehow my Then the question remains how to make message handlers fault tolerant. For example, the Quarkus MQTT quickstart stops receiving messages once a single non-integer gets posted on the |
Nothing has changed on this in the latest Quarkus, so it would be a bug. The work is going to be done in SmallRye Reactive Messaging first. |
I will check which Quarkus version it worked in and report the bug on Quarkus. |
Hmm, apparently I was wrong. Modified
    public PublisherBuilder<MqttMessage<Double>> process( final MqttMessage<byte[]> priceMessage ) {
        return ReactiveStreams.of( priceMessage )
            .filter( Objects::nonNull )
            .map( message -> Integer.valueOf( new String( message.getPayload() ) ) )
            .peek( price -> System.out.println( "Receiving price: " + price ) )
            .map( value -> value * CONVERSION_RATE )
            .map( value -> MqttMessage.of( value ) )
            .onErrorResumeWith( e -> {
                System.out.println( "Failed message: " + e.getMessage() );
                return ReactiveStreams.empty();
            } );
    }
Running showed no issues. Sending incorrect messages did not disturb further processing of correct messages. My failing server was actually using a
    public ProcessorBuilder<MqttMessage<byte[]>, MqttMessage<Double>> processor() {
        return ReactiveStreams.<MqttMessage<byte[]>>builder()
            .filter( Objects::nonNull )
            .map( message -> Integer.valueOf( new String( message.getPayload() ) ) )
            .peek( price -> System.out.println( "Receiving price: " + price ) )
            .map( value -> value * CONVERSION_RATE )
            .map( value -> MqttMessage.of( value ) )
            .onErrorResumeWith( e -> {
                System.out.println( "Failed message: " + e.getMessage() );
                return ReactiveStreams.empty();
            } );
    }
|
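A likely explanation for the difference, going by Reactive Streams semantics: an error is a terminal signal, so onErrorResumeWith replaces the remainder of the failed stream with the fallback. With ReactiveStreams.of( priceMessage ) every message gets its own short stream and only that message is lost, whereas a single long-lived ProcessorBuilder ends for good after the first bad payload. One way around this is to confine the failure to the per-message step. The sketch below shows that idea with plain java.util.stream as a stand-in for the reactive pipeline; the class name, method names, and the CONVERSION_RATE value are hypothetical and not taken from the quickstart.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class PerMessageRecovery {
    // Hypothetical value, chosen only for illustration.
    static final double CONVERSION_RATE = 2.0;

    // Convert a single payload. A parse failure is caught here, so it
    // affects this message only and never reaches the enclosing pipeline.
    static Optional<Double> convert(byte[] payload) {
        try {
            int price = Integer.parseInt(new String(payload, StandardCharsets.UTF_8).trim());
            return Optional.of(price * CONVERSION_RATE);
        } catch (NumberFormatException e) {
            System.out.println("Failed message: " + e.getMessage());
            return Optional.empty();
        }
    }

    // Process a batch of payloads; failed messages are dropped,
    // the remaining ones are still converted.
    static List<Double> process(List<byte[]> payloads) {
        return payloads.stream()
                .map(PerMessageRecovery::convert)
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }
}
```

In a ProcessorBuilder the same shape would presumably be a per-message step that returns an empty result on failure instead of throwing, so the stream itself never sees an error.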
Note: Quarkus 1.6 added support for nack and failure management. |
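To illustrate what nack support changes conceptually: instead of leaving a failed message unacknowledged, the dispatcher settles every message, positively on success and negatively on failure. The following is only a stdlib sketch of that idea. The Delivery type and its ack/nack methods are hypothetical stand-ins, not the SmallRye Reactive Messaging or Quarkus API, and what a broker does with a nacked message depends on the connector and its configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AckNackSketch {
    // Hypothetical stand-in for a broker delivery; not a SmallRye or Quarkus type.
    static final class Delivery {
        final String payload;
        String outcome = "PENDING"; // stays PENDING until the delivery is settled

        Delivery(String payload) { this.payload = payload; }

        void ack() { outcome = "ACKED"; }

        void nack(Throwable reason) {
            outcome = "NACKED: " + reason.getClass().getSimpleName();
        }
    }

    // Run the handler and always settle the delivery, so an exception
    // never leaves the message pending.
    static void dispatch(Delivery delivery, Consumer<String> handler) {
        try {
            handler.accept(delivery.payload);
            delivery.ack();
        } catch (RuntimeException e) {
            delivery.nack(e);
        }
    }

    // Dispatch each payload to a handler that only accepts integers
    // and collect the settlement outcome of every delivery.
    static List<String> run(List<String> payloads) {
        List<String> outcomes = new ArrayList<>();
        for (String payload : payloads) {
            Delivery delivery = new Delivery(payload);
            dispatch(delivery, p -> Integer.parseInt(p));
            outcomes.add(delivery.outcome);
        }
        return outcomes;
    }
}
```

With this shape a bad payload ends up nacked while later messages are still dispatched, which matches the behaviour asked for in this issue.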
Fixed in 1.6 thanks to the reactive messaging update. |
Describe the bug
I am using Artemis MQ as a message broker and have an incoming stream that listens for messages on a queue. The method handling the incoming stream can throw an exception. It seems that if an exception is thrown, the broker is never told that my service has acknowledged the message, even though I have received and processed it. My service then no longer receives any messages sent to the queue, and every time I restart it, it receives that same bad message again. If I change the method to just log the error, the queue works normally: the service receives the bad message, logs it, and continues receiving messages.
Expected behavior
Throws exception but continues listening for future messages on that queue.
Actual behavior
Throws exception and no longer receives messages on that queue.
To Reproduce
Steps to reproduce the behavior:
Configuration
Environment (please complete the following information):
uname -a or ver:
java -version: 11.0.2