From 300e8bcb50f46669f69be55542fc97f0d81680cc Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 2 Feb 2021 11:15:46 +0100 Subject: [PATCH] Fixes #569: NoSuchElementException in Session.drainQueueToConnection Session.drainQueueToConnection checks sessionQueue.isEmpty(), and then uses sessionQueue.remove(). However, between the check and the remove, another thread can steal the last message, causing the remove() to throw an exception. Better to use sessionQueue.poll() with a null-check. --- broker/src/main/java/io/moquette/broker/Session.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 9545fe020..4247d2711 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -334,7 +334,11 @@ private MqttPublishMessage publishNotRetainedDuplicated(InFlightPacket notAckPac private void drainQueueToConnection() { // consume the queue while (!sessionQueue.isEmpty() && inflighHasSlotsAndConnectionIsUp()) { - final SessionRegistry.EnqueuedMessage msg = sessionQueue.remove(); + final SessionRegistry.EnqueuedMessage msg = sessionQueue.poll(); + if (msg == null) { + // Our message was already fetched by another Thread. + return; + } inflightSlots.decrementAndGet(); int sendPacketId = mqttConnection.nextPacketId(); inflightWindow.put(sendPacketId, msg);