Skip to content

Commit

Permalink
重构Qos推送模型
Browse files Browse the repository at this point in the history
  • Loading branch information
smthing committed Dec 4, 2024
1 parent 93a07f6 commit 531db66
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/
public class TopicQosConsumerRecord extends TopicConsumerRecord {

protected final AtomicBoolean semaphore = new AtomicBoolean(false);
private final AtomicBoolean semaphore = new AtomicBoolean(false);


public TopicQosConsumerRecord(BrokerTopic topic, MqttSession session, TopicSubscriber topicSubscriber, long nextConsumerOffset) {
Expand All @@ -55,6 +55,7 @@ public void pushToClient() {

private void push0() {
Message message = topic.getMessageQueue().get(nextConsumerOffset);
//消息队列已消费至最新点位
if (message == null) {
if (semaphore.compareAndSet(true, false)) {
topic.addSubscriber(this);
Expand All @@ -64,6 +65,17 @@ private void push0() {
}
return;
}
int available = mqttSession.getInflightQueue().available();
//当前连接的飞行窗口已满
if (mqttSession.getInflightQueue().available() == 0) {
if (semaphore.compareAndSet(true, false)) {
topic.addSubscriber(this);
if (mqttSession.getInflightQueue().available() > 0) {
topic.push();
}
}
return;
}

MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish().payload(message.getPayload()).qos(mqttQoS).topic(message.getTopicBytes());
if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
Expand All @@ -72,18 +84,11 @@ private void push0() {
topic.getMessageQueue().commit(message.getOffset());
nextConsumerOffset = message.getOffset() + 1;

CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> future = mqttSession.getInflightQueue().offer(publishBuilder, () -> {
if (semaphore.compareAndSet(true, false)) {
topic.addSubscriber(this);
}
topic.push();
});
if (future == null) {
return;
CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> future = mqttSession.getInflightQueue().offer(publishBuilder);
if (available == 1) {
future.whenComplete((mqttPacketIdentifierMessage, throwable) -> push0());
} else {
push0();
}
future.whenComplete((mqttPacketIdentifierMessage, throwable) -> push0());

push0();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
* @version V1.0 , 2022/4/26
*/
public class InflightQueue {
public static final Runnable EMPTY_RUNNABLE = () -> {
};
private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class);
private static final int TIMEOUT = 30;
private final InflightMessage[] queue;
Expand Down Expand Up @@ -65,9 +67,11 @@ public synchronized CompletableFuture<MqttPacketIdentifierMessage<? extends Mqtt
}

public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> offer(MqttMessageBuilders.MessageBuilder publishBuilder) {
return offer(publishBuilder, () -> {
return offer(publishBuilder, EMPTY_RUNNABLE);
}

});
public int available() {
return queue.length - count;
}

public synchronized CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> offer(MqttMessageBuilders.MessageBuilder publishBuilder, Runnable runnable) {
Expand Down

0 comments on commit 531db66

Please sign in to comment.