From a8a0c0e725ac5bf158197841231fb27de91494af Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 27 Dec 2022 17:33:39 -0500 Subject: [PATCH] Add custom thread pool to broadcaster The broadcasting consumes most of the threads but has lower priority than other messages being sent. By separating that thread pool from the common sendMessage executor we can reduce the risk that a burst of broadcasts exhausts the thread pool and might drop send message tasks. Signed-off-by: HenrikJannsen --- .../network/p2p/peers/BroadcastHandler.java | 13 ++++++++---- .../bisq/network/p2p/peers/Broadcaster.java | 21 +++++++++++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index b7b89d25795..5a466a5bee1 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -94,7 +95,9 @@ public interface Listener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(List broadcastRequests, boolean shutDownRequested) { + public void broadcast(List broadcastRequests, + boolean shutDownRequested, + ListeningExecutorService executor) { List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); @@ -153,7 +156,7 @@ public void broadcast(List broadcastRequests, bool return; } - sendToPeer(connection, broadcastRequestsForConnection); + sendToPeer(connection, broadcastRequestsForConnection, executor); }, minDelay, maxDelay, TimeUnit.MILLISECONDS); } } @@ -235,10 +238,12 @@ private List getBroadcastRequestsForConnection(Con .collect(Collectors.toList()); } - private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { + private void sendToPeer(Connection connection, + List broadcastRequestsForConnection, + ListeningExecutorService executor) { // Can be BundleOfEnvelopes or a single BroadcastMessage BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, broadcastMessage); + SettableFuture future = networkNode.sendMessage(connection, broadcastMessage, executor); Futures.addCallback(future, new FutureCallback<>() { @Override diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 2ddc1d8e79a..f327a31afde 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -23,13 +23,20 @@ import bisq.common.Timer; import bisq.common.UserThread; +import bisq.common.config.Config; +import bisq.common.util.Utilities; import javax.inject.Inject; +import javax.inject.Named; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -49,6 +56,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private Timer timer; private boolean shutDownRequested; private Runnable shutDownResultHandler; + private final ListeningExecutorService executor; /////////////////////////////////////////////////////////////////////////////////////////// @@ -56,9 +64,18 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { + public Broadcaster(NetworkNode networkNode, + PeerManager peerManager, + @Named(Config.MAX_CONNECTIONS) int maxConnections) { this.networkNode = networkNode; this.peerManager = peerManager; + + ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster", + maxConnections, + maxConnections * 2, + 30, + 30); + executor = MoreExecutors.listeningDecorator(threadPoolExecutor); } public void shutDown(Runnable resultHandler) { @@ -119,7 +136,7 @@ private void maybeBroadcastBundle() { broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor); broadcastRequests.clear(); if (timer != null) {