Skip to content

Commit

Permalink
Fixes moquette-io#566: NotSerializableException: SessionRegistry$Publ…
Browse files Browse the repository at this point in the history
…ishedMessage

Instances of implementations of the interface
SessionRegistry$EnqueuedMessage are put in the queue, which can be
backed by the H2 store. But since this class is not serializable, H2
can not store it. This commit makes the interface serializable, and
deals with the non-serializable ByteBuf that the PublishedMessage
subclass has as a field.
  • Loading branch information
hylkevds committed Feb 1, 2021
1 parent 3e7917a commit eb11a31
Showing 1 changed file with 32 additions and 5 deletions.
37 changes: 32 additions & 5 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
Expand All @@ -38,20 +41,44 @@

public class SessionRegistry {

public abstract static class EnqueuedMessage {
public abstract static class EnqueuedMessage implements Serializable {
}

static class PublishedMessage extends EnqueuedMessage {

final Topic topic;
final MqttQoS publishingQos;
final ByteBuf payload;
Topic topic;
MqttQoS publishingQos;
transient ByteBuf payload;

public PublishedMessage() {
this.topic = null;
this.publishingQos = null;
}

PublishedMessage(Topic topic, MqttQoS publishingQos, ByteBuf payload) {
this.topic = topic;
this.publishingQos = publishingQos;
this.payload = payload;
}

private void writeObject(ObjectOutputStream oos) throws IOException {
oos.defaultWriteObject();
byte[] byteArr = new byte[payload.readableBytes()];
ByteBuf copy = payload.copy();
copy.readBytes(byteArr);
copy.release();
oos.writeObject(byteArr.length);
oos.writeObject(byteArr);
}

private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
ois.defaultReadObject();
int length = ois.readInt();
byte[] byteArr = new byte[length];
ois.read(byteArr);
payload = Unpooled.wrappedBuffer(byteArr);
}

}

static final class PubRelMarker extends EnqueuedMessage {
Expand Down

0 comments on commit eb11a31

Please sign in to comment.