MQTT durable subscription exception #455

Open
OleksandrBerezianskyi opened this issue Mar 8, 2019 · 1 comment

Expected behavior

After the same client reconnects with clean_session = false and subscribes to the same topic with QoS = 1, all messages missed while it was offline should be delivered.
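The expected behavior corresponds to MQTT persistent sessions. As a rough illustration, the Java sketch below models a single subscriber's session on the broker. With clean_session = false, QoS 1 messages published while the client is offline are held and then flushed on reconnect. `SessionSketch` and its methods are hypothetical names, not Moquette's API, and the model ignores acknowledgements and topic matching.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of one subscriber's session on the broker; not Moquette's API.
final class SessionSketch {
    private final boolean cleanSession;
    private boolean online;
    private final Queue<String> pending = new ArrayDeque<>(); // QoS 1 messages held while offline
    private final List<String> received = new ArrayList<>();  // what the client has been sent

    SessionSketch(boolean cleanSession) {
        this.cleanSession = cleanSession;
    }

    void connect() {
        online = true;
        // Flush everything that arrived while the client was away.
        while (!pending.isEmpty()) {
            received.add(pending.poll());
        }
    }

    void disconnect() {
        online = false;
    }

    // Called when a matching QoS 1 message is published.
    void onPublish(String payload) {
        if (online) {
            received.add(payload);
        } else if (!cleanSession) {
            pending.add(payload); // persistent session: keep it for the next connect
        }
        // clean session and offline: the session state is gone, so the message is dropped
    }

    List<String> received() {
        return received;
    }
}
```

For example, a persistent session that receives `m1`, disconnects, misses `m2` and `m3`, and reconnects ends up with all three messages, while a clean session only ever sees what arrived while it was connected.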

Actual behavior

An exception is thrown while the broker tries to resend an unacknowledged message:

2019-03-08 14:19:37.194 ERROR 31746 --- [ntLoopGroup-5-5] io.moquette.broker.NewNettyMQTTHandler   : Unexpected exception while processing MQTT message. Closing Netty channel. CId=client123

java.lang.ClassCastException: class io.moquette.broker.SessionRegistry$PubRelMarker cannot be cast to class io.moquette.broker.SessionRegistry$PublishedMessage (io.moquette.broker.SessionRegistry$PubRelMarker and io.moquette.broker.SessionRegistry$PublishedMessage are in unnamed module of loader 'app')
	at io.moquette.broker.Session.resendInflightNotAcked(Session.java:293)
	at io.moquette.broker.MQTTConnection.resendNotAckedPublishes(MQTTConnection.java:481)
	at io.moquette.broker.NewNettyMQTTHandler.userEventTriggered(NewNettyMQTTHandler.java:106)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.moquette.broker.MoquetteIdleTimeoutHandler.userEventTriggered(MoquetteIdleTimeoutHandler.java:48)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307)
	at io.moquette.broker.InflightResender.resendNotAcked(InflightResender.java:160)
	at io.moquette.broker.InflightResender.access$100(InflightResender.java:32)
	at io.moquette.broker.InflightResender$WriterIdleTimeoutTask.run(InflightResender.java:58)
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

2019-03-08 14:19:37.194 ERROR 31746 --- [ntLoopGroup-5-5] io.moquette.broker.NewNettyMQTTHandler   : Unexpected exception while processing MQTT message. Closing Netty channel. CId=client123

java.nio.channels.ClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)

Steps to reproduce

  1. Run moquette broker
  2. Publish messages every second to any topic, e.g. "/test-topic"
  3. Run subscriber with client = "client123", clean_session = false and subscribe to topic "/test-topic" with QoS = 1
  4. Kill subscriber
  5. Rerun subscriber
  6. Repeat steps 4 and 5 until the exception appears

Minimal yet complete reproducer code (or URL to code) or complete log file

Client using Python 3.7:

import paho.mqtt.client as mqtt


# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("/test-topic", 1)


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))


client = mqtt.Client(client_id="client123", clean_session=False)
client.username_pw_set("user123", "session_id")
client.on_connect = on_connect
client.on_message = on_message

print("connecting")
client.connect("localhost", 1883, 60)
print("connected")

client.loop_forever()

Moquette MQTT version

moquette-broker-0.12.1.jar

JVM version (e.g. java -version)

java -version
openjdk version "11.0.1" 2018-10-16
OpenJDK Runtime Environment 18.9 (build 11.0.1+13)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.1+13, mixed mode)

OS version (e.g. uname -a)

uname -a
Darwin Admins-MacBook-Pro.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64
andsel pinned this issue Mar 9, 2019
andsel self-assigned this Mar 17, 2019
andsel added the bug label Mar 17, 2019
andsel added this to the 0.13 milestone Mar 17, 2019

andsel (Collaborator) commented Mar 17, 2019

@OleksandrBerezianskyi you left out the publisher client. The error relates to the fact that a PubRelMarker can't be cast to PublishedMessage, which is triggered by the timeout of a message in flight from the broker to the client. PubRelMarker is a placeholder class that represents the PUBREL message sent from the broker to a subscriber that wants to receive messages at QoS 2. Could you please share a complete log file, from the bootstrap of the broker until the crash (moquette.log and messages.logs)? It would help a lot to understand and reproduce the problem. Thanks.
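To make the failure mode concrete, here is a simplified Java sketch of the inflight window for QoS 2 delivery from broker to subscriber. Based on the comment above, it assumes that once the subscriber answers PUBREC, the broker replaces the stored PublishedMessage with a PubRelMarker under the same packet id. The nested classes only borrow names from the stack trace. `QoS2InflightSketch` and its methods are hypothetical and are not Moquette's code. A resend loop that casts every entry to PublishedMessage then throws the same kind of ClassCastException as in the report.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names mirror the stack trace but this is not Moquette's code.
final class QoS2InflightSketch {
    interface EnqueuedMessage {}

    // A PUBLISH sent to the subscriber and not yet acknowledged.
    static final class PublishedMessage implements EnqueuedMessage {
        final String topic;
        PublishedMessage(String topic) { this.topic = topic; }
    }

    // Placeholder meaning "PUBREC received, PUBREL still outstanding" (assumed semantics).
    static final class PubRelMarker implements EnqueuedMessage {}

    // packetId -> entry awaiting acknowledgement from the subscriber
    final Map<Integer, EnqueuedMessage> inflight = new LinkedHashMap<>();

    void sentQos2Publish(int packetId, PublishedMessage msg) {
        inflight.put(packetId, msg);
    }

    // Subscriber answered PUBREC: from now on only PUBREL needs to be (re)sent.
    void receivedPubRec(int packetId) {
        inflight.put(packetId, new PubRelMarker());
    }

    // Buggy resend: assumes every inflight entry is a PUBLISH and returns the resent topics.
    List<String> resendAssumingPublishOnly() {
        List<String> resentTopics = new ArrayList<>();
        for (EnqueuedMessage m : inflight.values()) {
            PublishedMessage pub = (PublishedMessage) m; // ClassCastException for a PubRelMarker
            resentTopics.add(pub.topic);
        }
        return resentTopics;
    }
}
```

The first resend succeeds while the entry is still a PublishedMessage. After `receivedPubRec(1)`, the same call fails on the cast.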

hylkevds added commits to FraunhoferIOSB/moquette that referenced this issue on Feb 25, May 9, May 24 (twice), Jun 26, Jul 2 (twice) and Jul 4, 2021. The commits carry essentially the same message:

moquette-io#455: resendInflightNotAcked() assumed all messages in the inflightWindow are
PublishedMessage, but they can also be PubRelMarker. This caused a
ClassCastException.

moquette-io#587: When sending PubRelMessages, never put them on the queue, since this
deadlocks the system.

From Jul 2, 2021 onward, the #587 entry also says: "Since the queue can not contain PubRelMessages, drainQueueToConnection can be simplified."
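The #455 commit message describes a fix that checks each entry's type before resending. The following is a minimal Java sketch under the same assumptions as before. `FixedResendSketch` is hypothetical, entries are keyed by packet id, and writes are recorded as strings rather than sent over a Netty channel. Unacknowledged PUBLISH entries are retransmitted with the DUP flag, and PubRelMarker entries produce a PUBREL with the original packet id. MQTT 3.1.1 requires both of these when a session resumes. The code uses an explicit cast after `instanceof` so that it compiles on Java 11, the JVM in the report.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a type-aware resend; not Moquette's actual implementation.
final class FixedResendSketch {
    interface EnqueuedMessage {}

    static final class PublishedMessage implements EnqueuedMessage {
        final String topic;
        PublishedMessage(String topic) { this.topic = topic; }
    }

    static final class PubRelMarker implements EnqueuedMessage {}

    // packetId -> entry still awaiting an acknowledgement from the subscriber
    final Map<Integer, EnqueuedMessage> inflight = new LinkedHashMap<>();
    // Stand-in for the channel: records what would be written, in order
    final List<String> written = new ArrayList<>();

    void resendInflightNotAcked() {
        for (Map.Entry<Integer, EnqueuedMessage> e : inflight.entrySet()) {
            int packetId = e.getKey();
            EnqueuedMessage m = e.getValue();
            if (m instanceof PubRelMarker) {
                // PUBREC already received: retransmit PUBREL, written directly
                // (the #587 note says PUBREL must not go through the send queue)
                written.add("PUBREL " + packetId);
            } else if (m instanceof PublishedMessage) {
                PublishedMessage pub = (PublishedMessage) m;
                // Not yet acknowledged: retransmit PUBLISH with the DUP flag set
                written.add("PUBLISH(dup) " + packetId + " " + pub.topic);
            } else {
                throw new IllegalStateException("Unknown inflight entry: " + m);
            }
        }
    }
}
```

Rejecting unknown entry types with an exception, rather than silently skipping them, prevents a future inflight entry type from being dropped without notice.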