diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicQosConsumerRecord.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicQosConsumerRecord.java index db950bc..91676fc 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicQosConsumerRecord.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/topic/TopicQosConsumerRecord.java @@ -32,7 +32,7 @@ */ public class TopicQosConsumerRecord extends TopicConsumerRecord { - protected final AtomicBoolean semaphore = new AtomicBoolean(false); + private final AtomicBoolean semaphore = new AtomicBoolean(false); public TopicQosConsumerRecord(BrokerTopic topic, MqttSession session, TopicSubscriber topicSubscriber, long nextConsumerOffset) { @@ -55,6 +55,7 @@ public void pushToClient() { private void push0() { Message message = topic.getMessageQueue().get(nextConsumerOffset); + //消息队列已消费至最新点位 if (message == null) { if (semaphore.compareAndSet(true, false)) { topic.addSubscriber(this); @@ -64,6 +65,17 @@ private void push0() { } return; } + int available = mqttSession.getInflightQueue().available(); + //当前连接的飞行窗口已满 + if (mqttSession.getInflightQueue().available() == 0) { + if (semaphore.compareAndSet(true, false)) { + topic.addSubscriber(this); + if (mqttSession.getInflightQueue().available() > 0) { + topic.push(); + } + } + return; + } MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(mqttQoS).topic(message.getTopicBytes()); if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) { @@ -72,18 +84,11 @@ private void push0() { topic.getMessageQueue().commit(message.getOffset()); nextConsumerOffset = message.getOffset() + 1; - CompletableFuture> future = mqttSession.getInflightQueue().offer(publishBuilder, () -> { - if (semaphore.compareAndSet(true, false)) { - topic.addSubscriber(this); - } - topic.push(); - }); - if (future == null) { - return; + CompletableFuture> future = mqttSession.getInflightQueue().offer(publishBuilder); + if (available == 1) { + future.whenComplete((mqttPacketIdentifierMessage, throwable) -> push0()); + } else { + push0(); } - future.whenComplete((mqttPacketIdentifierMessage, throwable) -> push0()); - - push0(); } - } diff --git a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java index 26c3419..674691d 100644 --- a/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java +++ b/smart-mqtt-common/src/main/java/org/smartboot/mqtt/common/InflightQueue.java @@ -36,6 +36,8 @@ * @version V1.0 , 2022/4/26 */ public class InflightQueue { + public static final Runnable EMPTY_RUNNABLE = () -> { + }; private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class); private static final int TIMEOUT = 30; private final InflightMessage[] queue; @@ -65,9 +67,11 @@ public synchronized CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder) { - return offer(publishBuilder, () -> { + return offer(publishBuilder, EMPTY_RUNNABLE); + } - }); + public int available() { + return queue.length - count; } public synchronized CompletableFuture> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) {