diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 7cf920ae2..4406a1322 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -18,6 +18,7 @@ import io.moquette.broker.subscriptions.Topic; import io.moquette.broker.security.IAuthenticator; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -359,8 +360,6 @@ void processPublish(MqttPublishMessage msg) { final String clientId = getClientId(); final int messageID = msg.variableHeader().packetId(); LOG.trace("Processing PUBLISH message, topic: {}, messageId: {}, qos: {}", topicName, messageID, qos); - ByteBuf payload = msg.payload(); - final boolean retain = msg.fixedHeader().isRetain(); final Topic topic = new Topic(topicName); if (!topic.isValid()) { LOG.debug("Drop connection because of invalid topic format"); @@ -368,16 +367,17 @@ void processPublish(MqttPublishMessage msg) { } switch (qos) { case AT_MOST_ONCE: - postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg); + postOffice.receivedPublishQos0(topic, username, clientId, msg); break; case AT_LEAST_ONCE: { - postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg); + postOffice.receivedPublishQos1(this, topic, username, messageID, msg); break; } case EXACTLY_ONCE: { bindedSession.receivedPublishQos2(messageID, msg); + // Second pass-on, retain + msg.payload().retain(); postOffice.receivedPublishQos2(this, msg, username); -// msg.release(); break; } default: @@ -426,6 +426,11 @@ void sendIfWritableElseDrop(MqttMessage msg) { channelFuture = channel.write(msg); } channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE); + } else { + // msg not passed on, release. + if (msg instanceof ByteBufHolder) { + ((ByteBufHolder) msg).release(); + } } } diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index ecc378a35..1684b808b 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -156,15 +156,14 @@ public void unsubscribe(List topics, MQTTConnection mqttConnection, int mqttConnection.sendUnsubAckMessage(topics, clientID, messageId); } - void receivedPublishQos0(Topic topic, String username, String clientID, ByteBuf payload, boolean retain, - MqttPublishMessage msg) { + void receivedPublishQos0(Topic topic, String username, String clientID, MqttPublishMessage msg) { if (!authorizator.canWrite(topic, username, clientID)) { LOG.error("client is not authorized to publish on topic: {}", topic); return; } - publish2Subscribers(payload, topic, AT_MOST_ONCE); + publish2Subscribers(msg.payload(), topic, AT_MOST_ONCE); - if (retain) { + if (msg.fixedHeader().isRetain()) { // QoS == 0 && retain => clean old retained retainedRepository.cleanRetained(topic); } @@ -172,8 +171,8 @@ void receivedPublishQos0(Topic topic, String username, String clientID, ByteBuf interceptor.notifyTopicPublished(msg, clientID, username); } - void receivedPublishQos1(MQTTConnection connection, Topic topic, String username, ByteBuf payload, int messageID, - boolean retain, MqttPublishMessage msg) { + void receivedPublishQos1(MQTTConnection connection, Topic topic, String username, int messageID, + MqttPublishMessage msg) { // verify if topic can be write topic.getTokens(); if (!topic.isValid()) { @@ -187,11 +186,12 @@ void receivedPublishQos1(MQTTConnection connection, Topic topic, String username return; } + ByteBuf payload = msg.payload(); publish2Subscribers(payload, topic, AT_LEAST_ONCE); connection.sendPubAck(messageID); - if (retain) { + if (msg.fixedHeader().isRetain()) { if (!payload.isReadable()) { retainedRepository.cleanRetained(topic); } else { @@ -237,6 +237,8 @@ void receivedPublishQos2(MQTTConnection connection, MqttPublishMessage mqttPubli final String clientId = connection.getClientId(); if (!authorizator.canWrite(topic, username, clientId)) { LOG.error("MQTT client is not authorized to publish on topic: {}", topic); + // msg not passed on, release payload. + payload.release(); return; } @@ -254,6 +256,8 @@ void receivedPublishQos2(MQTTConnection connection, MqttPublishMessage mqttPubli String clientID = connection.getClientId(); interceptor.notifyTopicPublished(mqttPublishMessage, clientID, username); + // none of the methods above released the payload, do it now. + payload.release(); } static MqttQoS lowerQosToTheSubscriptionDesired(Subscription sub, MqttQoS qos) { @@ -284,7 +288,7 @@ public void internalPublish(MqttPublishMessage msg) { if (!msg.fixedHeader().isRetain()) { return; } - if (qos == AT_MOST_ONCE || msg.payload().readableBytes() == 0) { + if (qos == AT_MOST_ONCE || payload.readableBytes() == 0) { // QoS == 0 && retain => clean old retained retainedRepository.cleanRetained(topic); return; diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 22d04be68..e5ca087f8 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) { } LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID); dispatcher.internalPublish(msg); + msg.payload().release(); } public void stopServer() { diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 9545fe020..0790539d7 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -183,7 +183,12 @@ boolean isClean() { } public void processPubRec(int packetId) { - inflightWindow.remove(packetId); + // Message discarded, make sure any buffers in it are released + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId); + if (removed != null) { + removed.release(); + } + inflightSlots.incrementAndGet(); if (canSkipQueue()) { inflightSlots.decrementAndGet(); @@ -200,7 +205,12 @@ public void processPubRec(int packetId) { } public void processPubComp(int messageID) { - inflightWindow.remove(messageID); + // Message discarded, make sure any buffers in it are released + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); + if (removed != null) { + removed.release(); + } + inflightSlots.incrementAndGet(); drainQueueToConnection(); @@ -216,6 +226,9 @@ public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload) case AT_MOST_ONCE: if (connected()) { mqttConnection.sendPublishNotRetainedQos0(topic, qos, payload); + } else { + // buffer not passed on, release it. + payload.release(); } break; case AT_LEAST_ONCE: @@ -226,12 +239,16 @@ public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload) break; case FAILURE: LOG.error("Not admissible"); + // buffer not passed on, release it. + payload.release(); } } private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) { if (!connected() && isClean()) { //pushing messages to disconnected not clean session + //buffer not passed on, release it. + payload.release(); return; } @@ -240,6 +257,9 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload) { int packetId = mqttConnection.nextPacketId(); inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload)); inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); + + // second pass-on, add retain + payload.retain(); MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos, payload, packetId); mqttConnection.sendPublish(publishMsg); @@ -257,6 +277,9 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload) { int packetId = mqttConnection.nextPacketId(); inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload)); inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); + + // second pass-on, add retain + payload.retain(); MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos, payload, packetId); mqttConnection.sendPublish(publishMsg); @@ -283,7 +306,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() { void pubAckReceived(int ackPacketId) { // TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged - inflightWindow.remove(ackPacketId); + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); + if (removed != null) { + removed.release(); + } + inflightSlots.incrementAndGet(); drainQueueToConnection(); } @@ -343,6 +370,8 @@ private void drainQueueToConnection() { mqttConnection.sendIfWritableElseDrop(pubRel); } else { final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg; + // Second pass-on. + msgPub.payload.retain(); MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(msgPub.topic.toString(), msgPub.publishingQos, msgPub.payload, sendPacketId); diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 903f9af35..4cae524ce 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -28,7 +28,6 @@ import java.net.InetSocketAddress; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue; @@ -39,6 +38,7 @@ public class SessionRegistry { public abstract static class EnqueuedMessage { + public void release() {} } static class PublishedMessage extends EnqueuedMessage { @@ -52,6 +52,15 @@ static class PublishedMessage extends EnqueuedMessage { this.publishingQos = publishingQos; this.payload = payload; } + + /** + * Releases the payload. Must be called when the PublishedMessage is no + * longer needed. + */ + @Override + public void release() { + payload.release(); + } } static final class PubRelMarker extends EnqueuedMessage { diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index 3d2f88610..f6721c65f 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -20,6 +20,7 @@ import io.moquette.broker.subscriptions.ISubscriptionsDirectory; import io.moquette.broker.security.IAuthenticator; import io.moquette.persistence.MemorySubscriptionsRepository; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.embedded.EmbeddedChannel; @@ -81,16 +82,18 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel @Test public void dropConnectionOnPublishWithInvalidTopicFormat() { // Connect message with clean session set to true and client id is null. + final ByteBuf payload = Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8)); MqttPublishMessage publish = MqttMessageBuilders.publish() .topicName("") .retained(false) .qos(MqttQoS.AT_MOST_ONCE) - .payload(Unpooled.copiedBuffer("Hello MQTT world!".getBytes(UTF_8))).build(); + .payload(payload).build(); sut.processPublish(publish); // Verify assertFalse(channel.isOpen(), "Connection should be closed by the broker"); + payload.release(); } } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 3234d3724..33170cd63 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -110,7 +110,7 @@ public void testPublishQoS0ToItself() { // Exercise final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -211,7 +211,7 @@ public void testPublishToMultipleSubscribers() { // Exercise final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -228,19 +228,24 @@ public void testPublishWithEmptyPayloadClearRetainedStore() { connection.processConnect(connectMessage); ConnectionTestUtils.assertConnectAccepted(channel); + final ByteBuf payload1 = ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!"); this.retainedRepository.retain(new Topic(NEWS_TOPIC), MqttMessageBuilders.publish() - .payload(ByteBufUtil.writeAscii(UnpooledByteBufAllocator.DEFAULT, "Hello world!")) + .payload(payload1) .qos(AT_LEAST_ONCE) .build()); + // Retaining a msg does not release the payload. + payload1.release(); // Exercise final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, anyPayload, true, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_MOST_ONCE) - .retained(false) + .retained(true) .topicName(NEWS_TOPIC).build()); + // receivedPublishQos0 does not release payload. + anyPayload.release(); // Verify assertTrue(retainedRepository.isEmpty(), "QoS0 MUST clean retained message for topic"); @@ -254,7 +259,7 @@ public void testPublishWithQoS1() { // Exercise final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true, + sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, 1, MqttMessageBuilders.publish() .payload(Unpooled.copiedBuffer("Any payload", Charset.defaultCharset())) .qos(MqttQoS.AT_LEAST_ONCE) @@ -298,7 +303,7 @@ public void forwardQoS1PublishesWhenNotCleanSessionReconnects() { ConnectionTestUtils.assertConnectAccepted(pubChannel); final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true, + sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload.retainedDuplicate()) .qos(MqttQoS.AT_LEAST_ONCE) @@ -336,10 +341,11 @@ public void checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession( ConnectionTestUtils.assertConnectAccepted(pubChannel); final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true, + sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload.retainedDuplicate()) .qos(MqttQoS.AT_LEAST_ONCE) + .retained(true) .topicName(NEWS_TOPIC).build()); // Verify that after a reconnection the client receive the message @@ -361,7 +367,7 @@ public void noPublishToInactiveSession() { ConnectionTestUtils.assertConnectAccepted(pubChannel); final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true, + sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_LEAST_ONCE) @@ -389,7 +395,7 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() { .retained(true) .topicName(NEWS_TOPIC) .build(); - sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, anyPayload, 1, true, + sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, 1, publishMsg); assertMessageIsRetained(NEWS_TOPIC, anyPayload); @@ -397,11 +403,11 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() { // publish a QoS0 retained message // Exercise final ByteBuf qos0Payload = Unpooled.copiedBuffer("QoS0 payload", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(), qos0Payload, true, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(), MqttMessageBuilders.publish() .payload(qos0Payload) .qos(MqttQoS.AT_MOST_ONCE) - .retained(false) + .retained(true) .topicName(NEWS_TOPIC).build()); // Verify diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index 9b72b3686..4bc3abb13 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -240,7 +240,7 @@ public void testCleanSession_maintainClientSubscriptions() { assertEquals(1, subscriptions.size(), "After a reconnect, subscription MUST be still present"); final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -285,7 +285,7 @@ public void testCleanSession_correctlyClientSubscriptions() { // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload) .qos(MqttQoS.AT_MOST_ONCE) @@ -305,8 +305,9 @@ public void testReceiveRetainedPublishRespectingSubscriptionQoSAndNotPublisher() final MqttPublishMessage retainedPubQoS1Msg = MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_LEAST_ONCE) + .retained(true) .topicName(NEWS_TOPIC).build(); - sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, payload, 1, true, + sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, 1, retainedPubQoS1Msg); // subscriber connects subscribe to topic /news and receive the last retained message diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 1bf3a0a35..e988bace4 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -143,7 +143,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -156,9 +156,9 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() - .payload(payload) + .payload(payload2) .qos(MqttQoS.AT_MOST_ONCE) .retained(false) .topicName(NEWS_TOPIC).build()); @@ -181,7 +181,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -201,7 +201,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2) .qos(MqttQoS.AT_MOST_ONCE) @@ -248,9 +248,9 @@ private void connectPublishDisconnectFromAnotherClient(String firstPayload, Stri // publish from another channel final ByteBuf anyPayload = Unpooled.copiedBuffer(firstPayload, Charset.defaultCharset()); - sut.receivedPublishQos1(anotherConnection, new Topic(topic), TEST_USER, anyPayload, 1, false, + sut.receivedPublishQos1(anotherConnection, new Topic(topic), TEST_USER, 1, MqttMessageBuilders.publish() - .payload(Unpooled.copiedBuffer(firstPayload, Charset.defaultCharset())) + .payload(anyPayload) .qos(MqttQoS.AT_LEAST_ONCE) .retained(false) .topicName(topic).build()); @@ -281,7 +281,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() { subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -301,7 +301,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() { subscribe(subscriberConnection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world2!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, payload2, false, + sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -338,9 +338,9 @@ public void checkReplayofStoredPublishResumeAfter_a_disconnect_cleanSessionFalse private void publishQos1(MQTTConnection publisher, String topic, String payload, int messageID) { final ByteBuf bytePayload = Unpooled.copiedBuffer(payload, Charset.defaultCharset()); - sut.receivedPublishQos1(publisher, new Topic(topic), TEST_USER, bytePayload, messageID, false, + sut.receivedPublishQos1(publisher, new Topic(topic), TEST_USER, messageID, MqttMessageBuilders.publish() - .payload(Unpooled.copiedBuffer(payload, Charset.defaultCharset())) + .payload(bytePayload) .qos(MqttQoS.AT_LEAST_ONCE) .retained(false) .topicName(NEWS_TOPIC).build()); diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index fcd6c1495..fd9723273 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -39,6 +39,13 @@ public void testPubAckDrainMessagesRemainingInQueue() { // Verify assertTrue(queuedMessages.isEmpty(), "Messages should be drained"); + + // release the rest, to avoid leaking buffers + for (int i = 2; i <= 11; i++) { + client.pubAckReceived(i); + } + client.closeImmediately(); + testChannel.close(); } private void sendQoS1To(Session client, Topic destinationTopic, String message) { diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java index bc9568640..767276773 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java @@ -146,6 +146,8 @@ public void shouldNotInternalPublishOnReadBlockedSubscriptionTopic() throws Exce .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8))) .build(); + // We will be sending the same message again, retain the payload. + message.payload().retain(); m_server.internalPublish(message, "INTRLPUB"); Awaitility.await().until(m_messagesCollector::isMessageReceived); diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationQoSValidationTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationQoSValidationTest.java index f63ef12eb..4000c598f 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationQoSValidationTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationQoSValidationTest.java @@ -42,7 +42,7 @@ public class ServerIntegrationQoSValidationTest { - private static final Logger LOG = LoggerFactory.getLogger(ServerIntegrationPahoTest.class); + private static final Logger LOG = LoggerFactory.getLogger(ServerIntegrationQoSValidationTest.class); Server m_server;