Skip to content

Commit

Permalink
[ISSUE #4630] Fix concurrency problem and split task handle threadpool (
Browse files Browse the repository at this point in the history
#4679)

* fix concurrency problem

* split task handle threadpool

* fix checkstyle problem
  • Loading branch information
lrhkobe authored Dec 19, 2023
1 parent 2b2bf71 commit e056d7a
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,21 @@ private void registerTCPRequestProcessor() {
ListenProcessor listenProcessor = new ListenProcessor(this);
registerProcessor(Command.LISTEN_REQUEST, listenProcessor, taskHandleExecutorService);

ThreadPoolExecutor sendExecutorService = super.getTcpThreadPoolGroup().getSendExecutorService();
MessageTransferProcessor messageTransferProcessor = new MessageTransferProcessor(this);
registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, taskHandleExecutorService);
registerProcessor(Command.REQUEST_TO_SERVER, messageTransferProcessor, sendExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_SERVER, messageTransferProcessor, sendExecutorService);

ThreadPoolExecutor replyExecutorService = super.getTcpThreadPoolGroup().getReplyExecutorService();
registerProcessor(Command.RESPONSE_TO_SERVER, messageTransferProcessor, replyExecutorService);

ThreadPoolExecutor ackExecutorService = super.getTcpThreadPoolGroup().getAckExecutorService();
MessageAckProcessor messageAckProcessor = new MessageAckProcessor(this);
registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, taskHandleExecutorService);
registerProcessor(Command.RESPONSE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
registerProcessor(Command.ASYNC_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
registerProcessor(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
registerProcessor(Command.REQUEST_TO_CLIENT_ACK, messageAckProcessor, ackExecutorService);
}

public EventMeshServer getEventMeshServer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class TCPThreadPoolGroup implements ThreadPoolGroup {
private final EventMeshTCPConfiguration eventMeshTCPConfiguration;
private ScheduledExecutorService scheduler;
private ThreadPoolExecutor taskHandleExecutorService;
private ThreadPoolExecutor sendExecutorService;
private ThreadPoolExecutor ackExecutorService;
private ThreadPoolExecutor replyExecutorService;
private ThreadPoolExecutor broadcastMsgDownstreamExecutorService;

public TCPThreadPoolGroup(EventMeshTCPConfiguration eventMeshTCPConfiguration) {
Expand All @@ -45,9 +48,27 @@ public void initThreadPool() {
taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorPoolSize(),
new LinkedBlockingQueue<>(10_000),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpTaskHandleExecutorQueueSize()),
new EventMeshThreadFactory("eventMesh-tcp-task-handle", true));

sendExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgSendExecutorQueueSize()),
new EventMeshThreadFactory("eventMesh-tcp-msg-send", true));

replyExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgReplyExecutorQueueSize()),
new EventMeshThreadFactory("eventMesh-tcp-msg-reply", true));

ackExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorPoolSize(),
new LinkedBlockingQueue<>(eventMeshTCPConfiguration.getEventMeshTcpMsgAckExecutorQueueSize()),
new EventMeshThreadFactory("eventMesh-tcp-msg-ack", true));

broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
eventMeshTCPConfiguration.getEventMeshTcpMsgDownStreamExecutorPoolSize(),
Expand All @@ -59,6 +80,9 @@ public void initThreadPool() {
public void shutdownThreadPool() {
scheduler.shutdown();
taskHandleExecutorService.shutdown();
sendExecutorService.shutdown();;
replyExecutorService.shutdown();
ackExecutorService.shutdown();
broadcastMsgDownstreamExecutorService.shutdown();
}

Expand All @@ -73,4 +97,16 @@ public ThreadPoolExecutor getTaskHandleExecutorService() {
public ThreadPoolExecutor getBroadcastMsgDownstreamExecutorService() {
return broadcastMsgDownstreamExecutorService;
}

public ThreadPoolExecutor getSendExecutorService() {
return sendExecutorService;
}

public ThreadPoolExecutor getAckExecutorService() {
return ackExecutorService;
}

public ThreadPoolExecutor getReplyExecutorService() {
return replyExecutorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,28 @@ public class EventMeshTCPConfiguration extends CommonConfiguration {
private int eventMeshTcpGlobalScheduler = 5;

@ConfigFiled(field = "tcp.taskHandleExecutorPoolSize")
private int eventMeshTcpTaskHandleExecutorPoolSize = Runtime.getRuntime().availableProcessors();
private int eventMeshTcpTaskHandleExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.sendExecutorPoolSize")
private int eventMeshTcpMsgSendExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.replyExecutorPoolSize")
private int eventMeshTcpMsgReplyExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.ackExecutorPoolSize")
private int eventMeshTcpMsgAckExecutorPoolSize = 2 * Runtime.getRuntime().availableProcessors();

@ConfigFiled(field = "tcp.taskHandleExecutorQueueSize")
private int eventMeshTcpTaskHandleExecutorQueueSize = 10000;

@ConfigFiled(field = "tcp.sendExecutorQueueSize")
private int eventMeshTcpMsgSendExecutorQueueSize = 10000;

@ConfigFiled(field = "tcp.replyExecutorQueueSize")
private int eventMeshTcpMsgReplyExecutorQueueSize = 10000;

@ConfigFiled(field = "tcp.ackExecutorQueueSize")
private int eventMeshTcpMsgAckExecutorQueueSize = 10000;

@ConfigFiled(field = "tcp.msgDownStreamExecutorPoolSize")
private int eventMeshTcpMsgDownStreamExecutorPoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,20 @@ private void closeSession(Session session) throws Exception {

session.setSessionState(SessionState.CLOSED);

if (EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByCloseSub(session);
} else if (EventMeshConstants.PURPOSE_PUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByClosePub(session);
} else {
log.error("client purpose config is error:{}", session.getClient().getPurpose());
final String clientGroup = session.getClient().getGroup();
if (!lockMap.containsKey(clientGroup)) {
lockMap.putIfAbsent(clientGroup, new Object());
}
synchronized (lockMap.get(clientGroup)) {
if (EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByCloseSub(session);
} else if (EventMeshConstants.PURPOSE_PUB.equals(
session.getClient().getPurpose())) {
cleanClientGroupWrapperByClosePub(session);
} else {
log.error("client purpose config is error:{}",
session.getClient().getPurpose());
}
}

if (session.getContext() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.util.EventMeshUtil;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -67,7 +66,7 @@ public void shutdown() {
log.info("rebalance service shutdown......");
}

public void printRebalanceThreadPoolState() {
EventMeshUtil.printState((ThreadPoolExecutor) serviceRebalanceScheduler);
public int getRebalanceThreadPoolQueueSize() {
return ((ThreadPoolExecutor) serviceRebalanceScheduler).getQueue().size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -126,8 +127,16 @@ public void start() throws Exception {

}), delay, period, TimeUnit.MILLISECONDS);

monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> {
eventMeshTCPServer.getEventMeshRebalanceService().printRebalanceThreadPoolState();
monitorThreadPoolTask = eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler().scheduleAtFixedRate(() -> {
appLogger.info("{TaskHandle:{},Send:{},Ack:{},Reply:{},Push:{},Scheduler:{},Rebalance:{}}",
eventMeshTCPServer.getTcpThreadPoolGroup().getTaskHandleExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getSendExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getAckExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getReplyExecutorService().getQueue().size(),
eventMeshTCPServer.getTcpThreadPoolGroup().getBroadcastMsgDownstreamExecutorService().getQueue().size(),
((ThreadPoolExecutor) eventMeshTCPServer.getTcpThreadPoolGroup().getScheduler()).getQueue().size(),
eventMeshTCPServer.getEventMeshRebalanceService().getRebalanceThreadPoolQueueSize());

eventMeshTCPServer.getTcpRetryer().printState();

// monitor retry queue size
Expand All @@ -137,7 +146,6 @@ public void start() throws Exception {
EventMeshConstants.PROTOCOL_TCP,
MonitorMetricConstants.RETRY_QUEUE_SIZE,
tcpSummaryMetrics.getRetrySize());

}, 10, PRINT_THREADPOOLSTATE_INTERVAL, TimeUnit.SECONDS);
log.info("EventMeshTcpMonitor started......");
}
Expand Down

0 comments on commit e056d7a

Please sign in to comment.