Skip to content

Commit

Permalink
Added a test for moquette-io#573
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Feb 18, 2021
1 parent a9f5c24 commit a3780c2
Showing 1 changed file with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;

public class ServerLowlevelMessagesIntegrationTest {
Expand Down Expand Up @@ -171,4 +173,38 @@ public void testWillMessageIsPublishedOnClientBadDisconnection() throws Interrup
m_willSubscriber.disconnect();
}

@Test
public void testResendNotAckedPublishes() throws MqttException, InterruptedException {
LOG.info("*** testResendNotAckedPublishes ***");
String topic = "/test";

MqttClient subscriber = new MqttClient("tcp://localhost:1883", "Subscriber");
MqttClient publisher = new MqttClient("tcp://localhost:1883", "Publisher");

try {
subscriber.connect();
publisher.connect();

AtomicBoolean isFirst = new AtomicBoolean(true);
CountDownLatch countDownLatch = new CountDownLatch(2);
subscriber.subscribe(topic, 1, (String topic1, org.eclipse.paho.client.mqttv3.MqttMessage message) -> {
if (isFirst.getAndSet(false)) {
// wait to trigger resending PUBLISH
TimeUnit.SECONDS.sleep(12);
}
countDownLatch.countDown();
});

publisher.publish(topic, "hello".getBytes(), 1, false);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}
finally {
try {
subscriber.disconnect();
}
finally {
publisher.disconnect();
}
}
}
}

0 comments on commit a3780c2

Please sign in to comment.