From e0f4aa281a93dc15ff10c994422b9851606e95a5 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 27 Dec 2022 17:11:38 -0500 Subject: [PATCH 01/15] Catch RejectedExecutionException at UncaughtExceptionHandler and log error instead calling the uncaughtExceptionHandler Signed-off-by: HenrikJannsen --- common/src/main/java/bisq/common/setup/CommonSetup.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/bisq/common/setup/CommonSetup.java b/common/src/main/java/bisq/common/setup/CommonSetup.java index 20d99a22c31..ae92f30af5d 100644 --- a/common/src/main/java/bisq/common/setup/CommonSetup.java +++ b/common/src/main/java/bisq/common/setup/CommonSetup.java @@ -35,6 +35,7 @@ import java.nio.file.Paths; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import ch.qos.logback.classic.Level; @@ -72,13 +73,14 @@ public static void startPeriodicTasks() { public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) { Thread.UncaughtExceptionHandler handler = (thread, throwable) -> { - // Might come from another thread if (throwable.getCause() != null && throwable.getCause().getCause() != null && throwable.getCause().getCause() instanceof BlockStoreException) { - log.error(throwable.getMessage()); + log.error("Uncaught BlockStoreException ", throwable); } else if (throwable instanceof ClassCastException && "sun.awt.image.BufImgSurfaceData cannot be cast to sun.java2d.xr.XRSurfaceData".equals(throwable.getMessage())) { log.warn(throwable.getMessage()); + } else if (throwable instanceof RejectedExecutionException) { + log.error("Uncaught RejectedExecutionException ", throwable); } else if (throwable instanceof UnsupportedOperationException && "The system tray is not supported on the current platform.".equals(throwable.getMessage())) { log.warn(throwable.getMessage()); From 7953e353959148100537903f5bc803269574b874 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 27 Dec 2022 17:13:13 -0500 Subject: [PATCH 02/15] Add executor parameter to sendMessage Add try/catch to handle RejectedExecutionException Signed-off-by: HenrikJannsen --- .../bisq/network/p2p/network/NetworkNode.java | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index e90fb1513b8..409f01e6a35 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -50,6 +50,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -140,7 +141,7 @@ public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddr SettableFuture resultFuture = SettableFuture.create(); ListenableFuture future = connectionExecutor.submit(() -> { - Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress()); + Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-" + peersNodeAddress.getFullAddress()); if (peersNodeAddress.equals(getNodeAddress())) { log.warn("We are sending a message to ourselves"); @@ -288,25 +289,36 @@ public Socks5Proxy getSocksProxy() { return null; } - public SettableFuture sendMessage(Connection connection, NetworkEnvelope networkEnvelope) { - // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block - ListenableFuture future = sendMessageExecutor.submit(() -> { - String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid(); - Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id); - connection.sendMessage(networkEnvelope); - return connection; - }); + return sendMessage(connection, networkEnvelope, sendMessageExecutor); + } + + public SettableFuture sendMessage(Connection connection, + NetworkEnvelope networkEnvelope, + ListeningExecutorService executor) { SettableFuture resultFuture = SettableFuture.create(); - Futures.addCallback(future, new FutureCallback<>() { - public void onSuccess(Connection connection) { - UserThread.execute(() -> resultFuture.set(connection)); - } + try { + ListenableFuture future = executor.submit(() -> { + String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid(); + Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id); + connection.sendMessage(networkEnvelope); + return connection; + }); - public void onFailure(@NotNull Throwable throwable) { - UserThread.execute(() -> resultFuture.setException(throwable)); - } - }, MoreExecutors.directExecutor()); + Futures.addCallback(future, new FutureCallback<>() { + public void onSuccess(Connection connection) { + UserThread.execute(() -> resultFuture.set(connection)); + } + + public void onFailure(@NotNull Throwable throwable) { + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }, MoreExecutors.directExecutor()); + + } catch (RejectedExecutionException exception) { + log.error("RejectedExecutionException at sendMessage: ", exception); + resultFuture.setException(exception); + } return resultFuture; } From d5b65fe23950d75cee949b0621e57c26cba78c70 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 27 Dec 2022 17:31:07 -0500 Subject: [PATCH 03/15] Reduce keepAliveTime to 30 sec. Signed-off-by: HenrikJannsen --- p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index 409f01e6a35..f4f6a5a3216 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -105,12 +105,12 @@ public abstract class NetworkNode implements MessageListener { maxConnections * 2, maxConnections * 3, 30, - 60); + 30); sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage", maxConnections * 2, maxConnections * 3, 30, - 60); + 30); serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort); } From a8a0c0e725ac5bf158197841231fb27de91494af Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 27 Dec 2022 17:33:39 -0500 Subject: [PATCH 04/15] Add custom thread pool to broadcaster The broadcasting consumes most of the threads but has lower priority than other messages being sent. By separating that thread pool from the common sendMessage executor we can reduce the risk that a burst of broadcasts exhausts the thread pool and might drop send message tasks. Signed-off-by: HenrikJannsen --- .../network/p2p/peers/BroadcastHandler.java | 13 ++++++++---- .../bisq/network/p2p/peers/Broadcaster.java | 21 +++++++++++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index b7b89d25795..5a466a5bee1 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -94,7 +95,9 @@ public interface Listener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(List broadcastRequests, boolean shutDownRequested) { + public void broadcast(List broadcastRequests, + boolean shutDownRequested, + ListeningExecutorService executor) { List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); @@ -153,7 +156,7 @@ public void broadcast(List broadcastRequests, bool return; } - sendToPeer(connection, broadcastRequestsForConnection); + sendToPeer(connection, broadcastRequestsForConnection, executor); }, minDelay, maxDelay, TimeUnit.MILLISECONDS); } } @@ -235,10 +238,12 @@ private List getBroadcastRequestsForConnection(Con .collect(Collectors.toList()); } - private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { + private void sendToPeer(Connection connection, + List broadcastRequestsForConnection, + ListeningExecutorService executor) { // Can be BundleOfEnvelopes or a single BroadcastMessage BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, broadcastMessage); + SettableFuture future = networkNode.sendMessage(connection, broadcastMessage, executor); Futures.addCallback(future, new FutureCallback<>() { @Override diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 2ddc1d8e79a..f327a31afde 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -23,13 +23,20 @@ import bisq.common.Timer; import bisq.common.UserThread; +import bisq.common.config.Config; +import bisq.common.util.Utilities; import javax.inject.Inject; +import javax.inject.Named; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -49,6 +56,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private Timer timer; private boolean shutDownRequested; private Runnable shutDownResultHandler; + private final ListeningExecutorService executor; /////////////////////////////////////////////////////////////////////////////////////////// @@ -56,9 +64,18 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { + public Broadcaster(NetworkNode networkNode, + PeerManager peerManager, + @Named(Config.MAX_CONNECTIONS) int maxConnections) { this.networkNode = networkNode; this.peerManager = peerManager; + + ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster", + maxConnections, + maxConnections * 2, + 30, + 30); + executor = MoreExecutors.listeningDecorator(threadPoolExecutor); } public void shutDown(Runnable resultHandler) { @@ -119,7 +136,7 @@ private void maybeBroadcastBundle() { broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor); broadcastRequests.clear(); if (timer != null) { From c3acd5fb4a0ab003e2a49d0a27c4ff9177fab193 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Fri, 16 Dec 2022 17:48:06 -0500 Subject: [PATCH 05/15] Handle OutOfMemoryError to cause a shutdown at seed node --- .../main/java/bisq/common/setup/CommonSetup.java | 5 +++++ .../src/main/java/bisq/seednode/SeedNodeMain.java | 13 +++++++++++++ 2 files changed, 18 insertions(+) diff --git a/common/src/main/java/bisq/common/setup/CommonSetup.java b/common/src/main/java/bisq/common/setup/CommonSetup.java index ae92f30af5d..d46d933433d 100644 --- a/common/src/main/java/bisq/common/setup/CommonSetup.java +++ b/common/src/main/java/bisq/common/setup/CommonSetup.java @@ -76,6 +76,11 @@ public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaug if (throwable.getCause() != null && throwable.getCause().getCause() != null && throwable.getCause().getCause() instanceof BlockStoreException) { log.error("Uncaught BlockStoreException ", throwable); + } else if (throwable instanceof OutOfMemoryError) { + Profiler.printSystemLoad(); + log.error("OutOfMemoryError occurred. We shut down.", throwable); + // Leave it to the handleUncaughtException to shut down or not. + UserThread.execute(() -> uncaughtExceptionHandler.handleUncaughtException(throwable, false)); } else if (throwable instanceof ClassCastException && "sun.awt.image.BufImgSurfaceData cannot be cast to sun.java2d.xr.XRSurfaceData".equals(throwable.getMessage())) { log.warn(throwable.getMessage()); diff --git a/seednode/src/main/java/bisq/seednode/SeedNodeMain.java b/seednode/src/main/java/bisq/seednode/SeedNodeMain.java index 3bfc2c30e84..81f0e28f80f 100644 --- a/seednode/src/main/java/bisq/seednode/SeedNodeMain.java +++ b/seednode/src/main/java/bisq/seednode/SeedNodeMain.java @@ -95,6 +95,19 @@ protected void onApplicationLaunched() { } + /////////////////////////////////////////////////////////////////////////////////////////// + // UncaughtExceptionHandler implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void handleUncaughtException(Throwable throwable, boolean doShutDown) { + if (throwable instanceof OutOfMemoryError || doShutDown) { + log.error("We got an OutOfMemoryError and shut down"); + gracefulShutDown(() -> log.info("gracefulShutDown complete")); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// // We continue with a series of synchronous execution tasks /////////////////////////////////////////////////////////////////////////////////////////// From 1030f891b9c56fdaefadb0156f97919825dab512 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Fri, 16 Dec 2022 17:48:58 -0500 Subject: [PATCH 06/15] Improve/cleanup logs. Only log lostAllConnections after numOnConnections > 2 to avoid logs at startup Signed-off-by: HenrikJannsen --- .../java/bisq/network/p2p/network/Connection.java | 12 ++++++------ .../java/bisq/network/p2p/peers/PeerManager.java | 8 ++++++-- .../p2p/peers/getdata/GetDataRequestHandler.java | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index be0a3390d66..049c690e04f 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -595,7 +595,7 @@ public String printDetails() { public boolean reportInvalidRequest(RuleViolation ruleViolation) { - log.warn("We got reported the ruleViolation {} at connection {}", ruleViolation, this); + log.info("We got reported the ruleViolation {} at connection with address{} and uid {}", ruleViolation, this.getPeersNodeAddressProperty(), this.getUid()); int numRuleViolations; numRuleViolations = ruleViolations.getOrDefault(ruleViolation, 0); @@ -603,11 +603,11 @@ public boolean reportInvalidRequest(RuleViolation ruleViolation) { ruleViolations.put(ruleViolation, numRuleViolations); if (numRuleViolations >= ruleViolation.maxTolerance) { - log.warn("We close connection as we received too many corrupt requests.\n" + - "numRuleViolations={}\n\t" + - "corruptRequest={}\n\t" + - "corruptRequests={}\n\t" + - "connection={}", numRuleViolations, ruleViolation, ruleViolations, this); + log.warn("We close connection as we received too many corrupt requests. " + + "numRuleViolations={} " + + "corruptRequest={} " + + "corruptRequests={} " + + "connection with address{} and uid {}", numRuleViolations, ruleViolation, ruleViolations, this.getPeersNodeAddressProperty(), this.getUid()); this.ruleViolation = ruleViolation; if (ruleViolation == RuleViolation.PEER_BANNED) { log.warn("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", getPeersNodeAddressOptional()); diff --git a/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java b/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java index ebc581bbe2e..7d9e649b89c 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java @@ -84,6 +84,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost private static final boolean PRINT_REPORTED_PEERS_DETAILS = true; private Timer printStatisticsTimer; private boolean shutDownRequested; + private int numOnConnections; /////////////////////////////////////////////////////////////////////////////////////////// @@ -216,6 +217,8 @@ public void onConnection(Connection connection) { doHouseKeeping(); + numOnConnections++; + if (lostAllConnections) { lostAllConnections = false; stopped = false; @@ -238,7 +241,8 @@ public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection boolean previousLostAllConnections = lostAllConnections; lostAllConnections = networkNode.getAllConnections().isEmpty(); - if (lostAllConnections) { + // At start-up we ignore if we would lose a connection and would fall back to no connections + if (lostAllConnections && numOnConnections > 2) { stopped = true; if (!shutDownRequested) { @@ -562,7 +566,7 @@ boolean checkMaxConnections() { if (!candidates.isEmpty()) { Connection connection = candidates.remove(0); - log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection to peer {}", + log.info("checkMaxConnections: Num candidates (inbound/peer) for shut down={}. We close oldest connection to peer {}", candidates.size(), connection.getPeersNodeAddressOptional()); if (!connection.isStopped()) { connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java index 02aab8e1645..e512ecb7c5b 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java @@ -136,7 +136,7 @@ public void onSuccess(Connection connection) { @Override public void onFailure(@NotNull Throwable throwable) { if (!stopped) { - String errorMessage = "Sending getDataRequest to " + connection + + String errorMessage = "Sending getDataResponse to " + connection + " failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." + "Exception: " + throwable.getMessage(); handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection); From d5b4ce275bf1f03e43a00f39dff16c42f9b8490f Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Fri, 16 Dec 2022 17:56:01 -0500 Subject: [PATCH 07/15] Add support for listeners when GetDataResponse and GetBlocksResponse are sent. Signed-off-by: HenrikJannsen --- .../BlindVoteStateMonitoringService.java | 5 +++ .../monitoring/DaoStateMonitoringService.java | 5 +++ .../ProposalStateMonitoringService.java | 9 ++++- .../network/StateNetworkService.java | 34 ++++++++++++++++++- .../full/network/FullNodeNetworkService.java | 19 ++++++++++- .../full/network/GetBlocksRequestHandler.java | 4 +-- .../peers/getdata/GetDataRequestHandler.java | 4 +-- .../p2p/peers/getdata/RequestDataManager.java | 19 ++++++++++- 8 files changed, 91 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java index f803060a53a..e6915a0c92c 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java @@ -25,6 +25,7 @@ import bisq.core.dao.monitoring.model.BlindVoteStateBlock; import bisq.core.dao.monitoring.model.BlindVoteStateHash; import bisq.core.dao.monitoring.network.BlindVoteStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewBlindVoteStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -230,6 +231,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) { blindVoteStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress); } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + blindVoteStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java index 1d35f3b2371..ff6205ec715 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java @@ -23,6 +23,7 @@ import bisq.core.dao.monitoring.model.UtxoMismatch; import bisq.core.dao.monitoring.network.Checkpoint; import bisq.core.dao.monitoring.network.DaoStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewDaoStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -289,6 +290,10 @@ public void setCreateSnapshotHandler(Runnable handler) { createSnapshotHandler = handler; } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + daoStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java index c28f33a2357..a87c1b15185 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java @@ -24,6 +24,7 @@ import bisq.core.dao.monitoring.model.ProposalStateBlock; import bisq.core.dao.monitoring.model.ProposalStateHash; import bisq.core.dao.monitoring.network.ProposalStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewProposalStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -232,6 +233,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) { proposalStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress); } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + proposalStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners @@ -294,7 +299,9 @@ private boolean maybeUpdateHashChain(int blockHeight) { return true; } - private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional peersNodeAddress, boolean notifyListeners) { + private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, + Optional peersNodeAddress, + boolean notifyListeners) { AtomicBoolean changed = new AtomicBoolean(false); AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode); AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode); diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java index 2df00fc94ff..af3e4eedae3 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java @@ -29,10 +29,16 @@ import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; +import bisq.common.UserThread; import bisq.common.proto.network.NetworkEnvelope; import javax.inject.Inject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +48,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + import javax.annotation.Nullable; @Slf4j @@ -59,6 +67,12 @@ public interface Listener stateHashes, Optional peersNodeAddress); } + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + protected final NetworkNode networkNode; protected final PeerManager peerManager; private final Broadcaster broadcaster; @@ -67,6 +81,7 @@ public interface Listener requestStateHashHandlerMap = new HashMap<>(); private final List> listeners = new CopyOnWriteArrayList<>(); private boolean messageListenerAdded; + private final List responseListeners = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -145,7 +160,20 @@ public void sendGetStateHashesResponse(Connection connection, int nonce, List future = networkNode.sendMessage(connection, getStateHashesResponse); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + UserThread.execute(() -> responseListeners.forEach(listeners -> listeners.onSuccess(getStateHashesResponse.toProtoMessage().getSerializedSize())) + ); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + UserThread.execute(() -> responseListeners.forEach(StateNetworkService.ResponseListener::onFault) + ); + } + }, MoreExecutors.directExecutor()); } public void requestHashesFromAllConnectedSeedNodes(int fromHeight) { @@ -171,6 +199,10 @@ public boolean isSeedNode(NodeAddress nodeAddress) { return peerManager.isSeedNode(nodeAddress); } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java index e8b6a0483f1..20583433cb5 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java @@ -37,7 +37,9 @@ import javax.inject.Inject; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import lombok.extern.slf4j.Slf4j; @@ -51,6 +53,12 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List private static final long CLEANUP_TIMER = 120; + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields @@ -65,6 +73,7 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List // Key is connection UID private final Map getBlocksRequestHandlers = new HashMap<>(); private boolean stopped; + private final List responseListeners = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -107,6 +116,10 @@ public void publishNewBlock(Block block) { broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress()); } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // PeerManager.Listener implementation @@ -166,8 +179,10 @@ private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connectio daoStateService, new GetBlocksRequestHandler.Listener() { @Override - public void onComplete() { + public void onComplete(int serializedSize) { getBlocksRequestHandlers.remove(uid); + + responseListeners.forEach(listener -> listener.onSuccess(serializedSize)); } @Override @@ -179,6 +194,8 @@ public void onFault(String errorMessage, @Nullable Connection connection) { if (connection != null) { peerManager.handleConnectionFault(connection); } + + responseListeners.forEach(ResponseListener::onFault); } else { log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java index de8fad9f8be..411d49de4fc 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java @@ -57,7 +57,7 @@ class GetBlocksRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onComplete(); + void onComplete(int serializedSize); void onFault(String errorMessage, Connection connection); } @@ -120,7 +120,7 @@ public void onSuccess(Connection connection) { log.info("Send DataResponse to {} succeeded. getBlocksResponse.getBlocks().size()={}", connection.getPeersNodeAddressOptional(), getBlocksResponse.getBlocks().size()); cleanup(); - listener.onComplete(); + listener.onComplete(getBlocksResponse.toProtoNetworkEnvelope().getSerializedSize()); } else { log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java index e512ecb7c5b..c000449b907 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java @@ -50,7 +50,7 @@ public class GetDataRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onComplete(); + void onComplete(int serializedSize); void onFault(String errorMessage, Connection connection); } @@ -126,8 +126,8 @@ public void onSuccess(Connection connection) { if (!stopped) { log.trace("Send DataResponse to {} succeeded. getDataResponse={}", connection.getPeersNodeAddressOptional(), getDataResponse); + listener.onComplete(getDataResponse.toProtoNetworkEnvelope().getSerializedSize()); cleanup(); - listener.onComplete(); } else { log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java index 8225c5cb07a..53fd181dd89 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -63,6 +64,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, private static int NUM_ADDITIONAL_SEEDS_FOR_UPDATE_REQUEST = 1; private boolean isPreliminaryDataRequest = true; + /////////////////////////////////////////////////////////////////////////////////////////// // Listener /////////////////////////////////////////////////////////////////////////////////////////// @@ -81,6 +83,12 @@ default void onNoSeedNodeAvailable() { } } + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields @@ -90,6 +98,7 @@ default void onNoSeedNodeAvailable() { private final P2PDataStorage dataStorage; private final PeerManager peerManager; private final List seedNodeAddresses; + private final List responseListeners = new CopyOnWriteArrayList<>(); // As we use Guice injection we cannot set the listener in our constructor but the P2PService calls the setListener // in it's constructor so we can guarantee it is not null. @@ -205,6 +214,10 @@ public Optional getNodeAddressOfPreliminaryDataRequest() { return nodeAddressOfPreliminaryDataRequest; } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation @@ -276,9 +289,11 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage, new GetDataRequestHandler.Listener() { @Override - public void onComplete() { + public void onComplete(int serializedSize) { getDataRequestHandlers.remove(uid); log.trace("requestDataHandshake completed.\n\tConnection={}", connection); + + responseListeners.forEach(listener -> listener.onSuccess(serializedSize)); } @Override @@ -288,6 +303,8 @@ public void onFault(String errorMessage, @Nullable Connection connection) { log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + "ErrorMessage={}", connection, errorMessage); peerManager.handleConnectionFault(connection); + + responseListeners.forEach(ResponseListener::onFault); } else { log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); } From 12e8b468598ee2fc8814c5abe768a656172405ae Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Fri, 16 Dec 2022 18:00:24 -0500 Subject: [PATCH 08/15] Report size or faults of GetDataResponse and GetBlocksResponse. Remove Unspecified and use optional instead. Add reporting for data requests and hash requests. Report commit hash only if present. Report messages only if an enum entry is present. Signed-off-by: HenrikJannsen --- .../reporting/DoubleValueReportingItem.java | 23 ++-- .../reporting/LongValueReportingItem.java | 39 ++++-- .../seednode/reporting/ReportingItem.java | 4 +- .../seednode/reporting/ReportingItems.java | 6 +- .../reporting/SeedNodeReportingService.java | 130 ++++++++++++++++-- .../reporting/StringValueReportingItem.java | 24 ++-- 6 files changed, 181 insertions(+), 45 deletions(-) diff --git a/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java index b4a7486127f..d666a823407 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java @@ -17,12 +17,14 @@ package bisq.seednode.reporting; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public enum DoubleValueReportingItem implements ReportingItem { - Unspecified("", "Unspecified"), sentBytesPerSec("network", "sentBytesPerSec"), receivedBytesPerSec("network", "receivedBytesPerSec"), receivedMessagesPerSec("network", "receivedMessagesPerSec"), @@ -47,16 +49,15 @@ public DoubleValueReportingItem withValue(double value) { return this; } - public static DoubleValueReportingItem from(String key, double value) { - DoubleValueReportingItem item; + public static Optional from(String key, double value) { try { - item = DoubleValueReportingItem.valueOf(key); + DoubleValueReportingItem item = DoubleValueReportingItem.valueOf(key); + item.setValue(value); + return Optional.of(item); } catch (Throwable t) { - item = Unspecified; + log.warn("No enum value with {}", key); + return Optional.empty(); } - - item.setValue(value); - return item; } @Override @@ -66,8 +67,8 @@ public protobuf.ReportingItem toProtoMessage() { .build(); } - public static DoubleValueReportingItem fromProto(protobuf.ReportingItem baseProto, - protobuf.DoubleValueReportingItem proto) { + public static Optional fromProto(protobuf.ReportingItem baseProto, + protobuf.DoubleValueReportingItem proto) { return DoubleValueReportingItem.from(baseProto.getKey(), proto.getValue()); } diff --git a/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java index a62a2d381cb..59de27e5ac8 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java @@ -17,13 +17,16 @@ package bisq.seednode.reporting; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public enum LongValueReportingItem implements ReportingItem { - Unspecified("", "Unspecified"), OfferPayload("data", "OfferPayload"), + BsqSwapOfferPayload("data", "BsqSwapOfferPayload"), MailboxStoragePayload("data", "MailboxStoragePayload"), TradeStatistics3("data", "TradeStatistics3"), AccountAgeWitness("data", "AccountAgeWitness"), @@ -47,6 +50,21 @@ public enum LongValueReportingItem implements ReportingItem { sentBytes("network", "sentBytes"), receivedBytes("network", "receivedBytes"), + PreliminaryGetDataRequest("network", "PreliminaryGetDataRequest"), + GetUpdatedDataRequest("network", "GetUpdatedDataRequest"), + GetBlocksRequest("network", "GetBlocksRequest"), + GetDaoStateHashesRequest("network", "GetDaoStateHashesRequest"), + GetProposalStateHashesRequest("network", "GetProposalStateHashesRequest"), + GetBlindVoteStateHashesRequest("network", "GetBlindVoteStateHashesRequest"), + + GetDataResponse("network", "GetDataResponse"), + GetBlocksResponse("network", "GetBlocksResponse"), + GetDaoStateHashesResponse("network", "GetDaoStateHashesResponse"), + GetProposalStateHashesResponse("network", "GetProposalStateHashesResponse"), + GetBlindVoteStateHashesResponse("network", "GetBlindVoteStateHashesResponse"), + + failedResponseClassName("network", "failedResponseClassName"), + usedMemoryInMB("node", "usedMemoryInMB"), totalMemoryInMB("node", "totalMemoryInMB"), jvmStartTimeInSec("node", "jvmStartTimeInSec"); @@ -69,16 +87,15 @@ public LongValueReportingItem withValue(long value) { return this; } - public static LongValueReportingItem from(String key, long value) { - LongValueReportingItem item; + public static Optional from(String key, long value) { try { - item = LongValueReportingItem.valueOf(key); + LongValueReportingItem item = LongValueReportingItem.valueOf(key); + item.setValue(value); + return Optional.of(item); } catch (Throwable t) { - item = Unspecified; + log.warn("No enum value with {}", key); + return Optional.empty(); } - - item.setValue(value); - return item; } @Override @@ -88,8 +105,8 @@ public protobuf.ReportingItem toProtoMessage() { .build(); } - public static LongValueReportingItem fromProto(protobuf.ReportingItem baseProto, - protobuf.LongValueReportingItem proto) { + public static Optional fromProto(protobuf.ReportingItem baseProto, + protobuf.LongValueReportingItem proto) { return LongValueReportingItem.from(baseProto.getKey(), proto.getValue()); } diff --git a/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java index 828b6585429..57e0020aa0a 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java @@ -20,6 +20,8 @@ import bisq.common.proto.ProtobufferRuntimeException; import bisq.common.proto.network.NetworkPayload; +import java.util.Optional; + public interface ReportingItem extends NetworkPayload { String getKey(); @@ -35,7 +37,7 @@ default protobuf.ReportingItem.Builder getBuilder() { protobuf.ReportingItem toProtoMessage(); - static ReportingItem fromProto(protobuf.ReportingItem proto) { + static Optional fromProto(protobuf.ReportingItem proto) { switch (proto.getMessageCase()) { case STRING_VALUE_REPORTING_ITEM: return StringValueReportingItem.fromProto(proto, proto.getStringValueReportingItem()); diff --git a/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java b/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java index d5cb51057dc..66afbc6b544 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java +++ b/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.util.ArrayList; +import java.util.Optional; import java.util.stream.Collectors; import lombok.Getter; @@ -50,7 +51,10 @@ public protobuf.ReportingItems toProtoMessage() { public static ReportingItems fromProto(protobuf.ReportingItems proto) { ReportingItems reportingItems = new ReportingItems(proto.getAddress()); reportingItems.addAll(proto.getReportingItemList().stream() - .map(ReportingItem::fromProto).collect(Collectors.toList())); + .map(ReportingItem::fromProto) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList())); return reportingItems; } diff --git a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java index de03faecb38..c775d11bd7e 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java @@ -24,6 +24,12 @@ import bisq.core.dao.monitoring.model.BlindVoteStateBlock; import bisq.core.dao.monitoring.model.DaoStateBlock; import bisq.core.dao.monitoring.model.ProposalStateBlock; +import bisq.core.dao.monitoring.network.StateNetworkService; +import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; +import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; +import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; +import bisq.core.dao.node.full.network.FullNodeNetworkService; +import bisq.core.dao.node.messages.GetBlocksRequest; import bisq.core.dao.state.DaoStateListener; import bisq.core.dao.state.DaoStateService; @@ -31,6 +37,9 @@ import bisq.network.p2p.network.NetworkNode; import bisq.network.p2p.network.Statistic; import bisq.network.p2p.peers.PeerManager; +import bisq.network.p2p.peers.getdata.RequestDataManager; +import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest; +import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest; import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; @@ -103,6 +112,8 @@ public SeedNodeReportingService(P2PService p2PService, DaoStateMonitoringService daoStateMonitoringService, ProposalStateMonitoringService proposalStateMonitoringService, BlindVoteStateMonitoringService blindVoteStateMonitoringService, + RequestDataManager requestDataManager, + FullNodeNetworkService fullNodeNetworkService, @Named(Config.MAX_CONNECTIONS) int maxConnections, @Named(Config.SEED_NODE_REPORTING_SERVER_URL) String seedNodeReportingServerUrl) { this.p2PService = p2PService; @@ -142,6 +153,106 @@ public void onDaoStateBlockCreated() { } }; daoFacade.addBsqStateListener(daoStateListener); + + p2PService.getNetworkNode().addMessageListener((networkEnvelope, connection) -> { + if (networkEnvelope instanceof PreliminaryGetDataRequest || + networkEnvelope instanceof GetUpdatedDataRequest || + networkEnvelope instanceof GetBlocksRequest || + networkEnvelope instanceof GetDaoStateHashesRequest || + networkEnvelope instanceof GetProposalStateHashesRequest || + networkEnvelope instanceof GetBlindVoteStateHashesRequest) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + int serializedSize = networkEnvelope.toProtoNetworkEnvelope().getSerializedSize(); + String simpleName = networkEnvelope.getClass().getSimpleName(); + try { + LongValueReportingItem reportingItem = LongValueReportingItem.valueOf(simpleName); + reportingItems.add(reportingItem.withValue(serializedSize)); + sendReportingItems(reportingItems); + } catch (Throwable t) { + log.warn("Could not find enum for {}. Error={}", simpleName, t); + } + } + }); + + requestDataManager.addResponseListener(new RequestDataManager.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDataResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDataResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + fullNodeNetworkService.addResponseListener(new FullNodeNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlocksResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlocksResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + daoStateMonitoringService.addResponseListener(new StateNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDaoStateHashesResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDaoStateHashesResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + proposalStateMonitoringService.addResponseListener(new StateNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetProposalStateHashesResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetProposalStateHashesResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + blindVoteStateMonitoringService.addResponseListener(new StateNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlindVoteStateHashesResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlindVoteStateHashesResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); } public void shutDown() { @@ -213,7 +324,7 @@ private void sendDataReport() { numItemsByType.putIfAbsent(className, 0); numItemsByType.put(className, numItemsByType.get(className) + 1); }); - numItemsByType.forEach((key, value) -> reportingItems.add(LongValueReportingItem.from(key, value))); + numItemsByType.forEach((key, value) -> LongValueReportingItem.from(key, value).ifPresent(reportingItems::add)); // Network reportingItems.add(LongValueReportingItem.numConnections.withValue(networkNode.getAllConnections().size())); @@ -233,16 +344,15 @@ private void sendDataReport() { reportingItems.add(LongValueReportingItem.maxConnections.withValue(maxConnections)); reportingItems.add(StringValueReportingItem.version.withValue(Version.VERSION)); - // If no commit hash is found we use 0 in hex format - String commitHash = Version.findCommitHash().orElse("00"); - reportingItems.add(StringValueReportingItem.commitHash.withValue(commitHash)); + Version.findCommitHash().ifPresent(commitHash -> reportingItems.add(StringValueReportingItem.commitHash.withValue(commitHash))); sendReportingItems(reportingItems); } private void sendReportingItems(ReportingItems reportingItems) { + String truncated = Utilities.toTruncatedString(reportingItems.toString()); try { - log.info("Send report to monitor server: {}", reportingItems.toString()); + log.info("Going to send report to monitor server: {}", truncated); // We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system. byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes(); HttpRequest request = HttpRequest.newBuilder() @@ -253,14 +363,16 @@ private void sendReportingItems(ReportingItems reportingItems) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).whenComplete((response, throwable) -> { if (throwable != null) { log.warn("Exception at sending reporting data. {}", throwable.getMessage()); - } else if (response.statusCode() != 200) { - log.error("Response error message: {}", response); + } else if (response.statusCode() == 200) { + log.info("Sent successfully report to monitor server with {} items", reportingItems.size()); + } else { + log.warn("Server responded with error. Response={}", response); } }); } catch (RejectedExecutionException t) { - log.warn("Did not send reportingItems {} because of RejectedExecutionException {}", reportingItems, t.toString()); + log.warn("Did not send reportingItems {} because of RejectedExecutionException {}", truncated, t.toString()); } catch (Throwable t) { - log.warn("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); + log.warn("Did not send reportingItems {} because of exception {}", truncated, t.toString()); } } diff --git a/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java index 65f9edbf1f6..e814fb42f6e 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java @@ -17,13 +17,14 @@ package bisq.seednode.reporting; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public enum StringValueReportingItem implements ReportingItem { - Unspecified("", "Unspecified"), - daoStateHash("dao", "daoStateHash"), proposalHash("dao", "proposalHash"), blindVoteHash("dao", "blindVoteHash"), @@ -49,16 +50,15 @@ public StringValueReportingItem withValue(String value) { return this; } - public static StringValueReportingItem from(String key, String value) { - StringValueReportingItem item; + public static Optional from(String key, String value) { try { - item = StringValueReportingItem.valueOf(key); + StringValueReportingItem item = StringValueReportingItem.valueOf(key); + item.setValue(value); + return Optional.of(item); } catch (Throwable t) { - item = Unspecified; + log.warn("No enum value with {}", key); + return Optional.empty(); } - - item.setValue(value); - return item; } @Override @@ -73,8 +73,8 @@ public protobuf.ReportingItem toProtoMessage() { .build(); } - public static StringValueReportingItem fromProto(protobuf.ReportingItem baseProto, - protobuf.StringValueReportingItem proto) { + public static Optional fromProto(protobuf.ReportingItem baseProto, + protobuf.StringValueReportingItem proto) { return StringValueReportingItem.from(baseProto.getKey(), proto.getValue()); } From 0b33a12c1974d5ba6720714bee98b1cc39a92775 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Fri, 16 Dec 2022 19:32:02 -0500 Subject: [PATCH 09/15] Change visibility, make isShutdownInProgress volatile Signed-off-by: HenrikJannsen --- core/src/main/java/bisq/core/app/BisqExecutable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/bisq/core/app/BisqExecutable.java b/core/src/main/java/bisq/core/app/BisqExecutable.java index e861d15b516..a92134dceb8 100644 --- a/core/src/main/java/bisq/core/app/BisqExecutable.java +++ b/core/src/main/java/bisq/core/app/BisqExecutable.java @@ -74,7 +74,7 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet protected Injector injector; protected AppModule module; protected Config config; - private boolean isShutdownInProgress; + protected volatile boolean isShutdownInProgress; private boolean hasDowngraded; public BisqExecutable(String fullName, String scriptName, String appName, String version) { @@ -281,7 +281,7 @@ public void run() { } } - private void flushAndExit(ResultHandler resultHandler, int status) { + protected void flushAndExit(ResultHandler resultHandler, int status) { if (!hasDowngraded) { // If user tried to downgrade we do not write the persistable data to avoid data corruption log.info("PersistenceManager flushAllDataToDiskAtShutdown started"); From 7f16e874d262499b739e655199111b60b4b3268d Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Fri, 16 Dec 2022 20:49:06 -0500 Subject: [PATCH 10/15] Improve shutdown at dao nodes Signed-off-by: HenrikJannsen --- .../main/java/bisq/core/dao/node/BsqNode.java | 6 ++++++ .../java/bisq/core/dao/node/full/FullNode.java | 4 ++++ .../bisq/core/dao/node/full/RpcService.java | 17 ++++++++++------- .../java/bisq/core/dao/node/lite/LiteNode.java | 3 +++ 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/node/BsqNode.java b/core/src/main/java/bisq/core/dao/node/BsqNode.java index 2e11ce5d12a..0bebd702dae 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNode.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNode.java @@ -72,6 +72,7 @@ public abstract class BsqNode implements DaoSetupService { // (not parsed) block. @Getter protected int chainTipHeight; + protected volatile boolean shutdownInProgress; /////////////////////////////////////////////////////////////////////////////////////////// @@ -156,6 +157,7 @@ public void setWarnMessageHandler(@SuppressWarnings("NullableProblems") Consumer } public void shutDown() { + shutdownInProgress = true; exportJsonFilesService.shutDown(); daoStateSnapshotService.shutDown(); } @@ -200,6 +202,10 @@ protected void startReOrgFromLastSnapshot() { protected Optional doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException { + if (shutdownInProgress) { + return Optional.empty(); + } + // We check if we have a block with that height. If so we return. We do not use the chainHeight as with genesis // height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky // to change now as it used in many areas.) diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index 93c18a9a67b..469f19b608f 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -97,6 +97,7 @@ public void start() { public void shutDown() { super.shutDown(); + rpcService.shutDown(); fullNodeNetworkService.shutDown(); } @@ -239,6 +240,9 @@ private void parseBlockRecursively(int blockHeight, Consumer newBlockHandler, ResultHandler resultHandler, Consumer errorHandler) { + if (shutdownInProgress) { + return; + } rpcService.requestDtoBlock(blockHeight, rawBlock -> { try { diff --git a/core/src/main/java/bisq/core/dao/node/full/RpcService.java b/core/src/main/java/bisq/core/dao/node/full/RpcService.java index d7e557db389..175189ffeb2 100644 --- a/core/src/main/java/bisq/core/dao/node/full/RpcService.java +++ b/core/src/main/java/bisq/core/dao/node/full/RpcService.java @@ -92,7 +92,7 @@ public class RpcService { // We could use multiple threads, but then we need to support ordering of results in a queue // Keep that for optimization after measuring performance differences private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("RpcService"); - private volatile boolean isShutDown; + private volatile boolean shutdownInProgress; private final Set setupResultHandlers = new CopyOnWriteArraySet<>(); private final Set> setupErrorHandlers = new CopyOnWriteArraySet<>(); private volatile boolean setupComplete; @@ -139,14 +139,17 @@ private RpcService(Preferences preferences, /////////////////////////////////////////////////////////////////////////////////////////// public void shutDown() { - isShutDown = true; + if (shutdownInProgress) { + return; + } + shutdownInProgress = true; if (daemon != null) { daemon.shutdown(); log.info("daemon shut down"); } // A hard shutdown is justified for the RPC service. - executor.shutdown(); + executor.shutdownNow(); } public void setup(ResultHandler resultHandler, Consumer errorHandler) { @@ -217,7 +220,7 @@ public void onFailure(@NotNull Throwable throwable) { } }, MoreExecutors.directExecutor()); } catch (Exception e) { - if (!isShutDown || !(e instanceof RejectedExecutionException)) { + if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { log.warn(e.toString(), e); throw e; } @@ -311,7 +314,7 @@ public void onFailure(@NotNull Throwable throwable) { } }, MoreExecutors.directExecutor()); } catch (Exception e) { - if (!isShutDown || !(e instanceof RejectedExecutionException)) { + if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { log.error("Exception at requestChainHeadHeight", e); throw e; } @@ -346,7 +349,7 @@ public void onFailure(@NotNull Throwable throwable) { }, MoreExecutors.directExecutor()); } catch (Exception e) { log.error("Exception at requestDtoBlock", e); - if (!isShutDown || !(e instanceof RejectedExecutionException)) { + if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { log.warn(e.toString(), e); throw e; } @@ -381,7 +384,7 @@ public void onFailure(@NotNull Throwable throwable) { } }, MoreExecutors.directExecutor()); } catch (Exception e) { - if (!isShutDown || !(e instanceof RejectedExecutionException)) { + if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { log.warn(e.toString(), e); throw e; } diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index 6d4fa40f3c0..68e2769e14b 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -248,6 +248,9 @@ private void onRequestedBlocksReceived(List blockList, Runnable onPars } private void runDelayedBatchProcessing(List blocks, Runnable resultHandler) { + if (shutdownInProgress) { + return; + } UserThread.execute(() -> { if (blocks.isEmpty()) { resultHandler.run(); From abbee20284a6befdfc9f4e1c09356743a3252c03 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Mon, 19 Dec 2022 23:39:48 -0500 Subject: [PATCH 11/15] Remove CompletableFutureUtil Signed-off-by: HenrikJannsen --- .../common/util/CompletableFutureUtil.java | 57 ------------------- 1 file changed, 57 deletions(-) delete mode 100644 common/src/main/java/bisq/common/util/CompletableFutureUtil.java diff --git a/common/src/main/java/bisq/common/util/CompletableFutureUtil.java b/common/src/main/java/bisq/common/util/CompletableFutureUtil.java deleted file mode 100644 index 7c885ab4db9..00000000000 --- a/common/src/main/java/bisq/common/util/CompletableFutureUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.common.util; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -//todo -public class CompletableFutureUtil { - public static CompletableFuture> allOf(Collection> collection) { - //noinspection unchecked - return allOf(collection.toArray(new CompletableFuture[0])); - } - - public static CompletableFuture> allOf(Stream> stream) { - return allOf(stream.collect(Collectors.toList())); - } - - public static CompletableFuture> allOf(CompletableFuture... list) { - CompletableFuture> result = CompletableFuture.allOf(list).thenApply(v -> - Stream.of(list) - .map(future -> { - // We want to return the results in list, not the futures. Once allOf call is complete - // we know that all futures have completed (normally, exceptional or cancelled). - // For exceptional and canceled cases we throw an exception. - T res = future.join(); - if (future.isCompletedExceptionally()) { - throw new RuntimeException((future.handle((r, throwable) -> throwable).join())); - } - if (future.isCancelled()) { - throw new RuntimeException("Future got canceled"); - } - return res; - }) - .collect(Collectors.toList()) - ); - return result; - } -} From e1d8d0a6a0355edad54ef2095391e24f3625baa5 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 20 Dec 2022 19:34:18 -0500 Subject: [PATCH 12/15] Make codacy happy Signed-off-by: HenrikJannsen --- core/src/main/java/bisq/core/dao/node/full/RpcService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/node/full/RpcService.java b/core/src/main/java/bisq/core/dao/node/full/RpcService.java index 175189ffeb2..9f3c999f02b 100644 --- a/core/src/main/java/bisq/core/dao/node/full/RpcService.java +++ b/core/src/main/java/bisq/core/dao/node/full/RpcService.java @@ -219,11 +219,14 @@ public void onFailure(@NotNull Throwable throwable) { }); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { log.warn(e.toString(), e); throw e; } + } catch (Exception e) { + log.warn(e.toString(), e); + throw e; } } From 9a31606fea720b3950458922570c4b175d54c9d6 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Thu, 22 Dec 2022 23:02:14 -0500 Subject: [PATCH 13/15] Make codacy happy Signed-off-by: HenrikJannsen --- .../bisq/core/dao/node/full/RpcService.java | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/node/full/RpcService.java b/core/src/main/java/bisq/core/dao/node/full/RpcService.java index 9f3c999f02b..2412e4e3564 100644 --- a/core/src/main/java/bisq/core/dao/node/full/RpcService.java +++ b/core/src/main/java/bisq/core/dao/node/full/RpcService.java @@ -221,11 +221,11 @@ public void onFailure(@NotNull Throwable throwable) { }, MoreExecutors.directExecutor()); } catch (RejectedExecutionException e) { if (!shutdownInProgress) { - log.warn(e.toString(), e); + log.error(e.toString(), e); throw e; } } catch (Exception e) { - log.warn(e.toString(), e); + log.error(e.toString(), e); throw e; } } @@ -316,11 +316,14 @@ public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> errorHandler.accept(throwable)); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { log.error("Exception at requestChainHeadHeight", e); throw e; } + } catch (Exception e) { + log.error("Exception at requestChainHeadHeight", e); + throw e; } } @@ -350,12 +353,14 @@ public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> errorHandler.accept(throwable)); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - log.error("Exception at requestDtoBlock", e); - if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { - log.warn(e.toString(), e); + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { + log.error("Exception at requestDtoBlock", e); throw e; } + } catch (Exception e) { + log.error("Exception at requestDtoBlock", e); + throw e; } } @@ -386,11 +391,14 @@ public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> errorHandler.accept(throwable)); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - if (!shutdownInProgress || !(e instanceof RejectedExecutionException)) { - log.warn(e.toString(), e); + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { + log.error(e.toString(), e); throw e; } + } catch (Exception e) { + log.error(e.toString(), e); + throw e; } } From 5b38e5b83b6f40de352228ec51d787fa7849a603 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 27 Dec 2022 18:40:38 -0500 Subject: [PATCH 14/15] Allow to set fullNode if isBmFullNode and rpcDataSet is true even if BurningManService.isActivated() is false. This allows us to run the seed nodes already in BM mode before activation Signed-off-by: HenrikJannsen --- .../burningman/accounting/node/AccountingNodeProvider.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java b/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java index 0022aa7b77d..9efff49e2ca 100644 --- a/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java +++ b/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java @@ -48,10 +48,10 @@ public AccountingNodeProvider(AccountingLiteNode liteNode, && preferences.getRpcPw() != null && !preferences.getRpcPw().isEmpty() && preferences.getBlockNotifyPort() > 0; - if (BurningManService.isActivated()) { - accountingNode = isBmFullNode && rpcDataSet ? fullNode : liteNode; + if (isBmFullNode && rpcDataSet) { + accountingNode = fullNode; } else { - accountingNode = inActiveAccountingNode; + accountingNode = BurningManService.isActivated() ? liteNode : inActiveAccountingNode; } } } From 5ff7be92082410ae3ee63214cbfcb8ed6ed9a619 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 28 Dec 2022 12:09:59 -0500 Subject: [PATCH 15/15] Increase capacity for thread pool queue at ExportJsonFilesService One seed node using the ExportJsonFilesService had RejectedExecutionExceptions as before we had only permitted a thread pool queue capacity of 1. Signed-off-by: HenrikJannsen --- .../core/dao/node/explorer/ExportJsonFilesService.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java index 1175a264468..8ff811b440d 100644 --- a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java +++ b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -63,8 +64,7 @@ public class ExportJsonFilesService implements DaoSetupService { private final File storageDir; private final boolean dumpBlockchainData; - private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", - 1, 1, 1200); + private final ListeningExecutorService executor; private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager; @Inject @@ -74,6 +74,9 @@ public ExportJsonFilesService(DaoStateService daoStateService, this.daoStateService = daoStateService; this.storageDir = storageDir; this.dumpBlockchainData = dumpBlockchainData; + + ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("JsonExporter", 1, 1, 20, 60); + executor = MoreExecutors.listeningDecorator(threadPoolExecutor); }