Skip to content

Commit

Permalink
Fixed moquette-io#573 ByteBuf reference counting
Browse files Browse the repository at this point in the history
The main place where ByteBufs were not released was the inflightWindow.
Also improves two postoffice methods that passed the payload and retain
information twice, once direct and once in the message.
  • Loading branch information
hylkevds committed Feb 17, 2021
1 parent 3e7917a commit 8c07329
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 45 deletions.
15 changes: 10 additions & 5 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -359,25 +360,24 @@ void processPublish(MqttPublishMessage msg) {
final String clientId = getClientId();
final int messageID = msg.variableHeader().packetId();
LOG.trace("Processing PUBLISH message, topic: {}, messageId: {}, qos: {}", topicName, messageID, qos);
ByteBuf payload = msg.payload();
final boolean retain = msg.fixedHeader().isRetain();
final Topic topic = new Topic(topicName);
if (!topic.isValid()) {
LOG.debug("Drop connection because of invalid topic format");
dropConnection();
}
switch (qos) {
case AT_MOST_ONCE:
postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
postOffice.receivedPublishQos0(topic, username, clientId, msg);
break;
case AT_LEAST_ONCE: {
postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg);
postOffice.receivedPublishQos1(this, topic, username, messageID, msg);
break;
}
case EXACTLY_ONCE: {
bindedSession.receivedPublishQos2(messageID, msg);
// Second pass-on, retain
msg.payload().retain();
postOffice.receivedPublishQos2(this, msg, username);
// msg.release();
break;
}
default:
Expand Down Expand Up @@ -426,6 +426,11 @@ void sendIfWritableElseDrop(MqttMessage msg) {
channelFuture = channel.write(msg);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
} else {
// msg not passed on, release.
if (msg instanceof ByteBufHolder) {
((ByteBufHolder) msg).release();
}
}
}

Expand Down
20 changes: 12 additions & 8 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,23 @@ public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int
mqttConnection.sendUnsubAckMessage(topics, clientID, messageId);
}

void receivedPublishQos0(Topic topic, String username, String clientID, ByteBuf payload, boolean retain,
MqttPublishMessage msg) {
void receivedPublishQos0(Topic topic, String username, String clientID, MqttPublishMessage msg) {
if (!authorizator.canWrite(topic, username, clientID)) {
LOG.error("client is not authorized to publish on topic: {}", topic);
return;
}
publish2Subscribers(payload, topic, AT_MOST_ONCE);
publish2Subscribers(msg.payload(), topic, AT_MOST_ONCE);

if (retain) {
if (msg.fixedHeader().isRetain()) {
// QoS == 0 && retain => clean old retained
retainedRepository.cleanRetained(topic);
}

interceptor.notifyTopicPublished(msg, clientID, username);
}

void receivedPublishQos1(MQTTConnection connection, Topic topic, String username, ByteBuf payload, int messageID,
boolean retain, MqttPublishMessage msg) {
void receivedPublishQos1(MQTTConnection connection, Topic topic, String username, int messageID,
MqttPublishMessage msg) {
// verify if topic can be write
topic.getTokens();
if (!topic.isValid()) {
Expand All @@ -187,11 +186,12 @@ void receivedPublishQos1(MQTTConnection connection, Topic topic, String username
return;
}

ByteBuf payload = msg.payload();
publish2Subscribers(payload, topic, AT_LEAST_ONCE);

connection.sendPubAck(messageID);

if (retain) {
if (msg.fixedHeader().isRetain()) {
if (!payload.isReadable()) {
retainedRepository.cleanRetained(topic);
} else {
Expand Down Expand Up @@ -237,6 +237,8 @@ void receivedPublishQos2(MQTTConnection connection, MqttPublishMessage mqttPubli
final String clientId = connection.getClientId();
if (!authorizator.canWrite(topic, username, clientId)) {
LOG.error("MQTT client is not authorized to publish on topic: {}", topic);
// msg not passed on, release payload.
payload.release();
return;
}

Expand All @@ -254,6 +256,8 @@ void receivedPublishQos2(MQTTConnection connection, MqttPublishMessage mqttPubli

String clientID = connection.getClientId();
interceptor.notifyTopicPublished(mqttPublishMessage, clientID, username);
// none of the methods above released the payload, do it now.
payload.release();
}

static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) {
Expand Down Expand Up @@ -284,7 +288,7 @@ public void internalPublish(MqttPublishMessage msg) {
if (!msg.fixedHeader().isRetain()) {
return;
}
if (qos == AT_MOST_ONCE || msg.payload().readableBytes() == 0) {
if (qos == AT_MOST_ONCE || payload.readableBytes() == 0) {
// QoS == 0 && retain => clean old retained
retainedRepository.cleanRetained(topic);
return;
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) {
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
dispatcher.internalPublish(msg);
msg.payload().release();
}

public void stopServer() {
Expand Down
35 changes: 32 additions & 3 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,12 @@ boolean isClean() {
}

public void processPubRec(int packetId) {
inflightWindow.remove(packetId);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
Expand All @@ -200,7 +205,12 @@ public void processPubRec(int packetId) {
}

public void processPubComp(int messageID) {
inflightWindow.remove(messageID);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();
Expand All @@ -216,6 +226,9 @@ public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload)
case AT_MOST_ONCE:
if (connected()) {
mqttConnection.sendPublishNotRetainedQos0(topic, qos, payload);
} else {
// buffer not passed on, release it.
payload.release();
}
break;
case AT_LEAST_ONCE:
Expand All @@ -226,12 +239,16 @@ public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload)
break;
case FAILURE:
LOG.error("Not admissible");
// buffer not passed on, release it.
payload.release();
}
}

private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
if (!connected() && isClean()) {
//pushing messages to disconnected not clean session
//buffer not passed on, release it.
payload.release();
return;
}

Expand All @@ -240,6 +257,9 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

// second pass-on, add retain
payload.retain();
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);
Expand All @@ -257,6 +277,9 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) {
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

// second pass-on, add retain
payload.retain();
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);
Expand All @@ -283,7 +306,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}
Expand Down Expand Up @@ -343,6 +370,8 @@ private void drainQueueToConnection() {
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
// Second pass-on.
msgPub.payload.retain();
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload, sendPacketId);
Expand Down
11 changes: 10 additions & 1 deletion broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
Expand All @@ -39,6 +38,7 @@
public class SessionRegistry {

public abstract static class EnqueuedMessage {
public void release() {}
}

static class PublishedMessage extends EnqueuedMessage {
Expand All @@ -52,6 +52,15 @@ static class PublishedMessage extends EnqueuedMessage {
this.publishingQos = publishingQos;
this.payload = payload;
}

/**
* Releases the payload. Must be called when the PublishedMessage is no
* longer needed.
*/
@Override
public void release() {
payload.release();
}
}

static final class PubRelMarker extends EnqueuedMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
@Test
public void dropConnectionOnPublishWithInvalidTopicFormat() {
// Connect message with clean session set to true and client id is null.
final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8));
MqttPublishMessage publish = MqttMessageBuilders.publish()
.topicName("")
.retained(false)
.qos(MqttQoS.AT_MOST_ONCE)
.payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build();
.payload(payload).build();

sut.processPublish(publish);

// Verify
assertFalse(channel.isOpen(), "Connection should be closed by the broker");
payload.release();
}

}
30 changes: 18 additions & 12 deletions broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testPublishQoS0ToItself() {

// Exercise
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false,
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down Expand Up @@ -211,7 +211,7 @@ public void testPublishToMultipleSubscribers() {

// Exercise
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false,
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand All @@ -228,19 +228,24 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
connection.processConnect(connectMessage);
ConnectionTestUtils.assertConnectAccepted(channel);

final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!");
this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish()
.payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"))
.payload(payload1)
.qos(AT_LEAST_ONCE)
.build());
// Retaining a msg does not release the payload.
payload1.release();

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, anyPayload, true,
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID,
MqttMessageBuilders.publish()
.payload(anyPayload)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.retained(true)
.topicName(NEWS_TOPIC).build());
// receivedPublishQos0 does not release payload.
anyPayload.release();

// Verify
assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic");
Expand All @@ -254,7 +259,7 @@ public void testPublishWithQoS1() {

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true,
sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, 1,
MqttMessageBuilders.publish()
.payload(Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()))
.qos(MqttQoS.AT_LEAST_ONCE)
Expand Down Expand Up @@ -298,7 +303,7 @@ public void forwardQoS1PublishesWhenNotCleanSessionReconnects() {
ConnectionTestUtils.assertConnectAccepted(pubChannel);

final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true,
sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1,
MqttMessageBuilders.publish()
.payload(anyPayload.retainedDuplicate())
.qos(MqttQoS.AT_LEAST_ONCE)
Expand Down Expand Up @@ -336,10 +341,11 @@ public void checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(
ConnectionTestUtils.assertConnectAccepted(pubChannel);

final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true,
sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1,
MqttMessageBuilders.publish()
.payload(anyPayload.retainedDuplicate())
.qos(MqttQoS.AT_LEAST_ONCE)
.retained(true)
.topicName(NEWS_TOPIC).build());

// Verify that after a reconnection the client receive the message
Expand All @@ -361,7 +367,7 @@ public void noPublishToInactiveSession() {
ConnectionTestUtils.assertConnectAccepted(pubChannel);

final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true,
sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1,
MqttMessageBuilders.publish()
.payload(anyPayload)
.qos(MqttQoS.AT_LEAST_ONCE)
Expand Down Expand Up @@ -389,19 +395,19 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() {
.retained(true)
.topicName(NEWS_TOPIC)
.build();
sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true,
sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, 1,
publishMsg);

assertMessageIsRetained(NEWS_TOPIC, anyPayload);

// publish a QoS0 retained message
// Exercise
final ByteBuf qos0Payload = Unpooled.copiedBuffer("QoS0 payload", Charset.defaultCharset());
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(), qos0Payload, true,
sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(),
MqttMessageBuilders.publish()
.payload(qos0Payload)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.retained(true)
.topicName(NEWS_TOPIC).build());

// Verify
Expand Down
Loading

0 comments on commit 8c07329

Please sign in to comment.