diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index b7b89d25795..5a466a5bee1 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -94,7 +95,9 @@ public interface Listener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(List broadcastRequests, boolean shutDownRequested) { + public void broadcast(List broadcastRequests, + boolean shutDownRequested, + ListeningExecutorService executor) { List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); @@ -153,7 +156,7 @@ public void broadcast(List broadcastRequests, bool return; } - sendToPeer(connection, broadcastRequestsForConnection); + sendToPeer(connection, broadcastRequestsForConnection, executor); }, minDelay, maxDelay, TimeUnit.MILLISECONDS); } } @@ -235,10 +238,12 @@ private List getBroadcastRequestsForConnection(Con .collect(Collectors.toList()); } - private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { + private void sendToPeer(Connection connection, + List broadcastRequestsForConnection, + ListeningExecutorService executor) { // Can be BundleOfEnvelopes or a single BroadcastMessage BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, broadcastMessage); + SettableFuture future = networkNode.sendMessage(connection, broadcastMessage, executor); Futures.addCallback(future, new FutureCallback<>() { @Override diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 2ddc1d8e79a..f327a31afde 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -23,13 +23,20 @@ import bisq.common.Timer; import bisq.common.UserThread; +import bisq.common.config.Config; +import bisq.common.util.Utilities; import javax.inject.Inject; +import javax.inject.Named; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -49,6 +56,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private Timer timer; private boolean shutDownRequested; private Runnable shutDownResultHandler; + private final ListeningExecutorService executor; /////////////////////////////////////////////////////////////////////////////////////////// @@ -56,9 +64,18 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { + public Broadcaster(NetworkNode networkNode, + PeerManager peerManager, + @Named(Config.MAX_CONNECTIONS) int maxConnections) { this.networkNode = networkNode; this.peerManager = peerManager; + + ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster", + maxConnections, + maxConnections * 2, + 30, + 30); + executor = MoreExecutors.listeningDecorator(threadPoolExecutor); } public void shutDown(Runnable resultHandler) { @@ -119,7 +136,7 @@ private void maybeBroadcastBundle() { broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor); broadcastRequests.clear(); if (timer != null) {