From eb11a3102869d27cfa1772629814a513bafcfe23 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Mon, 1 Feb 2021 16:04:00 +0100 Subject: [PATCH] Fixes #566: NotSerializableException: SessionRegistry$PublishedMessage Instances of implementations of the interface SessionRegistry$EnqueuedMessage are put in the queue, which can be backed by the H2 store. But since this class is not serializable, H2 can not store it. This commit makes the interface serializable, and deals with the non-serializable ByteBuf that the PublishedMessage subclass has as a field. --- .../io/moquette/broker/SessionRegistry.java | 37 ++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 903f9af35..d53a3550b 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -26,9 +26,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; import java.net.InetSocketAddress; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue; @@ -38,20 +41,44 @@ public class SessionRegistry { - public abstract static class EnqueuedMessage { + public abstract static class EnqueuedMessage implements Serializable { } static class PublishedMessage extends EnqueuedMessage { - final Topic topic; - final MqttQoS publishingQos; - final ByteBuf payload; + Topic topic; + MqttQoS publishingQos; + transient ByteBuf payload; + + public PublishedMessage() { + this.topic = null; + this.publishingQos = null; + } PublishedMessage(Topic topic, MqttQoS publishingQos, ByteBuf payload) { this.topic = topic; this.publishingQos = publishingQos; this.payload = payload; } + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + byte[] byteArr = new byte[payload.readableBytes()]; + ByteBuf copy = payload.copy(); + copy.readBytes(byteArr); + copy.release(); + oos.writeObject(byteArr.length); + oos.writeObject(byteArr); + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { + ois.defaultReadObject(); + int length = ois.readInt(); + byte[] byteArr = new byte[length]; + ois.read(byteArr); + payload = Unpooled.wrappedBuffer(byteArr); + } + } static final class PubRelMarker extends EnqueuedMessage {