Skip to content

Commit

Permalink
Enable ServerLowLevel tests not running and add PUB QoS1 test (#580)
Browse files Browse the repository at this point in the history
- Add unit test to check a second publish is sent if the not PUBACK is received in `FLIGHT_BEFORE_RESEND_MS` interval.
- Fixed ServerLowLevel tests not running.

* Added a test for #573
  • Loading branch information
hylkevds authored May 23, 2021
1 parent f6f3f6e commit 4e29483
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 4 deletions.
3 changes: 3 additions & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public final class BrokerConstants {

public static final String STORAGE_CLASS_NAME = "storage_class";

public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;

private BrokerConstants() {
}
}
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.moquette.broker;

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
Expand All @@ -34,8 +36,6 @@
class Session {

private static final Logger LOG = LoggerFactory.getLogger(Session.class);
private static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
private static final int INFLIGHT_WINDOW_SIZE = 10;

static class InFlightPacket implements Delayed {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.moquette.integration;

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
Expand All @@ -38,11 +39,12 @@
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;

public class ServerLowlevelMessagesIntegrationTests {
public class ServerLowlevelMessagesIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(ServerLowlevelMessagesIntegrationTests.class);
private static final Logger LOG = LoggerFactory.getLogger(ServerLowlevelMessagesIntegrationTest.class);
static MqttClientPersistence s_dataStore;
Server m_server;
Client m_client;
Expand Down Expand Up @@ -171,4 +173,43 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup
m_willSubscriber.disconnect();
}

@Test
public void testResendNotAckedPublishes() throws MqttException, InterruptedException {
LOG.info("*** testResendNotAckedPublishes ***");
String topic = "/test";

MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber");
MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher");

try {
subscriber.connect();
publisher.connect();

AtomicBoolean isFirst = new AtomicBoolean(true);
AtomicBoolean receivedPublish = new AtomicBoolean(false);
subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> {
if (isFirst.getAndSet(false)) {
// wait to trigger resending PUBLISH
TimeUnit.SECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2);
} else {
receivedPublish.set(true);
}
});

publisher.publish(topic, "hello".getBytes(), 1, false);
Awaitility.await("Waiting for resend.")
.atMost(FLIGHT_BEFORE_RESEND_MS * 3, TimeUnit.MILLISECONDS)
.pollDelay(FLIGHT_BEFORE_RESEND_MS * 2, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilTrue(receivedPublish);
} finally {
try {
if (subscriber.isConnected()) {
subscriber.disconnect();
}
} finally {
publisher.disconnect();
}
}
}
}

0 comments on commit 4e29483

Please sign in to comment.