Skip to content

Commit

Permalink
1. support "fast drain mode" when pub failed due to internal temp error
Browse files Browse the repository at this point in the history
2. separate dist worker call for each tenant to reduce interference on latency
  • Loading branch information
popduke committed Apr 8, 2024
1 parent a8a33ee commit 5a5763d
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.baidu.bifromq.dist.client;

import com.baidu.bifromq.baserpc.IRPCClient;
import com.baidu.bifromq.basescheduler.exception.BackPressureException;
import com.baidu.bifromq.dist.RPCBluePrint;
import com.baidu.bifromq.dist.client.scheduler.DistServerCall;
import com.baidu.bifromq.dist.client.scheduler.DistServerCallScheduler;
Expand Down Expand Up @@ -55,7 +56,11 @@ public CompletableFuture<DistResult> pub(long reqId, String topic, Message messa
return reqScheduler.schedule(new DistServerCall(publisher, topic, message))
.exceptionally(e -> {
log.debug("Failed to pub", e);
return DistResult.ERROR;
if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) {
return DistResult.BACK_PRESSURE_REJECTED;
} else {
return DistResult.ERROR;
}
});
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,40 @@ message DistPack{
repeated commontype.TopicMessagePack msgPack = 2; // topic messages packs shares same tenantId
}

// deprecate since 3.1.0, will be removed in 5.0.0
message BatchDistRequest {
uint64 reqId = 1;
repeated DistPack distPack = 2; // sorted by tenantId and topic
string orderKey = 3;
}

// deprecate since 3.1.0, will be removed in 5.0.0
message BatchDistReply {
uint64 reqId = 1;
map<string, TopicFanout> result = 2;
}

message TenantDistRequest{
uint64 reqId = 1;
string tenantId = 2;
repeated commontype.TopicMessagePack msgPack = 3;
string orderKey = 4;
}

message TenantDistReply {
enum Code{
OK = 0;
ERROR = 1;
}

message Result{
Code code = 1;
uint32 fanout = 2;
}
uint64 reqId = 1;
map<string, Result> results = 2; // key: topic
}

message DistServiceRWCoProcInput{
oneof type{
BatchMatchRequest batchMatch = 1;
Expand All @@ -78,12 +101,14 @@ message DistServiceRWCoProcOutput{
message DistServiceROCoProcInput{
oneof Input{
BatchDistRequest batchDist = 1;
TenantDistRequest tenantDist = 2;
}
}

message DistServiceROCoProcOutput{
oneof Output{
BatchDistReply batchDist = 1;
TenantDistReply tenantDist = 2;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DistResponsePipeline extends ResponsePipeline<DistRequest, DistReply> {
protected CompletableFuture<DistReply> handleRequest(String tenantId, DistRequest request) {
return distCallScheduler.schedule(new DistWorkerCall(tenantId, request.getMessagesList(),
callQueueIdx, tenantFanouts.get(tenantId).estimate()))
.handle((v, e) -> {
.handle((fanOutByTopic, e) -> {
DistReply.Builder replyBuilder = DistReply.newBuilder().setReqId(request.getReqId());
if (e != null) {
if (e instanceof BackPressureException || e.getCause() instanceof BackPressureException) {
Expand Down Expand Up @@ -85,20 +85,22 @@ protected CompletableFuture<DistReply> handleRequest(String tenantId, DistReques
.code(RPC_FAILURE));
}
} else {
tenantFanouts.get(tenantId).log(v.values().stream().reduce(0, Integer::sum) / v.size());
// TODO: exclude fanout = -1?
tenantFanouts.get(tenantId).log(fanOutByTopic.values().stream().reduce(0, Integer::sum) /
fanOutByTopic.size());
for (PublisherMessagePack publisherMsgPack : request.getMessagesList()) {
DistReply.Result.Builder resultBuilder = DistReply.Result.newBuilder();
for (PublisherMessagePack.TopicPack topicPack : publisherMsgPack.getMessagePackList()) {
int fanout = v.get(topicPack.getTopic());
resultBuilder.putTopic(topicPack.getTopic(),
fanout > 0 ? DistReply.Code.OK : DistReply.Code.NO_MATCH);
int fanout = fanOutByTopic.get(topicPack.getTopic());
resultBuilder.putTopic(topicPack.getTopic(), fanout > 0 ? DistReply.Code.OK :
(fanout == 0 ? DistReply.Code.NO_MATCH : DistReply.Code.ERROR));
}
replyBuilder.addResults(resultBuilder.build());
}
eventCollector.report(getLocal(Disted.class)
.reqId(request.getReqId())
.messages(request.getMessagesList())
.fanout(v.values().stream().reduce(0, Integer::sum)));
.fanout(fanOutByTopic.values().stream().reduce(0, Integer::sum)));
}
return replyBuilder.build();
});
Expand Down
Loading

0 comments on commit 5a5763d

Please sign in to comment.