Skip to content

Commit

Permalink
1. do not report ServerBusy when dist failed
Browse files Browse the repository at this point in the history
2. introduce retry behavior when dist fails, which can be enabled via a system property
3. split OOD handling related metrics for better diagnosis
4. use Timer instead of DistributionSummary to measure latency in Batcher, so that the metric unit is unified
  • Loading branch information
popduke committed Apr 10, 2024
1 parent 4f0eb3b commit e18f797
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public abstract class Batcher<Call, CallResult, BatcherKey> {
private final Gauge batchSaturationGauge;
private final Timer batchCallTimer;
private final Timer batchExecTimer;
private final DistributionSummary batchBuildTimeSummary;
private final Timer batchBuildTimer;
private final Timer queueingTimer;
private final DistributionSummary batchSizeSummary;
private final DistributionSummary queueingTimeSummary;
private final double alphaIncrease = 0.2;
private final double alphaDecrease = 0.05;
private final AtomicInteger emaMaxBatchSize = new AtomicInteger();
Expand Down Expand Up @@ -85,13 +85,13 @@ protected Batcher(BatcherKey batcherKey,
batchExecTimer = Timer.builder("batcher.exec.time")
.tags(tags)
.register(Metrics.globalRegistry);
batchBuildTimeSummary = DistributionSummary.builder("batcher.build.time")
batchBuildTimer = Timer.builder("batcher.build.time")
.tags(tags)
.register(Metrics.globalRegistry);
batchSizeSummary = DistributionSummary.builder("batcher.batch.size")
queueingTimer = Timer.builder("batcher.queueing.time")
.tags(tags)
.register(Metrics.globalRegistry);
queueingTimeSummary = DistributionSummary.builder("batcher.queueing.time")
batchSizeSummary = DistributionSummary.builder("batcher.batch.size")
.tags(tags)
.register(Metrics.globalRegistry);
}
Expand Down Expand Up @@ -125,9 +125,9 @@ public void close() {
Metrics.globalRegistry.remove(batchSaturationGauge);
Metrics.globalRegistry.remove(batchCallTimer);
Metrics.globalRegistry.remove(batchExecTimer);
Metrics.globalRegistry.remove(batchBuildTimeSummary);
Metrics.globalRegistry.remove(batchBuildTimer);
Metrics.globalRegistry.remove(batchSizeSummary);
Metrics.globalRegistry.remove(queueingTimeSummary);
Metrics.globalRegistry.remove(queueingTimer);
}

protected abstract IBatchCall<Call, CallResult, BatcherKey> newBatch();
Expand Down Expand Up @@ -161,11 +161,11 @@ private void batchAndEmit() {
batchCall.add(callTask);
batchedTasks.add(callTask);
batchSize++;
queueingTimeSummary.record(System.nanoTime() - callTask.ts);
queueingTimer.record(System.nanoTime() - callTask.ts, TimeUnit.NANOSECONDS);
}
batchSizeSummary.record(batchSize);
long execStart = System.nanoTime();
batchBuildTimeSummary.record((execStart - buildStart));
batchBuildTimer.record(execStart - buildStart, TimeUnit.NANOSECONDS);
final int finalBatchSize = batchSize;
batchCall.execute()
.whenComplete((v, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ protected CompletableFuture<DistReply> handleRequest(String tenantId, DistReques
.handle((fanOutByTopic, e) -> {
DistReply.Builder replyBuilder = DistReply.newBuilder().setReqId(request.getReqId());
if (e != null) {
log.debug("Failed to dist", e);
if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) {
for (PublisherMessagePack publisherMsgPack : request.getMessagesList()) {
DistReply.Result.Builder resultBuilder = DistReply.Result.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ public enum TenantMetric {
MqttQoS2ExternalLatency("mqtt.ex.qos2.latency", Meter.Type.TIMER),
MqttTransientFanOutBytes("mqtt.tfanout.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttPersistentFanOutBytes("mqtt.pfanout.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttRetryQueuedBytes("mqtt.retry.queued.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttRetryDroppedBytes("mqtt.retry.dropped.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttRetryTimeoutBytes("mqtt.retry.timeout.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttRetryDistedBytes("mqtt.retry.disted.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttOutOfOrderDiscardBytes("mqtt.ood.discard.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttOutOfOrderSendBytes("mqtt.ood.send.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttOutOfOrderFlushBytes("mqtt.ood.flush.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttOutOfOrderDrainBytes("mqtt.ood.drain.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttReorderBytes("mqtt.reorder.bytes", Meter.Type.DISTRIBUTION_SUMMARY),
MqttTopicSeqAbortCount("mqtt.topic.seq.abort.count", Meter.Type.COUNTER),
MqttTopicSorterAbortCount("mqtt.topic.sorter.abort.count", Meter.Type.COUNTER),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import static com.baidu.bifromq.metrics.TenantMetric.MqttQoS2DistBytes;
import static com.baidu.bifromq.metrics.TenantMetric.MqttQoS2ExternalLatency;
import static com.baidu.bifromq.metrics.TenantMetric.MqttQoS2IngressBytes;
import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryDistedBytes;
import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryDroppedBytes;
import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryQueuedBytes;
import static com.baidu.bifromq.metrics.TenantMetric.MqttRetryTimeoutBytes;
import static com.baidu.bifromq.metrics.TenantMetric.MqttTransientSubLatency;
import static com.baidu.bifromq.mqtt.handler.IMQTTProtocolHelper.SubResult.EXCEED_LIMIT;
import static com.baidu.bifromq.mqtt.handler.MQTTSessionIdUtil.packetId;
Expand All @@ -38,6 +42,7 @@
import static com.baidu.bifromq.mqtt.utils.AuthUtil.buildSubAction;
import static com.baidu.bifromq.mqtt.utils.AuthUtil.buildUnsubAction;
import static com.baidu.bifromq.plugin.eventcollector.ThreadLocalEventPool.getLocal;
import static com.baidu.bifromq.sysprops.BifroMQSysProp.DIST_RETRY_ON_ERROR;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_5_VALUE;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_KEY;
import static com.baidu.bifromq.type.QoS.AT_LEAST_ONCE;
Expand All @@ -55,7 +60,6 @@
import static com.bifromq.plugin.resourcethrottler.TenantResourceType.TotalSharedSubscriptions;
import static java.util.concurrent.CompletableFuture.allOf;

import com.baidu.bifromq.basehlc.HLC;
import com.baidu.bifromq.baserpc.utils.FutureTracker;
import com.baidu.bifromq.dist.client.DistResult;
import com.baidu.bifromq.inbox.storage.proto.LWT;
Expand Down Expand Up @@ -127,10 +131,12 @@
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
Expand Down Expand Up @@ -211,13 +217,17 @@ void setAcked() {
}
}

/**
 * An entry of the retry queue: a message whose distribution should be attempted again.
 *
 * @param reqId   the request id passed to the dist client on every attempt
 * @param topic   the topic the message is published to
 * @param message the message to distribute
 * @param onDone  completed with the final {@link DistResult} once retrying stops
 */
private record RetryMessage(
    long reqId,
    String topic,
    Message message,
    CompletableFuture<DistResult> onDone
) {
}

protected final TenantSettings settings;
protected final String userSessionId;
protected final int keepAliveTimeSeconds;
protected final ClientInfo clientInfo;
protected final AtomicLong memUsage;
protected final ITenantMeter tenantMeter;
private final long idleTimeoutNanos;
private final long retryTimeoutMillis;
private final MPSThrottler throttler;
private final Set<CompletableFuture<?>> fgTasks = new HashSet<>();
private final FutureTracker bgTasks = new FutureTracker();
Expand All @@ -240,7 +250,9 @@ void setAcked() {
private final CompletableFuture<Void> onInitialized = new CompletableFuture<>();
private final TopicMessageIdGenerator msgIdGenerator;
private final TopicMessageOrderingSender orderingSender;
private final LinkedHashSet<RetryMessage> retryQueue = new LinkedHashSet<>();
private ScheduledFuture<?> resendTask;
private ScheduledFuture<?> retryTask;
private int receivingCount = 0;

protected MQTTSessionHandler(TenantSettings settings,
Expand All @@ -267,6 +279,7 @@ protected MQTTSessionHandler(TenantSettings settings,
authProvider = sessionCtx.authProvider(ctx);
eventCollector = sessionCtx.eventCollector;
resourceThrottler = sessionCtx.resourceThrottler;
retryTimeoutMillis = BifroMQSysProp.SYNC_WINDOW_INTERVAL_MILLIS.get();
msgIdGenerator =
new TopicMessageIdGenerator(Duration.ofMillis(BifroMQSysProp.SYNC_WINDOW_INTERVAL_MILLIS.get()),
settings.maxActiveTopicsPerPublisher, tenantMeter);
Expand Down Expand Up @@ -452,7 +465,7 @@ private void handlePubMsg(MqttPublishMessage mqttMessage) {
return;
}
String topic = helper().getTopic(mqttMessage);
long nowMillis = HLC.INST.getPhysical();
long nowMillis = sessionCtx.nowMillis();
long msgId = msgIdGenerator.nextMessageId(topic, nowMillis);
int ingressMsgBytes = mqttMessage.fixedHeader().remainingLength() + 1;
(switch (mqttMessage.fixedHeader().qosLevel()) {
Expand Down Expand Up @@ -692,7 +705,7 @@ private void handlePubAckMsg(MqttPubAckMessage mqttMessage) {
int packetId = mqttMessage.variableHeader().messageId();
if (isConfirming(packetId)) {
SubMessage confirmed = confirm(packetId);
tenantMeter.recordSummary(MqttQoS1DeliverBytes, confirmed.message().getPayload().size());
tenantMeter.recordSummary(MqttQoS1DeliverBytes, confirmed.estBytes());
} else {
log.trace("No packetId to confirm released: sessionId={}, packetId={}",
userSessionId(clientInfo), packetId);
Expand Down Expand Up @@ -741,7 +754,7 @@ private void handlePubCompMsg(MqttMessage message) {
.size(confirmed.message().getPayload().size())
.clientInfo(clientInfo));
}
tenantMeter.recordSummary(MqttQoS2DeliverBytes, confirmed.message.getPayload().size());
tenantMeter.recordSummary(MqttQoS2DeliverBytes, confirmed.estBytes());
} else {
log.trace("No packetId to confirm released: sessionId={}, packetId={}",
userSessionId(clientInfo), packetId);
Expand Down Expand Up @@ -1274,22 +1287,26 @@ private CompletableFuture<IMQTTProtocolHelper.PubResult> doPub(long reqId,
}
}
default -> {
// TODO: support limit retry
msgIdGenerator.markDrain(topic, message.getMessageId());
if (v.distResult() == DistResult.BACK_PRESSURE_REJECTED) {
msgIdGenerator.markDrain(topic, message.getMessageId());
}
switch (message.getPubQoS()) {
case AT_MOST_ONCE -> eventCollector.report(getLocal(QoS0DistError.class)
.reqId(reqId)
.reason(v.distResult().name())
.topic(topic)
.size(ingressMsgSize)
.clientInfo(clientInfo));
case AT_LEAST_ONCE -> eventCollector.report(getLocal(QoS1DistError.class)
.reqId(reqId)
.reason(v.distResult().name())
.topic(topic)
.isDup(isDup)
.size(ingressMsgSize)
.clientInfo(clientInfo));
case EXACTLY_ONCE -> eventCollector.report(getLocal(QoS2DistError.class)
.reqId(reqId)
.reason(v.distResult().name())
.topic(topic)
.isDup(isDup)
.size(ingressMsgSize)
Expand All @@ -1302,7 +1319,7 @@ private CompletableFuture<IMQTTProtocolHelper.PubResult> doPub(long reqId,
}

private CompletableFuture<Void> doPubLastWill(LWT willMessage) {
long reqId = HLC.INST.getPhysical();
long reqId = sessionCtx.nowMillis();
Message message = willMessage.getMessage().toBuilder()
.setMessageId(0)
.setTimestamp(reqId)
Expand Down Expand Up @@ -1355,8 +1372,7 @@ private CompletableFuture<IMQTTProtocolHelper.PubResult> doPub(long reqId,
reqId, topic, message.getPubQoS(), message.getPayload().size());
}

CompletableFuture<DistResult> distTask =
trackTask(sessionCtx.distClient.pub(reqId, topic, message, clientInfo), background);
CompletableFuture<DistResult> distTask = trackTask(distMessage(reqId, topic, message), background);
if (!message.getIsRetain()) {
return distTask
.thenApplyAsync(v -> new IMQTTProtocolHelper.PubResult(v, RetainReply.Result.RETAINED), ctx.executor());
Expand All @@ -1368,6 +1384,88 @@ private CompletableFuture<IMQTTProtocolHelper.PubResult> doPub(long reqId,
}
}

/**
 * Publishes a message through the dist client and returns a future for its final result.
 *
 * <p>When the {@code DIST_RETRY_ON_ERROR} system property is enabled and the dist client reports
 * {@link DistResult#ERROR}, the message is handed to {@link #queueForRetry} and the returned future
 * is completed later by the retry logic. In every other case the returned future is completed with
 * the result of this first attempt.
 *
 * @param reqId   the request id passed to the dist client
 * @param topic   the topic to publish to
 * @param message the message to distribute
 * @return a future completed with the final dist result, or completed exceptionally if the
 *     underlying pub call fails exceptionally
 */
private CompletableFuture<DistResult> distMessage(long reqId, String topic, Message message) {
    CompletableFuture<DistResult> onDone = new CompletableFuture<>();
    sessionCtx.distClient.pub(reqId, topic, message, clientInfo)
        .whenComplete((distResult, e) -> {
            if (e != null) {
                // Propagate the failure; otherwise onDone would stay pending forever and
                // every stage chained on it would never run.
                onDone.completeExceptionally(e);
            } else if (distResult == DistResult.ERROR && (boolean) DIST_RETRY_ON_ERROR.get()) {
                // queue for retry
                queueForRetry(reqId, topic, message, onDone);
            } else {
                onDone.complete(distResult);
            }
        });
    return onDone;
}

/**
 * Appends a message to the retry queue and makes sure a retry round is scheduled.
 *
 * <p>The queue is only modified on the channel's event loop: when invoked from any other thread
 * the call re-submits itself to {@code ctx.executor()}.
 *
 * @param reqId   the request id to reuse for the retry attempts
 * @param topic   the topic the message is published to
 * @param message the message to retry
 * @param onDone  the future to complete once retrying of this message stops
 */
private void queueForRetry(long reqId, String topic, Message message, CompletableFuture<DistResult> onDone) {
    if (!ctx.executor().inEventLoop()) {
        // hop onto the event loop and run the same logic there
        ctx.executor().execute(() -> queueForRetry(reqId, topic, message, onDone));
        return;
    }
    RetryMessage pending = new RetryMessage(reqId, topic, message, onDone);
    retryQueue.add(pending);
    int queuedBytes = topic.length() + message.getPayload().size();
    tenantMeter.recordSummary(MqttRetryQueuedBytes, queuedBytes);
    boolean roundPending = retryTask != null && !retryTask.isDone();
    if (!roundPending) {
        retryTask = ctx.executor().schedule(this::retryDist, 100, TimeUnit.MILLISECONDS);
    }
}

/**
 * Runs one retry round over the retry queue.
 *
 * <p>For each queued message:
 * <ul>
 *   <li>if its message id is below the topic's drain message id, or it is older than
 *       {@code retryTimeoutMillis}, it is removed, its future is completed with
 *       {@link DistResult#ERROR} and the matching dropped/timeout metric is recorded;</li>
 *   <li>otherwise it is published again. {@code OK}, {@code NO_MATCH} and
 *       {@code BACK_PRESSURE_REJECTED} end the retry and complete the future with that result;
 *       any other result, or an exceptional completion of the pub call, leaves the message
 *       queued for the next round.</li>
 * </ul>
 *
 * <p>Once all re-publish attempts of this round have finished, another round is scheduled
 * 100 ms later if the queue is still non-empty.
 *
 * <p>NOTE(review): a round scheduled by {@code queueForRetry} may start while attempts of the
 * previous round are still in flight, in which case those messages are published again.
 * Confirm whether duplicate distribution is acceptable here.
 */
private void retryDist() {
    Iterator<RetryMessage> itr = retryQueue.iterator();
    long nowMillis = sessionCtx.nowMillis();
    List<CompletableFuture<Void>> retryTasks = new ArrayList<>(retryQueue.size());
    while (itr.hasNext()) {
        RetryMessage retryMessage = itr.next();
        String topic = retryMessage.topic();
        Message message = retryMessage.message();
        int msgBytes = topic.length() + message.getPayload().size();
        boolean drained = message.getMessageId() < msgIdGenerator.drainMessageId(topic);
        boolean timedOut = nowMillis - message.getTimestamp() > retryTimeoutMillis;
        if (drained || timedOut) {
            // stop retry when messageId has been drained or timeout
            itr.remove();
            retryMessage.onDone().complete(DistResult.ERROR);
            if (drained) {
                tenantMeter.recordSummary(MqttRetryDroppedBytes, msgBytes);
            }
            if (timedOut) {
                tenantMeter.recordSummary(MqttRetryTimeoutBytes, msgBytes);
            }
            continue;
        }
        // dist again; the callback runs on the event loop, i.e. after this iteration has finished
        retryTasks.add(sessionCtx.distClient.pub(retryMessage.reqId(), topic, message, clientInfo)
            .<Void>handleAsync((distResult, e) -> {
                if (e != null) {
                    // Treat an exceptional completion as a transient error: keep the message
                    // queued so the next round retries it or times it out. Swallowing it here
                    // also keeps allOf(...) below from failing and skipping the reschedule.
                    return null;
                }
                // if not transient error, remove from retry queue and finish the task
                switch (distResult) {
                    case OK, NO_MATCH -> {
                        tenantMeter.recordSummary(MqttRetryDistedBytes, msgBytes);
                        retryQueue.remove(retryMessage);
                        retryMessage.onDone().complete(distResult);
                    }
                    case BACK_PRESSURE_REJECTED -> {
                        retryQueue.remove(retryMessage);
                        retryMessage.onDone().complete(distResult);
                    }
                    default -> {
                        // transient error: stay in the queue for the next round
                    }
                }
                return null;
            }, ctx.executor()));
    }
    if (retryTasks.isEmpty()) {
        // no async retry tasks
        retryTask = null;
        return;
    }
    CompletableFuture.allOf(retryTasks.toArray(CompletableFuture[]::new))
        .whenCompleteAsync((v, e) -> {
            if (retryTask != null && !retryTask.isDone()) {
                // queueForRetry already scheduled the next round while this one was in
                // flight; scheduling another would start a second, parallel retry chain.
                return;
            }
            if (!retryQueue.isEmpty()) {
                retryTask = ctx.executor().schedule(this::retryDist, 100, TimeUnit.MILLISECONDS);
            } else {
                retryTask = null;
            }
        }, ctx.executor());
}

private CompletableFuture<RetainReply.Result> retainMessage(long reqId, String topic, Message message,
boolean isLWT) {
if (!settings.retainEnabled) {
Expand Down
Loading

0 comments on commit e18f797

Please sign in to comment.