Skip to content

Commit

Permalink
Merge pull request #6468 from HenrikJannsen/change_threadpool
Browse files Browse the repository at this point in the history
Use ThreadPoolExecutor with custom set queueCapacity instead of CachedThreadPool
  • Loading branch information
sqrrm authored Dec 19, 2022
2 parents 40d949c + 14c188a commit c76ee16
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
38 changes: 19 additions & 19 deletions common/src/main/java/bisq/common/util/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -101,7 +100,15 @@ public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec));
return getListeningExecutorService(name, corePoolSize, maximumPoolSize, maximumPoolSize, keepAliveTimeInSec);
}

public static ListeningExecutorService getListeningExecutorService(String name,
int corePoolSize,
int maximumPoolSize,
int queueCapacity,
long keepAliveTimeInSec) {
return MoreExecutors.listeningDecorator(getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, queueCapacity, keepAliveTimeInSec));
}

public static ListeningExecutorService getListeningExecutorService(String name,
Expand All @@ -116,8 +123,17 @@ public static ThreadPoolExecutor getThreadPoolExecutor(String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTimeInSec) {
return getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, maximumPoolSize, keepAliveTimeInSec);
}


public static ThreadPoolExecutor getThreadPoolExecutor(String name,
int corePoolSize,
int maximumPoolSize,
int queueCapacity,
long keepAliveTimeInSec) {
return getThreadPoolExecutor(name, corePoolSize, maximumPoolSize, keepAliveTimeInSec,
new ArrayBlockingQueue<>(maximumPoolSize));
new ArrayBlockingQueue<>(queueCapacity));
}

private static ThreadPoolExecutor getThreadPoolExecutor(String name,
Expand All @@ -135,22 +151,6 @@ private static ThreadPoolExecutor getThreadPoolExecutor(String name,
return executor;
}

public static ExecutorService newCachedThreadPool(String name,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(name + "-%d")
.setDaemon(true)
.build();
return new ThreadPoolExecutor(0,
maximumPoolSize,
keepAliveTime,
timeUnit,
new SynchronousQueue<>(),
threadFactory);
}

@SuppressWarnings("SameParameterValue")
public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor(String name,
int corePoolSize,
Expand Down
12 changes: 10 additions & 2 deletions p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,16 @@ public abstract class NetworkNode implements MessageListener {
this.networkProtoResolver = networkProtoResolver;
this.networkFilter = networkFilter;

connectionExecutor = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool("NetworkNode.connection", maxConnections * 2, 1, TimeUnit.MINUTES));
sendMessageExecutor = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool("NetworkNode.sendMessage", maxConnections * 2, 3, TimeUnit.MINUTES));
connectionExecutor = Utilities.getListeningExecutorService("NetworkNode.connection",
maxConnections * 2,
maxConnections * 3,
10,
60);
sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage",
maxConnections * 2,
maxConnections * 3,
10,
60);
serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public SeedNodeReportingService(P2PService p2PService,

// The pool size must be larger as the expected parallel sends because HttpClient use it
// internally for asynchronous and dependent tasks.
executor = Utilities.newCachedThreadPool("SeedNodeReportingService", 20, 8, TimeUnit.MINUTES);
executor = Utilities.getThreadPoolExecutor("SeedNodeReportingService", 20, 40, 100, 8 * 60);
httpClient = HttpClient.newBuilder().executor(executor).build();

heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC);
Expand Down

0 comments on commit c76ee16

Please sign in to comment.