Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threadpool for broadcaster #6488

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions common/src/main/java/bisq/common/setup/CommonSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.nio.file.Paths;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import ch.qos.logback.classic.Level;
Expand Down Expand Up @@ -72,13 +73,14 @@ public static void startPeriodicTasks() {

public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
// Might come from another thread
if (throwable.getCause() != null && throwable.getCause().getCause() != null &&
throwable.getCause().getCause() instanceof BlockStoreException) {
log.error(throwable.getMessage());
log.error("Uncaught BlockStoreException ", throwable);
} else if (throwable instanceof ClassCastException &&
"sun.awt.image.BufImgSurfaceData cannot be cast to sun.java2d.xr.XRSurfaceData".equals(throwable.getMessage())) {
log.warn(throwable.getMessage());
} else if (throwable instanceof RejectedExecutionException) {
log.error("Uncaught RejectedExecutionException ", throwable);
} else if (throwable instanceof UnsupportedOperationException &&
"The system tray is not supported on the current platform.".equals(throwable.getMessage())) {
log.warn(throwable.getMessage());
Expand Down
50 changes: 31 additions & 19 deletions p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -104,12 +105,12 @@ public abstract class NetworkNode implements MessageListener {
maxConnections * 2,
maxConnections * 3,
30,
60);
30);
sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage",
maxConnections * 2,
maxConnections * 3,
30,
60);
30);
serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
}

Expand Down Expand Up @@ -140,7 +141,7 @@ public SettableFuture<Connection> sendMessage(@NotNull NodeAddress peersNodeAddr

SettableFuture<Connection> resultFuture = SettableFuture.create();
ListenableFuture<Connection> future = connectionExecutor.submit(() -> {
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress());
Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-" + peersNodeAddress.getFullAddress());

if (peersNodeAddress.equals(getNodeAddress())) {
log.warn("We are sending a message to ourselves");
Expand Down Expand Up @@ -288,25 +289,36 @@ public Socks5Proxy getSocksProxy() {
return null;
}


public SettableFuture<Connection> sendMessage(Connection connection, NetworkEnvelope networkEnvelope) {
// connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block
ListenableFuture<Connection> future = sendMessageExecutor.submit(() -> {
String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id);
connection.sendMessage(networkEnvelope);
return connection;
});
return sendMessage(connection, networkEnvelope, sendMessageExecutor);
}

public SettableFuture<Connection> sendMessage(Connection connection,
NetworkEnvelope networkEnvelope,
ListeningExecutorService executor) {
SettableFuture<Connection> resultFuture = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> resultFuture.set(connection));
}
try {
ListenableFuture<Connection> future = executor.submit(() -> {
String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid();
Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id);
connection.sendMessage(networkEnvelope);
return connection;
});

public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> resultFuture.setException(throwable));
}
}, MoreExecutors.directExecutor());
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Connection connection) {
UserThread.execute(() -> resultFuture.set(connection));
}

public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> resultFuture.setException(throwable));
}
}, MoreExecutors.directExecutor());

} catch (RejectedExecutionException exception) {
log.error("RejectedExecutionException at sendMessage: ", exception);
resultFuture.setException(exception);
}
return resultFuture;
}

Expand Down
13 changes: 9 additions & 4 deletions p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

Expand Down Expand Up @@ -94,7 +95,9 @@ public interface Listener {
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, boolean shutDownRequested) {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested,
ListeningExecutorService executor) {
List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);

Expand Down Expand Up @@ -153,7 +156,7 @@ public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests, bool
return;
}

sendToPeer(connection, broadcastRequestsForConnection);
sendToPeer(connection, broadcastRequestsForConnection, executor);
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
}
Expand Down Expand Up @@ -235,10 +238,12 @@ private List<Broadcaster.BroadcastRequest> getBroadcastRequestsForConnection(Con
.collect(Collectors.toList());
}

private void sendToPeer(Connection connection, List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection) {
private void sendToPeer(Connection connection,
List<Broadcaster.BroadcastRequest> broadcastRequestsForConnection,
ListeningExecutorService executor) {
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);

Futures.addCallback(future, new FutureCallback<>() {
@Override
Expand Down
21 changes: 19 additions & 2 deletions p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@

import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.util.Utilities;

import javax.inject.Inject;
import javax.inject.Named;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -49,16 +56,26 @@ public class Broadcaster implements BroadcastHandler.ResultHandler {
private Timer timer;
private boolean shutDownRequested;
private Runnable shutDownResultHandler;
private final ListeningExecutorService executor;


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////

@Inject
public Broadcaster(NetworkNode networkNode, PeerManager peerManager) {
public Broadcaster(NetworkNode networkNode,
PeerManager peerManager,
@Named(Config.MAX_CONNECTIONS) int maxConnections) {
this.networkNode = networkNode;
this.peerManager = peerManager;

ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster",
maxConnections,
maxConnections * 2,
30,
30);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
}

public void shutDown(Runnable resultHandler) {
Expand Down Expand Up @@ -119,7 +136,7 @@ private void maybeBroadcastBundle() {
broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList()));
BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this);
broadcastHandlers.add(broadcastHandler);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested);
broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor);
broadcastRequests.clear();

if (timer != null) {
Expand Down