Skip to content

Commit

Permalink
Added a test for moquette-io#573
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed May 22, 2021
1 parent 674183d commit cbc31c5
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
3 changes: 3 additions & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public final class BrokerConstants {

public static final String STORAGE_CLASS_NAME = "storage_class";

public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;

private BrokerConstants() {
}
}
2 changes: 0 additions & 2 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
class Session {

private static final Logger LOG = LoggerFactory.getLogger(Session.class);
private static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
private static final int INFLIGHT_WINDOW_SIZE = 10;

static class InFlightPacket implements Delayed {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.moquette.integration;

import static io.moquette.BrokerConstants.FLIGHT_BEFORE_RESEND_MS;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;

public class ServerLowlevelMessagesIntegrationTest {
Expand Down Expand Up @@ -171,4 +173,43 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup
m_willSubscriber.disconnect();
}

@Test
public void testResendNotAckedPublishes() throws MqttException, InterruptedException {
LOG.info("*** testResendNotAckedPublishes ***");
String topic = "/test";

MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber");
MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher");

try {
subscriber.connect();
publisher.connect();

AtomicBoolean isFirst = new AtomicBoolean(true);
AtomicBoolean receivedPublish = new AtomicBoolean(false);
subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> {
if (isFirst.getAndSet(false)) {
// wait to trigger resending PUBLISH
TimeUnit.SECONDS.sleep(FLIGHT_BEFORE_RESEND_MS * 2);
} else {
receivedPublish.set(true);
}
});

publisher.publish(topic, "hello".getBytes(), 1, false);
Awaitility.await("Waiting for resend.")
.atMost(FLIGHT_BEFORE_RESEND_MS * 3, TimeUnit.MILLISECONDS)
.pollDelay(FLIGHT_BEFORE_RESEND_MS * 2, TimeUnit.MILLISECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilTrue(receivedPublish);
} finally {
try {
if (subscriber.isConnected()) {
subscriber.disconnect();
}
} finally {
publisher.disconnect();
}
}
}
}

0 comments on commit cbc31c5

Please sign in to comment.