Skip to content

Commit

Permalink
Fixes moquette-io#583: inflightSlots counting error
Browse files Browse the repository at this point in the history
When processing an ACK, the inflightSlots should only be increased if it
is the first time the ACK is received for a given message. The ACK may
be received multiple times, the subsequent ACKs should not cause the
count to be increased.
  • Loading branch information
hylkevds committed May 9, 2021
1 parent f0904f7 commit dd89c6b
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ public void processPubRec(int packetId) {
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
removed.release();
inflightSlots.incrementAndGet();
}

inflightSlots.incrementAndGet();
if (canSkipQueue()) {
inflightSlots.decrementAndGet();
int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/;
Expand All @@ -209,10 +209,9 @@ public void processPubComp(int messageID) {
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed != null) {
removed.release();
inflightSlots.incrementAndGet();
}

inflightSlots.incrementAndGet();

drainQueueToConnection();

// TODO notify the interceptor
Expand Down Expand Up @@ -309,9 +308,9 @@ void pubAckReceived(int ackPacketId) {
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed != null) {
removed.release();
inflightSlots.incrementAndGet();
}

inflightSlots.incrementAndGet();
drainQueueToConnection();
}

Expand Down

0 comments on commit dd89c6b

Please sign in to comment.