diff --git a/base-scheduler/src/main/java/com/baidu/bifromq/basescheduler/Batcher.java b/base-scheduler/src/main/java/com/baidu/bifromq/basescheduler/Batcher.java index fa587deba..aace55d49 100644 --- a/base-scheduler/src/main/java/com/baidu/bifromq/basescheduler/Batcher.java +++ b/base-scheduler/src/main/java/com/baidu/bifromq/basescheduler/Batcher.java @@ -48,9 +48,9 @@ public abstract class Batcher { private final Gauge batchSaturationGauge; private final Timer batchCallTimer; private final Timer batchExecTimer; - private final DistributionSummary batchBuildTimeSummary; + private final Timer batchBuildTimer; + private final Timer queueingTimer; private final DistributionSummary batchSizeSummary; - private final DistributionSummary queueingTimeSummary; private final double alphaIncrease = 0.2; private final double alphaDecrease = 0.05; private final AtomicInteger emaMaxBatchSize = new AtomicInteger(); @@ -85,13 +85,13 @@ protected Batcher(BatcherKey batcherKey, batchExecTimer = Timer.builder("batcher.exec.time") .tags(tags) .register(Metrics.globalRegistry); - batchBuildTimeSummary = DistributionSummary.builder("batcher.build.time") + batchBuildTimer = Timer.builder("batcher.build.time") .tags(tags) .register(Metrics.globalRegistry); - batchSizeSummary = DistributionSummary.builder("batcher.batch.size") + queueingTimer = Timer.builder("batcher.queueing.time") .tags(tags) .register(Metrics.globalRegistry); - queueingTimeSummary = DistributionSummary.builder("batcher.queueing.time") + batchSizeSummary = DistributionSummary.builder("batcher.batch.size") .tags(tags) .register(Metrics.globalRegistry); } @@ -125,9 +125,9 @@ public void close() { Metrics.globalRegistry.remove(batchSaturationGauge); Metrics.globalRegistry.remove(batchCallTimer); Metrics.globalRegistry.remove(batchExecTimer); - Metrics.globalRegistry.remove(batchBuildTimeSummary); + Metrics.globalRegistry.remove(batchBuildTimer); Metrics.globalRegistry.remove(batchSizeSummary); - Metrics.globalRegistry.remove(queueingTimeSummary); + Metrics.globalRegistry.remove(queueingTimer); } protected abstract IBatchCall newBatch(); @@ -161,11 +161,11 @@ private void batchAndEmit() { batchCall.add(callTask); batchedTasks.add(callTask); batchSize++; - queueingTimeSummary.record(System.nanoTime() - callTask.ts); + queueingTimer.record(System.nanoTime() - callTask.ts, TimeUnit.NANOSECONDS); } batchSizeSummary.record(batchSize); long execStart = System.nanoTime(); - batchBuildTimeSummary.record((execStart - buildStart)); + batchBuildTimer.record(execStart - buildStart, TimeUnit.NANOSECONDS); final int finalBatchSize = batchSize; batchCall.execute() .whenComplete((v, e) -> { diff --git a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistResponsePipeline.java b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistResponsePipeline.java index 5db20f311..ba0b78a64 100644 --- a/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistResponsePipeline.java +++ b/bifromq-dist/bifromq-dist-server/src/main/java/com/baidu/bifromq/dist/server/DistResponsePipeline.java @@ -59,6 +59,7 @@ protected CompletableFuture handleRequest(String tenantId, DistReques .handle((fanOutByTopic, e) -> { DistReply.Builder replyBuilder = DistReply.newBuilder().setReqId(request.getReqId()); if (e != null) { + log.debug("Failed to dist", e); if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) { for (PublisherMessagePack publisherMsgPack : request.getMessagesList()) { DistReply.Result.Builder resultBuilder = DistReply.Result.newBuilder(); diff --git a/bifromq-metrics/src/main/java/com/baidu/bifromq/metrics/TenantMetric.java b/bifromq-metrics/src/main/java/com/baidu/bifromq/metrics/TenantMetric.java index 9ee1a9afd..6c3d2263d 100644 --- a/bifromq-metrics/src/main/java/com/baidu/bifromq/metrics/TenantMetric.java +++ b/bifromq-metrics/src/main/java/com/baidu/bifromq/metrics/TenantMetric.java @@ -50,8 +50,13 @@ public enum TenantMetric { MqttQoS2ExternalLatency("mqtt.ex.qos2.latency", Meter.Type.TIMER), MqttTransientFanOutBytes("mqtt.tfanout.bytes", Meter.Type.DISTRIBUTION_SUMMARY), MqttPersistentFanOutBytes("mqtt.pfanout.bytes", Meter.Type.DISTRIBUTION_SUMMARY), + MqttRetryQueuedBytes("mqtt.retry.queued.bytes", Meter.Type.DISTRIBUTION_SUMMARY), + MqttRetryDroppedBytes("mqtt.retry.dropped.bytes", Meter.Type.DISTRIBUTION_SUMMARY), + MqttRetryTimeoutBytes("mqtt.retry.timeout.bytes", Meter.Type.DISTRIBUTION_SUMMARY), + MqttRetryDistedBytes("mqtt.retry.disted.bytes", Meter.Type.DISTRIBUTION_SUMMARY), MqttOutOfOrderDiscardBytes("mqtt.ood.discard.bytes", Meter.Type.DISTRIBUTION_SUMMARY), - MqttOutOfOrderSendBytes("mqtt.ood.send.bytes", Meter.Type.DISTRIBUTION_SUMMARY), + MqttOutOfOrderFlushBytes("mqtt.ood.flush.bytes", Meter.Type.DISTRIBUTION_SUMMARY), + MqttOutOfOrderDrainBytes("mqtt.ood.drain.bytes", Meter.Type.DISTRIBUTION_SUMMARY), MqttReorderBytes("mqtt.reorder.bytes", Meter.Type.DISTRIBUTION_SUMMARY), MqttTopicSeqAbortCount("mqtt.topic.seq.abort.count", Meter.Type.COUNTER), MqttTopicSorterAbortCount("mqtt.topic.sorter.abort.count", Meter.Type.COUNTER), diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTSessionHandler.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTSessionHandler.java index 682c2f5e7..c2e6f644b 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTSessionHandler.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTSessionHandler.java @@ -29,6 +29,10 @@ import static com.baidu.bifromq.metrics.TenantMetric.MqttQoS2DistBytes; import static com.baidu.bifromq.metrics.TenantMetric.MqttQoS2ExternalLatency; import static com.baidu.bifromq.metrics.TenantMetric.MqttQoS2IngressBytes; +import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryDistedBytes; +import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryDroppedBytes; +import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryQueuedBytes; +import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryTimeoutBytes; import static com.baidu.bifromq.metrics.TenantMetric.MqttTransientSubLatency; import static com.baidu.bifromq.mqtt.handler.IMQTTProtocolHelper.SubResult.EXCEED_LIMIT; import static com.baidu.bifromq.mqtt.handler.MQTTSessionIdUtil.packetId; @@ -38,6 +42,7 @@ import static com.baidu.bifromq.mqtt.utils.AuthUtil.buildSubAction; import static com.baidu.bifromq.mqtt.utils.AuthUtil.buildUnsubAction; import static com.baidu.bifromq.plugin.eventcollector.ThreadLocalEventPool.getLocal; +import static com.baidu.bifromq.sysprops.BifroMQSysProp.DIST_RETRY_ON_ERROR; import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_5_VALUE; import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_KEY; import static com.baidu.bifromq.type.QoS.AT_LEAST_ONCE; @@ -55,7 +60,6 @@ import static com.bifromq.plugin.resourcethrottler.TenantResourceType.TotalSharedSubscriptions; import static java.util.concurrent.CompletableFuture.allOf; -import com.baidu.bifromq.basehlc.HLC; import com.baidu.bifromq.baserpc.utils.FutureTracker; import com.baidu.bifromq.dist.client.DistResult; import com.baidu.bifromq.inbox.storage.proto.LWT; @@ -127,10 +131,12 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import java.time.Duration; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.SortedMap; @@ -211,6 +217,9 @@ void setAcked() { } } + private record RetryMessage(long reqId, String topic, Message message, CompletableFuture onDone) { + } + protected final TenantSettings settings; protected final String userSessionId; protected final int keepAliveTimeSeconds; @@ -218,6 +227,7 @@ void setAcked() { protected final AtomicLong memUsage; protected final ITenantMeter tenantMeter; private final long idleTimeoutNanos; + private final long retryTimeoutMillis; private final MPSThrottler throttler; private final Set> fgTasks = new HashSet<>(); private final FutureTracker bgTasks = new FutureTracker(); @@ -240,7 +250,9 @@ void setAcked() { private final CompletableFuture onInitialized = new CompletableFuture<>(); private final TopicMessageIdGenerator msgIdGenerator; private final TopicMessageOrderingSender orderingSender; + private final LinkedHashSet retryQueue = new LinkedHashSet<>(); private ScheduledFuture resendTask; + private ScheduledFuture retryTask; private int receivingCount = 0; protected MQTTSessionHandler(TenantSettings settings, @@ -267,6 +279,7 @@ protected MQTTSessionHandler(TenantSettings settings, authProvider = sessionCtx.authProvider(ctx); eventCollector = sessionCtx.eventCollector; resourceThrottler = sessionCtx.resourceThrottler; + retryTimeoutMillis = BifroMQSysProp.SYNC_WINDOW_INTERVAL_MILLIS.get(); msgIdGenerator = new TopicMessageIdGenerator(Duration.ofMillis(BifroMQSysProp.SYNC_WINDOW_INTERVAL_MILLIS.get()), settings.maxActiveTopicsPerPublisher, tenantMeter); @@ -452,7 +465,7 @@ private void handlePubMsg(MqttPublishMessage mqttMessage) { return; } String topic = helper().getTopic(mqttMessage); - long nowMillis = HLC.INST.getPhysical(); + long nowMillis = sessionCtx.nowMillis(); long msgId = msgIdGenerator.nextMessageId(topic, nowMillis); int ingressMsgBytes = mqttMessage.fixedHeader().remainingLength() + 1; (switch (mqttMessage.fixedHeader().qosLevel()) { @@ -692,7 +705,7 @@ private void handlePubAckMsg(MqttPubAckMessage mqttMessage) { int packetId = mqttMessage.variableHeader().messageId(); if (isConfirming(packetId)) { SubMessage confirmed = confirm(packetId); - tenantMeter.recordSummary(MqttQoS1DeliverBytes, confirmed.message().getPayload().size()); + tenantMeter.recordSummary(MqttQoS1DeliverBytes, confirmed.estBytes()); } else { log.trace("No packetId to confirm released: sessionId={}, packetId={}", userSessionId(clientInfo), packetId); @@ -741,7 +754,7 @@ private void handlePubCompMsg(MqttMessage message) { .size(confirmed.message().getPayload().size()) .clientInfo(clientInfo)); } - tenantMeter.recordSummary(MqttQoS2DeliverBytes, confirmed.message.getPayload().size()); + tenantMeter.recordSummary(MqttQoS2DeliverBytes, confirmed.estBytes()); } else { log.trace("No packetId to confirm released: sessionId={}, packetId={}", userSessionId(clientInfo), packetId); @@ -1274,22 +1287,26 @@ private CompletableFuture doPub(long reqId, } } default -> { - // TODO: support limit retry - msgIdGenerator.markDrain(topic, message.getMessageId()); + if (v.distResult() == DistResult.BACK_PRESSURE_REJECTED) { + msgIdGenerator.markDrain(topic, message.getMessageId()); + } switch (message.getPubQoS()) { case AT_MOST_ONCE -> eventCollector.report(getLocal(QoS0DistError.class) .reqId(reqId) + .reason(v.distResult().name()) .topic(topic) .size(ingressMsgSize) .clientInfo(clientInfo)); case AT_LEAST_ONCE -> eventCollector.report(getLocal(QoS1DistError.class) .reqId(reqId) + .reason(v.distResult().name()) .topic(topic) .isDup(isDup) .size(ingressMsgSize) .clientInfo(clientInfo)); case EXACTLY_ONCE -> eventCollector.report(getLocal(QoS2DistError.class) .reqId(reqId) + .reason(v.distResult().name()) .topic(topic) .isDup(isDup) .size(ingressMsgSize) @@ -1302,7 +1319,7 @@ private CompletableFuture doPub(long reqId, } private CompletableFuture doPubLastWill(LWT willMessage) { - long reqId = HLC.INST.getPhysical(); + long reqId = sessionCtx.nowMillis(); Message message = willMessage.getMessage().toBuilder() .setMessageId(0) .setTimestamp(reqId) @@ -1355,8 +1372,7 @@ private CompletableFuture doPub(long reqId, reqId, topic, message.getPubQoS(), message.getPayload().size()); } - CompletableFuture distTask = - trackTask(sessionCtx.distClient.pub(reqId, topic, message, clientInfo), background); + CompletableFuture distTask = trackTask(distMessage(reqId, topic, message), background); if (!message.getIsRetain()) { return distTask .thenApplyAsync(v -> new IMQTTProtocolHelper.PubResult(v, RetainReply.Result.RETAINED), ctx.executor()); @@ -1368,6 +1384,88 @@ private CompletableFuture doPub(long reqId, } } + private CompletableFuture distMessage(long reqId, String topic, Message message) { + CompletableFuture onDone = new CompletableFuture<>(); + sessionCtx.distClient.pub(reqId, topic, message, clientInfo) + .thenAccept(distResult -> { + if ((boolean) DIST_RETRY_ON_ERROR.get() && distResult == DistResult.ERROR) { + // queue for retry + queueForRetry(reqId, topic, message, onDone); + } else { + onDone.complete(distResult); + } + }); + return onDone; + } + + private void queueForRetry(long reqId, String topic, Message message, CompletableFuture onDone) { + if (ctx.executor().inEventLoop()) { + retryQueue.add(new RetryMessage(reqId, topic, message, onDone)); + tenantMeter.recordSummary(MqttRetryQueuedBytes, topic.length() + message.getPayload().size()); + if (retryTask == null || retryTask.isDone()) { + retryTask = ctx.executor().schedule(this::retryDist, 100, TimeUnit.MILLISECONDS); + } + + } else { + ctx.executor().execute(() -> queueForRetry(reqId, topic, message, onDone)); + } + } + + private void retryDist() { + Iterator itr = retryQueue.iterator(); + long nowMillis = sessionCtx.nowMillis(); + List> retryTasks = new ArrayList<>(retryQueue.size()); + while (itr.hasNext()) { + RetryMessage retryMessage = itr.next(); + String topic = retryMessage.topic(); + Message message = retryMessage.message(); + long messageId = retryMessage.message().getMessageId(); + long drainMessageId = msgIdGenerator.drainMessageId(topic); + if (messageId < drainMessageId || nowMillis - message.getTimestamp() > retryTimeoutMillis) { + // stop retry when messageId has been drained or timeout + itr.remove(); + retryMessage.onDone().complete(DistResult.ERROR); + if (messageId < drainMessageId) { + tenantMeter.recordSummary(MqttRetryDroppedBytes, topic.length() + message.getPayload().size()); + } + if (nowMillis - message.getTimestamp() > retryTimeoutMillis) { + tenantMeter.recordSummary(MqttRetryTimeoutBytes, topic.length() + message.getPayload().size()); + } + } else { + // dist again + retryTasks.add(sessionCtx.distClient.pub(retryMessage.reqId(), topic, message, clientInfo) + .thenAcceptAsync(distResult -> { + // if not transient error, remove from retry queue and finish the task + switch (distResult) { + case OK, NO_MATCH -> { + tenantMeter.recordSummary(MqttRetryDistedBytes, + topic.length() + message.getPayload().size()); + retryQueue.remove(retryMessage); + retryMessage.onDone().complete(distResult); + } + case BACK_PRESSURE_REJECTED -> { + retryQueue.remove(retryMessage); + retryMessage.onDone().complete(distResult); + } + } + }, ctx.executor())); + } + } + if (!retryTasks.isEmpty()) { + CompletableFuture.allOf(retryTasks.toArray(CompletableFuture[]::new)) + .thenAcceptAsync(v -> { + if (!retryQueue.isEmpty()) { + retryTask = ctx.executor().schedule(this::retryDist, 100, TimeUnit.MILLISECONDS); + } else { + retryTask = null; + } + }, ctx.executor()); + } else { + // no async retry tasks + retryTask = null; + } + } + private CompletableFuture retainMessage(long reqId, String topic, Message message, boolean isLWT) { if (!settings.retainEnabled) { diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSender.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSender.java index 0e3fde99e..29be1dc99 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSender.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSender.java @@ -13,10 +13,11 @@ package com.baidu.bifromq.mqtt.handler; -import static com.baidu.bifromq.metrics.TenantMetric.MqttOutOfOrderSendBytes; +import static com.baidu.bifromq.metrics.TenantMetric.MqttOutOfOrderDrainBytes; +import static com.baidu.bifromq.metrics.TenantMetric.MqttOutOfOrderFlushBytes; import static com.baidu.bifromq.metrics.TenantMetric.MqttReorderBytes; import static com.baidu.bifromq.metrics.TenantMetric.MqttTopicSorterAbortCount; -import static com.baidu.bifromq.mqtt.utils.MessageIdUtil.isDrainFlagSet; +import static com.baidu.bifromq.mqtt.utils.MessageIdUtil.isFlushFlagSet; import static com.baidu.bifromq.mqtt.utils.MessageIdUtil.isSuccessive; import static com.baidu.bifromq.mqtt.utils.MessageIdUtil.previousMessageId; @@ -88,7 +89,7 @@ private record SortingMessage(long inboxSeqNo, MQTTSessionHandler.SubMessage sub private static class SortingBuffer { final MessageSender sender; - final long syncWindowIntervalMillis; + final long timeoutDelay; final EventExecutor executor; final SortedMap sortingBuffer = new TreeMap<>(); final ITenantMeter meter; @@ -98,12 +99,12 @@ private static class SortingBuffer { boolean afterDrain; SortingBuffer(MessageSender sender, - long syncWindowIntervalMillis, + long timeoutDelay, EventExecutor executor, long firstMsgId, ITenantMeter meter) { this.sender = sender; - this.syncWindowIntervalMillis = syncWindowIntervalMillis; + this.timeoutDelay = timeoutDelay; this.executor = executor; this.meter = meter; // reset to previous sequence @@ -130,8 +131,8 @@ boolean submit(long inboxSeqNo, MQTTSessionHandler.SubMessage subMessage) { tailMsgId = msgId; meter.recordSummary(MqttReorderBytes, subMessage.estBytes()); sortingBuffer.put(msgId, new SortingMessage(inboxSeqNo, subMessage)); - if (isDrainFlagSet(msgId)) { - drain(); + if (isFlushFlagSet(msgId)) { + flush(); } } } else if (msgId > tailMsgId) { @@ -139,11 +140,11 @@ boolean submit(long inboxSeqNo, MQTTSessionHandler.SubMessage subMessage) { meter.recordSummary(MqttReorderBytes, subMessage.estBytes()); sortingBuffer.put(msgId, new SortingMessage(inboxSeqNo, subMessage)); tailMsgId = msgId; - if (isDrainFlagSet(msgId)) { - drain(); + if (isFlushFlagSet(msgId)) { + flush(); } else { if (timeout == null) { - timeout = executor.schedule(this::drain, syncWindowIntervalMillis, TimeUnit.MILLISECONDS); + timeout = executor.schedule(this::drain, timeoutDelay, TimeUnit.MILLISECONDS); } } } else { @@ -163,14 +164,21 @@ boolean submit(long inboxSeqNo, MQTTSessionHandler.SubMessage subMessage) { return success; } + void flush() { + assert executor.inEventLoop(); + long oodSentBytes = send(true); + meter.recordSummary(MqttOutOfOrderFlushBytes, oodSentBytes); + afterDrain = true; + } + void drain() { assert executor.inEventLoop(); long oodSentBytes = send(true); - meter.recordSummary(MqttOutOfOrderSendBytes, oodSentBytes); + meter.recordSummary(MqttOutOfOrderDrainBytes, oodSentBytes); afterDrain = true; } - private long send(boolean drain) { + private long send(boolean flush) { assert executor.inEventLoop(); // cancel timeout task if (timeout != null && !timeout.isDone()) { @@ -182,7 +190,7 @@ private long send(boolean drain) { while (entryIterator.hasNext()) { Map.Entry entry = entryIterator.next(); boolean isSuccessive = isSuccessive(headMsgId, entry.getKey()); - if (isSuccessive || drain) { + if (isSuccessive || flush) { headMsgId = entry.getKey(); SortingMessage subMessage = entry.getValue(); if (!isSuccessive) { @@ -195,7 +203,7 @@ private long send(boolean drain) { } } if (!sortingBuffer.isEmpty()) { - timeout = executor.schedule(this::drain, syncWindowIntervalMillis, TimeUnit.MILLISECONDS); + timeout = executor.schedule(this::drain, timeoutDelay, TimeUnit.MILLISECONDS); } else { // the invariant must be hold assert headMsgId == tailMsgId; diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java index 482f645e7..c9143f5a1 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3ProtocolHelper.java @@ -356,14 +356,7 @@ public ProtocolResponse onQoS0DistDenied(String topic, Message distMessage, Chec @Override public ProtocolResponse onQoS0PubHandled(PubResult result, MqttPublishMessage message, UserProperties userProps) { - if (result.distResult() == DistResult.BACK_PRESSURE_REJECTED || - result.retainResult() == RetainReply.Result.BACK_PRESSURE_REJECTED) { - return responseNothing(getLocal(ServerBusy.class) - .reason("Too many qos0 publish") - .clientInfo(clientInfo)); - } else { - return responseNothing(); - } + return responseNothing(); } @Override @@ -379,9 +372,7 @@ public ProtocolResponse onQoS1DistDenied(String topic, int packetId, Message dis public ProtocolResponse onQoS1PubHandled(PubResult result, MqttPublishMessage message, UserProperties userProps) { if (result.distResult() == DistResult.BACK_PRESSURE_REJECTED || result.retainResult() == RetainReply.Result.BACK_PRESSURE_REJECTED) { - return responseNothing(getLocal(ServerBusy.class) - .reason("Too many qos1 publish") - .clientInfo(clientInfo)); + return responseNothing(); } else { if (settings.debugMode) { return response(MqttMessageBuilders.pubAck() @@ -419,9 +410,7 @@ public ProtocolResponse onQoS2DistDenied(String topic, int packetId, Message dis public ProtocolResponse onQoS2PubHandled(PubResult result, MqttPublishMessage message, UserProperties userProps) { if (result.distResult() == DistResult.BACK_PRESSURE_REJECTED || result.retainResult() == RetainReply.Result.BACK_PRESSURE_REJECTED) { - return responseNothing(getLocal(ServerBusy.class) - .reason("Too many qos2 publish") - .clientInfo(clientInfo)); + return responseNothing(); } else { if (settings.debugMode) { return response(MQTT3MessageBuilders.pubRec() diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java index 7d267f270..f52b189e3 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5ProtocolHelper.java @@ -755,14 +755,7 @@ public ProtocolResponse onQoS0DistDenied(String topic, Message distMessage, Chec @Override public ProtocolResponse onQoS0PubHandled(PubResult result, MqttPublishMessage message, UserProperties userProps) { - if (result.distResult() == DistResult.BACK_PRESSURE_REJECTED - || result.retainResult() == RetainReply.Result.BACK_PRESSURE_REJECTED) { - return responseNothing(getLocal(ServerBusy.class) - .reason("Too many QoS0 publish") - .clientInfo(clientInfo)); - } else { - return responseNothing(); - } + return responseNothing(); } @Override @@ -792,10 +785,7 @@ public ProtocolResponse onQoS1DistDenied(String topic, int packetId, Message dis public ProtocolResponse onQoS1PubHandled(PubResult result, MqttPublishMessage message, UserProperties userProps) { if (result.distResult() == DistResult.BACK_PRESSURE_REJECTED || result.retainResult() == RetainReply.Result.BACK_PRESSURE_REJECTED) { - return responseNothing( - getLocal(ServerBusy.class) - .reason("Too many QoS1 publish") - .clientInfo(clientInfo)); + return responseNothing(); } int packetId = message.variableHeader().packetId(); Event[] debugEvents; @@ -871,9 +861,7 @@ public ProtocolResponse onQoS2DistDenied(String topic, int packetId, Message dis public ProtocolResponse onQoS2PubHandled(PubResult result, MqttPublishMessage message, UserProperties userProps) { if (result.distResult() == DistResult.BACK_PRESSURE_REJECTED || result.retainResult() == RetainReply.Result.BACK_PRESSURE_REJECTED) { - return responseNothing(getLocal(ServerBusy.class) - .reason("Too many QoS2 publish") - .clientInfo(clientInfo)); + return responseNothing(); } int packetId = message.variableHeader().packetId(); Event[] debugEvents; diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/ITicker.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/ITicker.java new file mode 100644 index 000000000..8d8c142c6 --- /dev/null +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/ITicker.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024. The BifroMQ Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package com.baidu.bifromq.mqtt.session; + +import com.baidu.bifromq.basehlc.HLC; + +public interface ITicker { + long systemNanos(); + + long nowMillis(); + + ITicker SYSTEM_TICKER = + new ITicker() { + @Override + public long systemNanos() { + return System.nanoTime(); + } + + @Override + public long nowMillis() { + return HLC.INST.getPhysical(); + } + }; +} diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/MQTTSessionContext.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/MQTTSessionContext.java index 033ef20c1..6155aa874 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/MQTTSessionContext.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/session/MQTTSessionContext.java @@ -27,7 +27,6 @@ import com.baidu.bifromq.retain.client.IRetainClient; import com.baidu.bifromq.sessiondict.client.ISessionDictClient; import com.bifromq.plugin.resourcethrottler.IResourceThrottler; -import com.google.common.base.Ticker; import io.netty.channel.ChannelHandlerContext; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -48,7 +47,7 @@ public final class MQTTSessionContext { public final ISessionDictClient sessionDictClient; public final String serverId; public final int defaultKeepAliveTimeSeconds; - private final Ticker ticker; + private final ITicker ticker; private final FutureTracker futureTracker = new FutureTracker(); private final TenantGauge tenantTransientSubNumGauge; private final TenantGauge tenantMemGauge; @@ -66,7 +65,7 @@ public final class MQTTSessionContext { IEventCollector eventCollector, IResourceThrottler resourceThrottler, ISettingProvider settingProvider, - Ticker ticker) { + ITicker ticker) { this.serverId = serverId; this.localSessionRegistry = localSessionRegistry; this.localDistService = localDistService; @@ -79,13 +78,17 @@ public final class MQTTSessionContext { this.retainClient = retainClient; this.sessionDictClient = sessionDictClient; this.defaultKeepAliveTimeSeconds = defaultKeepAliveTimeSeconds; - this.ticker = ticker == null ? Ticker.systemTicker() : ticker; + this.ticker = ticker == null ? ITicker.SYSTEM_TICKER : ticker; this.tenantTransientSubNumGauge = new TenantGauge(MqttTransientSubCountGauge); this.tenantMemGauge = new TenantGauge(MqttSessionWorkingMemoryGauge); } public long nanoTime() { - return ticker.read(); + return ticker.systemNanos(); + } + + public long nowMillis() { + return ticker.nowMillis(); } public IAuthProvider authProvider(ChannelHandlerContext ctx) { diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/utils/MessageIdUtil.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/utils/MessageIdUtil.java index 95ca0fa85..51290b109 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/utils/MessageIdUtil.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/utils/MessageIdUtil.java @@ -15,7 +15,7 @@ public class MessageIdUtil { - private static final long DRAIN_FLAG_MASK = 0x80000000L; + private static final long FLUSH_FLAG_MASK = 0x80000000L; private static final long MESSAGE_SEQ_MASK = 0x7FFFFFFFL; private static final int INTEGER_BITS = 32; @@ -28,11 +28,11 @@ public static long messageId(long syncWindowSequence, long messageSequence) { } public static long messageId(long syncWindowSequence, long messageSequence, boolean drainFlag) { - return (syncWindowSequence << INTEGER_BITS) | messageSequence | (drainFlag ? DRAIN_FLAG_MASK : 0); + return (syncWindowSequence << INTEGER_BITS) | messageSequence | (drainFlag ? FLUSH_FLAG_MASK : 0); } - public static boolean isDrainFlagSet(long messageId) { - return (messageId & DRAIN_FLAG_MASK) != 0; + public static boolean isFlushFlagSet(long messageId) { + return (messageId & FLUSH_FLAG_MASK) != 0; } public static long syncWindowSequence(long messageId) { @@ -54,7 +54,7 @@ public static boolean isSuccessive(long messageId, long successorMessageId) { } if (syncWindowSequence == successorSyncWindowSequence || syncWindowSequence + 1 == successorSyncWindowSequence) { - return (messageSequence(messageId) + 1) % DRAIN_FLAG_MASK == messageSequence(successorMessageId); + return (messageSequence(messageId) + 1) % FLUSH_FLAG_MASK == messageSequence(successorMessageId); } return messageSequence(successorMessageId) == 0; } diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSenderTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSenderTest.java index e524ec668..7a55dbbcc 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSenderTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/TopicMessageOrderingSenderTest.java @@ -268,7 +268,7 @@ public void testForceDrain() { assertEquals(subMessageList.get(2), subMessage2_0); verify(timeoutFuture).cancel(false); verify(meter, times(1)).recordSummary(eq(TenantMetric.MqttReorderBytes), anyDouble()); - verify(meter, times(1)).recordSummary(eq(TenantMetric.MqttOutOfOrderSendBytes), anyDouble()); + verify(meter, times(1)).recordSummary(eq(TenantMetric.MqttOutOfOrderDrainBytes), anyDouble()); verify(meter).recordCount(eq(TenantMetric.MqttTopicSorterAbortCount)); } @@ -306,7 +306,7 @@ public void testTimeoutDrain() { assertEquals(subMsgCaptor.getAllValues(), List.of(subMessage1_0, subMessage1_2, subMessage1_4)); verify(meter, times(2)).recordSummary(eq(TenantMetric.MqttReorderBytes), anyDouble()); - verify(meter).recordSummary(eq(TenantMetric.MqttOutOfOrderSendBytes), anyDouble()); + verify(meter).recordSummary(eq(TenantMetric.MqttOutOfOrderDrainBytes), anyDouble()); } private MQTTSessionHandler.SubMessage mockSubMessage(long messageId, String publisher) { diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java index 15406ab2a..0d0c0d273 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTT3TransientSessionHandlerTest.java @@ -381,7 +381,7 @@ public void handleQoS0PubDistBackPressured() { channel.runScheduledPendingTasks(); // the channel is still open, but the message is dropped assertTrue(channel.isOpen()); - verifyEvent(QOS0_DIST_ERROR, SERVER_BUSY); + verifyEvent(QOS0_DIST_ERROR); } @@ -424,7 +424,7 @@ public void handleQoS1PubDistBackPressured() { channel.runScheduledPendingTasks(); // the channel is still open, but the message is dropped assertTrue(channel.isOpen()); - verifyEvent(QOS1_DIST_ERROR, SERVER_BUSY); + verifyEvent(QOS1_DIST_ERROR); } @Test @@ -480,7 +480,7 @@ public void handleQoS2PubDistBackPressured() { channel.runScheduledPendingTasks(); // the channel is still open, but the message is dropped assertTrue(channel.isOpen()); - verifyEvent(QOS2_DIST_ERROR, SERVER_BUSY); + verifyEvent(QOS2_DIST_ERROR); } @Test diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTTC2SPubTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTTC2SPubTest.java index 374c47ffd..7547213fe 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTTC2SPubTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v3/MQTTC2SPubTest.java @@ -28,7 +28,6 @@ import static com.baidu.bifromq.plugin.eventcollector.EventType.QOS0_DIST_ERROR; import static com.baidu.bifromq.plugin.eventcollector.EventType.QOS1_DIST_ERROR; import static com.baidu.bifromq.plugin.eventcollector.EventType.QOS2_DIST_ERROR; -import static com.baidu.bifromq.plugin.eventcollector.EventType.SERVER_BUSY; import static com.baidu.bifromq.plugin.settingprovider.Setting.MsgPubPerSec; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -96,7 +95,7 @@ public void qos0PubDistBackPressure() { mockDistBackPressure(); MqttPublishMessage publishMessage = MQTTMessageUtils.publishQoS0Message("testTopic", 123); channel.writeInbound(publishMessage); - verifyEvent(CLIENT_CONNECTED, QOS0_DIST_ERROR, SERVER_BUSY); + verifyEvent(CLIENT_CONNECTED, QOS0_DIST_ERROR); } @Test @@ -141,7 +140,7 @@ public void qos1PubDistBackPressure() { mockDistBackPressure(); MqttPublishMessage publishMessage = MQTTMessageUtils.publishQoS1Message("testTopic", 123); channel.writeInbound(publishMessage); - verifyEvent(CLIENT_CONNECTED, QOS1_DIST_ERROR, SERVER_BUSY); + verifyEvent(CLIENT_CONNECTED, QOS1_DIST_ERROR); } @@ -213,7 +212,7 @@ public void qoS2PubDistBackPressure() { mockDistBackPressure(); MqttPublishMessage publishMessage = MQTTMessageUtils.publishQoS2Message("testTopic", 123); channel.writeInbound(publishMessage); - verifyEvent(CLIENT_CONNECTED, QOS2_DIST_ERROR, SERVER_BUSY); + verifyEvent(CLIENT_CONNECTED, QOS2_DIST_ERROR); // assertFalse(sessionContext.isConfirming(tenantId, channel.id().asLongText(), 123)); } diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandlerTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandlerTest.java index 3ae209c33..eb99750c5 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandlerTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/handler/v5/MQTT5TransientSessionHandlerTest.java @@ -343,7 +343,7 @@ public void handleQoS0PubDistBackPressured() { channel.runScheduledPendingTasks(); channel.runPendingTasks(); assertTrue(channel.isOpen()); - verifyEvent(QOS0_DIST_ERROR, SERVER_BUSY); + verifyEvent(QOS0_DIST_ERROR); } @@ -387,7 +387,7 @@ public void handleQoS1PubDistBackPressured() { channel.runPendingTasks(); // the channel is still open, but message is dropped assertTrue(channel.isOpen()); - verifyEvent(QOS1_DIST_ERROR, SERVER_BUSY); + verifyEvent(QOS1_DIST_ERROR); } @Test @@ -443,7 +443,7 @@ public void handleQoS2PubDistBackPressured() { channel.runScheduledPendingTasks(); channel.runPendingTasks(); assertTrue(channel.isOpen()); - verifyEvent(QOS2_DIST_ERROR, SERVER_BUSY); + verifyEvent(QOS2_DIST_ERROR); } @Test diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/utils/TestTicker.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/utils/TestTicker.java index d9cfe9d61..f9737650d 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/utils/TestTicker.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/com/baidu/bifromq/mqtt/utils/TestTicker.java @@ -13,11 +13,12 @@ package com.baidu.bifromq.mqtt.utils; -import com.google.common.base.Ticker; +import com.baidu.bifromq.mqtt.session.ITicker; import java.util.concurrent.TimeUnit; -public class TestTicker extends Ticker { +public class TestTicker implements ITicker { private long nanos; + private long millis; public TestTicker() { reset(); @@ -25,15 +26,22 @@ public TestTicker() { public void reset() { nanos = System.nanoTime(); + millis = System.currentTimeMillis(); } @Override - public long read() { + public long systemNanos() { // read different timestamp return ++nanos; } + @Override + public long nowMillis() { + return ++millis; + } + public void advanceTimeBy(long duration, TimeUnit unit) { nanos += unit.toNanos(duration); + millis += unit.toMillis(duration); } } diff --git a/bifromq-sysprops/src/main/java/com/baidu/bifromq/sysprops/BifroMQSysProp.java b/bifromq-sysprops/src/main/java/com/baidu/bifromq/sysprops/BifroMQSysProp.java index 845c669d1..7d084aff1 100644 --- a/bifromq-sysprops/src/main/java/com/baidu/bifromq/sysprops/BifroMQSysProp.java +++ b/bifromq-sysprops/src/main/java/com/baidu/bifromq/sysprops/BifroMQSysProp.java @@ -72,6 +72,7 @@ public enum BifroMQSysProp { RETAIN_STORE_RANGE_VOTER_COUNT("retain_store_range_voter_count", 3, IntegerParser.POSITIVE), RETAIN_STORE_LOAD_EST_WINDOW_SECONDS("retain_store_load_estimation_window_seconds", 5L, LongParser.POSITIVE), RETAIN_STORE_RECOVERY_TIMEOUT_MILLIS("retain_store_recovery_timeout_millis", 10000L, LongParser.NON_NEGATIVE), + DIST_RETRY_ON_ERROR("dist_retry_on_error", false, BooleanParser.INSTANCE), SYNC_WINDOW_INTERVAL_MILLIS("sync_window_interval_millis", 5000L, LongParser.POSITIVE); public final String propKey;