Skip to content

Commit

Permalink
Fixed moquette-io#573 ByteBuf reference counting
Browse files Browse the repository at this point in the history
Moved buffer release/retain calls to the outside interfaces of moquette.
  • Loading branch information
hylkevds committed May 24, 2021
1 parent 4e29483 commit 0cdcc27
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 23 deletions.
15 changes: 12 additions & 3 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -37,6 +38,7 @@
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import io.netty.util.ReferenceCountUtil;

final class MQTTConnection {

Expand Down Expand Up @@ -377,7 +379,6 @@ void processPublish(MqttPublishMessage msg) {
case EXACTLY_ONCE: {
bindedSession.receivedPublishQos2(messageID, msg);
postOffice.receivedPublishQos2(this, msg, username);
// msg.release();
break;
}
default:
Expand Down Expand Up @@ -419,11 +420,19 @@ void sendIfWritableElseDrop(MqttMessage msg) {
LOG.debug("OUT {}", msg.fixedHeader().messageType());
}
if (channel.isWritable()) {

// Sending to external, retain a duplicate. Just retain is not
// enough, since the receiver must have full control.
Object retainedDup = msg;
if (msg instanceof ByteBufHolder) {
retainedDup = ((ByteBufHolder) msg).retainedDuplicate();
}

ChannelFuture channelFuture;
if (brokerConfig.isImmediateBufferFlush()) {
channelFuture = channel.writeAndFlush(msg);
channelFuture = channel.writeAndFlush(retainedDup);
} else {
channelFuture = channel.write(msg);
channelFuture = channel.write(retainedDup);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
}
Expand Down
6 changes: 3 additions & 3 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ private void publishRetainedMessagesForSubscriptions(String clientID, List<Subsc

final ByteBuf payloadBuf = Unpooled.wrappedBuffer(retainedMsg.getPayload());
targetSession.sendRetainedPublishOnSessionAtQos(retainedMsg.getTopic(), qos, payloadBuf);
// We made the buffer, we must release it.
payloadBuf.release();
}
}
}
Expand Down Expand Up @@ -202,7 +204,7 @@ void receivedPublishQos1(MQTTConnection connection, Topic topic, String username
interceptor.notifyTopicPublished(msg, clientId, username);
}

private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publishingQos) {
private void publish2Subscribers(ByteBuf payload, Topic topic, MqttQoS publishingQos) {
Set<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);

for (final Subscription sub : topicMatchingSubscriptions) {
Expand All @@ -213,8 +215,6 @@ private void publish2Subscribers(ByteBuf origPayload, Topic topic, MqttQoS publi
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
// we need to retain because duplicate only copy r/w indexes and don't retain() causing refCnt = 0
ByteBuf payload = origPayload.retainedDuplicate();
targetSession.sendPublishOnSessionAtQos(topic, qos, payload);
} else {
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
Expand Down
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private <T, U> T loadClass(String className, Class<T> intrface, Class<U> constru
* Use the broker to publish a message. It's intended for embedding applications. It can be used
* only after the integration is correctly started with startServer.
*
* @param msg the message to forward.
* @param msg the message to forward. The ByteBuf in the message will be released.
* @param clientId the id of the sending integration.
* @throws IllegalStateException if the integration is not yet started
*/
Expand All @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) {
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
dispatcher.internalPublish(msg);
msg.payload().release();
}

public void stopServer() {
Expand Down
70 changes: 60 additions & 10 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import io.moquette.broker.SessionRegistry.PublishedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -183,7 +185,12 @@ boolean isClean() {
}

public void processPubRec(int packetId) {
inflightWindow.remove(packetId);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
Expand All @@ -200,7 +207,12 @@ public void processPubRec(int packetId) {
}

public void processPubComp(int messageID) {
inflightWindow.remove(messageID);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();
Expand Down Expand Up @@ -238,15 +250,26 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) {
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));

// Adding to a map, retain.
payload.retain();
EnqueuedMessage old = inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload));
// If there already was something, release it.
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);

// TODO drainQueueToConnection();?
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload);
// Adding to a queue, retain.
msg.retain();
sessionQueue.add(msg);
}
}
Expand All @@ -255,15 +278,26 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) {
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int packetId = mqttConnection.nextPacketId();
inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));

// Retain before adding to map
payload.retain();
EnqueuedMessage old = inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
// If there already was something, release it.
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
payload, packetId);
mqttConnection.sendPublish(publishMsg);

drainQueueToConnection();
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload);
// Adding to a queue, retain.
msg.retain();
sessionQueue.add(msg);
}
}
Expand All @@ -283,7 +317,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}
Expand All @@ -305,8 +343,7 @@ public void resendInflightNotAcked() {
final Topic topic = msg.topic;
final MqttQoS qos = msg.publishingQos;
final ByteBuf payload = msg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, copiedPayload);
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
mqttConnection.sendPublish(publishMsg);
}
}
Expand Down Expand Up @@ -341,7 +378,13 @@ private void drainQueueToConnection() {
}
inflightSlots.decrementAndGet();
int sendPacketId = mqttConnection.nextPacketId();
inflightWindow.put(sendPacketId, msg);

// Putting it in a map, but the retain is cancelled out by the below release.
EnqueuedMessage old = inflightWindow.put(sendPacketId, msg);
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
Expand All @@ -352,6 +395,7 @@ private void drainQueueToConnection() {
msgPub.payload, sendPacketId);
mqttConnection.sendPublish(publishMsg);
}
// we fetched msg from a map, but the release is cancelled out by the above retain
}
}

Expand All @@ -374,12 +418,18 @@ void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload
}

public void receivedPublishQos2(int messageID, MqttPublishMessage msg) {
qos2Receiving.put(messageID, msg);
msg.retain(); // retain to put in the inflight map
// Retain before putting msg in map.
ReferenceCountUtil.retain(msg);

MqttPublishMessage old = qos2Receiving.put(messageID, msg);
// In case of evil client with duplicate msgid.
ReferenceCountUtil.release(old);

mqttConnection.sendPublishReceived(messageID);
}

public void receivedPubRelQos2(int messageID) {
// Done with the message, remove from queue and release payload.
final MqttPublishMessage removedMsg = qos2Receiving.remove(messageID);
ReferenceCountUtil.release(removedMsg);
}
Expand Down
23 changes: 23 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@
public class SessionRegistry {

public abstract static class EnqueuedMessage {

/**
* Releases any held resources. Must be called when the EnqueuedMessage is no
* longer needed.
*/
public void release() {}

/**
* Retains any held resources. Must be called when the EnqueuedMessage is added
* to a store.
*/
public void retain() {}
}

public static class PublishedMessage extends EnqueuedMessage {
Expand All @@ -63,6 +75,17 @@ public MqttQoS getPublishingQos() {
public ByteBuf getPayload() {
return payload;
}

@Override
public void release() {
payload.release();
}

@Override
public void retain() {
payload.retain();
}

}

public static final class PubRelMarker extends EnqueuedMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void onConnectionLost(InterceptConnectionLostMessage msg) {

@Override
public void onPublish(InterceptPublishMessage msg) {
msg.getPayload().release();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void notifyTopicPublished(final MqttPublishMessage msg, final String clie
for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) {
LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, "
+ "interceptorId={}", clientID, messageId, topic, handler.getID());
handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
// Sending to the outside, make a retainedDuplicate.
handler.onPublish(new InterceptPublishMessage(msg.retainedDuplicate(), clientID, username));
}
} finally {
ReferenceCountUtil.release(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface InterceptHandler {

void onConnectionLost(InterceptConnectionLostMessage msg);

/**
* Called when a message is published. The receiver MUST release the payload of the message, either
* by calling super.onPublish, or by calling msg.getPayload.release() directly.
*
* @param msg The message that was published.
*/
void onPublish(InterceptPublishMessage msg);

void onSubscribe(InterceptSubscribeMessage msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void write(WriteBuffer buff, Object obj) {
final ByteBuf casted = (ByteBuf) obj;
final int payloadSize = casted.readableBytes();
byte[] rawBytes = new byte[payloadSize];
casted.copy().readBytes(rawBytes);
casted.copy().readBytes(rawBytes).release();
buff.putInt(payloadSize);
buff.put(rawBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel
@Test
public void dropConnectionOnPublishWithInvalidTopicFormat() {
// Connect message with clean session set to true and client id is null.
final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8));
MqttPublishMessage publish = MqttMessageBuilders.publish()
.topicName("")
.retained(false)
.qos(MqttQoS.AT_MOST_ONCE)
.payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build();
.payload(payload).build();

sut.processPublish(publish);

// Verify
assertFalse(channel.isOpen(), "Connection should be closed by the broker");
payload.release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,13 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
connection.processConnect(connectMessage);
ConnectionTestUtils.assertConnectAccepted(channel);

final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!");
this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish()
.payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"))
.payload(payload1)
.qos(AT_LEAST_ONCE)
.build());
// Retaining a msg does not release the payload.
payload1.release();

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
Expand All @@ -241,6 +244,8 @@ public void testPublishWithEmptyPayloadClearRetainedStore() {
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.topicName(NEWS_TOPIC).build());
// receivedPublishQos0 does not release payload.
anyPayload.release();

// Verify
assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic");
Expand Down
7 changes: 7 additions & 0 deletions broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ public void testPubAckDrainMessagesRemainingInQueue() {

// Verify
assertTrue(queuedMessages.isEmpty(), "Messages should be drained");

// release the rest, to avoid leaking buffers
for (int i = 2; i <= 11; i++) {
client.pubAckReceived(i);
}
client.closeImmediately();
testChannel.close();
}

private void sendQoS1To(Session client, Topic destinationTopic, String message) {
Expand Down
Loading

0 comments on commit 0cdcc27

Please sign in to comment.