diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 0af045960..c8887fdbc 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -187,9 +187,9 @@ public void processPubRec(int packetId) { SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId); if (removed != null) { removed.release(); + inflightSlots.incrementAndGet(); } - inflightSlots.incrementAndGet(); if (canSkipQueue()) { inflightSlots.decrementAndGet(); int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/; @@ -209,10 +209,9 @@ public void processPubComp(int messageID) { SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); if (removed != null) { removed.release(); + inflightSlots.incrementAndGet(); } - inflightSlots.incrementAndGet(); - drainQueueToConnection(); // TODO notify the interceptor @@ -309,9 +308,9 @@ void pubAckReceived(int ackPacketId) { SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); if (removed != null) { removed.release(); + inflightSlots.incrementAndGet(); } - inflightSlots.incrementAndGet(); drainQueueToConnection(); }