Skip to content

Commit

Permalink
Fix handling of PubRelMarker, missing inflightTimeouts.add()
Browse files Browse the repository at this point in the history
PubRelMarkers should not be added to inflightWindow, and should not
cause the inflightSlots being decremented.
Also makes sure that non-pubRelMarker messages get added to the
inflightTimeouts.
  • Loading branch information
hylkevds committed Feb 25, 2021
1 parent 8ca9f73 commit 830bbc9
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,19 @@ private MqttPublishMessage publishNotRetainedDuplicated(InFlightPacket notAckPac
private void drainQueueToConnection() {
// consume the queue
while (!sessionQueue.isEmpty() && inflighHasSlotsAndConnectionIsUp()) {
final SessionRegistry.EnqueuedMessage msg = sessionQueue.remove();
inflightSlots.decrementAndGet();
final SessionRegistry.EnqueuedMessage msg = sessionQueue.poll();
if (msg == null) {
// Our message was already fetched by another Thread.
return;
}
int sendPacketId = mqttConnection.nextPacketId();
inflightWindow.put(sendPacketId, msg);
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(sendPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
inflightSlots.decrementAndGet();
inflightWindow.put(sendPacketId, msg);
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
// Second pass-on.
msgPub.payload.retain();
Expand Down

0 comments on commit 830bbc9

Please sign in to comment.