Skip to content

Commit

Permalink
Fixed moquette-io#573 ByteBuf reference counting
Browse files Browse the repository at this point in the history
Moved buffer release/retain calls to the outside interfaces of moquette.
  • Loading branch information
hylkevds committed May 24, 2021
1 parent 4e29483 commit e5d58a4
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 18 deletions.
15 changes: 12 additions & 3 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -37,6 +38,7 @@
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import io.netty.util.ReferenceCountUtil;

final class MQTTConnection {

Expand Down Expand Up @@ -377,7 +379,6 @@ void processPublish(MqttPublishMessage msg) {
case EXACTLY_ONCE: {
bindedSession.receivedPublishQos2(messageID, msg);
postOffice.receivedPublishQos2(this, msg, username);
// msg.release();
break;
}
default:
Expand Down Expand Up @@ -419,11 +420,19 @@ void sendIfWritableElseDrop(MqttMessage msg) {
LOG.debug("OUT {}", msg.fixedHeader().messageType());
}
if (channel.isWritable()) {

// Sending to external, retain a duplicate. Just retain is not
// enough, since the receiver must have full control.
Object retainedDup = msg;
if (msg instanceof ByteBufHolder) {
retainedDup = ((ByteBufHolder) msg).retainedDuplicate();
}

ChannelFuture channelFuture;
if (brokerConfig.isImmediateBufferFlush()) {
channelFuture = channel.writeAndFlush(msg);
channelFuture = channel.writeAndFlush(retainedDup);
} else {
channelFuture = channel.write(msg);
channelFuture = channel.write(retainedDup);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
}
Expand Down
6 changes: 3 additions & 3 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ private void publishRetainedMessagesForSubscriptions(String clientID, List<Subsc

final ByteBuf payloadBuf = Unpooled.wrappedBuffer(retainedMsg.getPayload());
targetSession.sendRetainedPublishOnSessionAtQos(retainedMsg.getTopic(), qos, payloadBuf);
// We made the buffer, we must release it.
payloadBuf.release();
}
}
}
Expand Down Expand Up @@ -202,7 +204,7 @@ void receivedPublishQos1(MQTTConnection connection, Topic topic, String username
interceptor.notifyTopicPublished(msg, clientId, username);
}

private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publishingQos) {
private void publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos) {
Set<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);

for (final Subscription sub : topicMatchingSubscriptions) {
Expand All @@ -213,8 +215,6 @@ private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publi
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
// we need to retain because duplicate only copy r/w indexes and don't retain() causing refCnt = 0
ByteBuf payload = origPayload.retainedDuplicate();
targetSession.sendPublishOnSessionAtQos(topic, qos, payload);
} else {
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
Expand Down
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private <T, U> T loadClass(String className, Class<T> intrface, Class<U> constru
* Use the broker to publish a message. It's intended for embedding applications. It can be used
* only after the integration is correctly started with startServer.
*
* @param msg the message to forward.
* @param msg the message to forward. The ByteBuf in the message will be released.
* @param clientId the id of the sending integration.
* @throws IllegalStateException if the integration is not yet started
*/
Expand All @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) {
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
dispatcher.internalPublish(msg);
msg.payload().release();
}

public void stopServer() {
Expand Down
62 changes: 53 additions & 9 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import io.moquette.broker.SessionRegistry.PublishedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -183,7 +185,12 @@ boolean isClean() {
}

public void processPubRec(int packetId) {
inflightWindow.remove(packetId);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
Expand All @@ -200,7 +207,12 @@ public void processPubRec(int packetId) {
}

public void processPubComp(int messageID) {
inflightWindow.remove(messageID);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();
Expand Down Expand Up @@ -238,15 +250,21 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));

// Adding to a map, retain.
payload.retain();
inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload));
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);

// TODO drainQueueToConnection();?
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload);
// Adding to a queue, retain.
msg.retain();
sessionQueue.add(msg);
}
}
Expand All @@ -255,15 +273,26 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) {
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));

// Retain before adding to map
payload.retain();
EnqueuedMessage old = inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
// If there already was something, release it.
if (old != null) {
old.release();
}

inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);

drainQueueToConnection();
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload);
// Adding to a queue, retain.
msg.retain();
sessionQueue.add(msg);
}
}
Expand All @@ -283,7 +312,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}
Expand All @@ -305,9 +338,11 @@ public void resendInflightNotAcked() {
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
mqttConnection.sendPublish(publishMsg);

// It was removed from a store, release.
msg.release();
}
}
}
Expand Down Expand Up @@ -341,6 +376,8 @@ private void drainQueueToConnection() {
}
inflightSlots.decrementAndGet();
int sendPacketId = mqttConnection.nextPacketId();

// Putting it in a map, but the retain is cancelled out by the below release.
inflightWindow.put(sendPacketId, msg);
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
Expand All @@ -352,6 +389,7 @@ private void drainQueueToConnection() {
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
}
// we fetched msg from a map, but the release is cancelled out by the above retain
}
}

Expand All @@ -374,12 +412,18 @@ void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload
}

public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
qos2Receiving.put(messageID, msg);
msg.retain(); // retain to put in the inflight map
// Retain before putting msg in map.
ReferenceCountUtil.retain(msg);

MqttPublishMessage old = qos2Receiving.put(messageID, msg);
// In case of evil client with duplicate msgid.
ReferenceCountUtil.release(old);

mqttConnection.sendPublishReceived(messageID);
}

public void receivedPubRelQos2(int messageID) {
// Done with the message, remove from queue and release payload.
final MqttPublishMessage removedMsg = qos2Receiving.remove(messageID);
ReferenceCountUtil.release(removedMsg);
}
Expand Down
23 changes: 23 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@
public class SessionRegistry {

public abstract static class EnqueuedMessage {

/**
* Releases any held resources. Must be called when the EnqueuedMessage is no
* longer needed.
*/
public void release() {}

/**
* Retains any held resources. Must be called when the EnqueuedMessage is added
* to a store.
*/
public void retain() {}
}

public static class PublishedMessage extends EnqueuedMessage {
Expand All @@ -63,6 +75,17 @@ public MqttQoS getPublishingQos() {
public ByteBuf getPayload() {
return payload;
}

@Override
public void release() {
payload.release();
}

@Override
public void retain() {
payload.retain();
}

}

public static final class PubRelMarker extends EnqueuedMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
@Test
public void dropConnectionOnPublishWithInvalidTopicFormat() {
// Connect message with clean session set to true and client id is null.
final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8));
MqttPublishMessage publish = MqttMessageBuilders.publish()
.topicName("")
.retained(false)
.qos(MqttQoS.AT_MOST_ONCE)
.payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build();
.payload(payload).build();

sut.processPublish(publish);

// Verify
assertFalse(channel.isOpen(), "Connection should be closed by the broker");
payload.release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,13 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
connection.processConnect(connectMessage);
ConnectionTestUtils.assertConnectAccepted(channel);

final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!");
this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish()
.payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"))
.payload(payload1)
.qos(AT_LEAST_ONCE)
.build());
// Retaining a msg does not release the payload.
payload1.release();

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
Expand All @@ -241,6 +244,8 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());
// receivedPublishQos0 does not release payload.
anyPayload.release();

// Verify
assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic");
Expand Down
7 changes: 7 additions & 0 deletions broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public void testPubAckDrainMessagesRemainingInQueue() {

// Verify
assertTrue(queuedMessages.isEmpty(), "Messages should be drained");

// release the rest, to avoid leaking buffers
for (int i = 2; i <= 11; i++) {
client.pubAckReceived(i);
}
client.closeImmediately();
testChannel.close();
}

private void sendQoS1To(Session client, Topic destinationTopic, String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public void shouldNotInternalPublishOnReadBlockedSubscriptionTopic() throws Exce
.payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
.build();

// We will be sending the same message again, retain the payload.
message.payload().retain();
m_server.internalPublish(message, "INTRLPUB");

Awaitility.await().until(m_messagesCollector::isMessageReceived);
Expand Down

0 comments on commit e5d58a4

Please sign in to comment.