diff --git a/common/src/main/java/bisq/common/setup/CommonSetup.java b/common/src/main/java/bisq/common/setup/CommonSetup.java index 20d99a22c31..d46d933433d 100644 --- a/common/src/main/java/bisq/common/setup/CommonSetup.java +++ b/common/src/main/java/bisq/common/setup/CommonSetup.java @@ -35,6 +35,7 @@ import java.nio.file.Paths; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import ch.qos.logback.classic.Level; @@ -72,13 +73,19 @@ public static void startPeriodicTasks() { public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) { Thread.UncaughtExceptionHandler handler = (thread, throwable) -> { - // Might come from another thread if (throwable.getCause() != null && throwable.getCause().getCause() != null && throwable.getCause().getCause() instanceof BlockStoreException) { - log.error(throwable.getMessage()); + log.error("Uncaught BlockStoreException ", throwable); + } else if (throwable instanceof OutOfMemoryError) { + Profiler.printSystemLoad(); + log.error("OutOfMemoryError occurred. We shut down.", throwable); + // Leave it to the handleUncaughtException to shut down or not. + UserThread.execute(() -> uncaughtExceptionHandler.handleUncaughtException(throwable, false)); } else if (throwable instanceof ClassCastException && "sun.awt.image.BufImgSurfaceData cannot be cast to sun.java2d.xr.XRSurfaceData".equals(throwable.getMessage())) { log.warn(throwable.getMessage()); + } else if (throwable instanceof RejectedExecutionException) { + log.error("Uncaught RejectedExecutionException ", throwable); } else if (throwable instanceof UnsupportedOperationException && "The system tray is not supported on the current platform.".equals(throwable.getMessage())) { log.warn(throwable.getMessage()); diff --git a/common/src/main/java/bisq/common/util/CompletableFutureUtil.java b/common/src/main/java/bisq/common/util/CompletableFutureUtil.java deleted file mode 100644 index 7c885ab4db9..00000000000 --- a/common/src/main/java/bisq/common/util/CompletableFutureUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.common.util; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -//todo -public class CompletableFutureUtil { - public static CompletableFuture> allOf(Collection> collection) { - //noinspection unchecked - return allOf(collection.toArray(new CompletableFuture[0])); - } - - public static CompletableFuture> allOf(Stream> stream) { - return allOf(stream.collect(Collectors.toList())); - } - - public static CompletableFuture> allOf(CompletableFuture... list) { - CompletableFuture> result = CompletableFuture.allOf(list).thenApply(v -> - Stream.of(list) - .map(future -> { - // We want to return the results in list, not the futures. Once allOf call is complete - // we know that all futures have completed (normally, exceptional or cancelled). - // For exceptional and canceled cases we throw an exception. - T res = future.join(); - if (future.isCompletedExceptionally()) { - throw new RuntimeException((future.handle((r, throwable) -> throwable).join())); - } - if (future.isCancelled()) { - throw new RuntimeException("Future got canceled"); - } - return res; - }) - .collect(Collectors.toList()) - ); - return result; - } -} diff --git a/core/src/main/java/bisq/core/app/BisqExecutable.java b/core/src/main/java/bisq/core/app/BisqExecutable.java index e861d15b516..a92134dceb8 100644 --- a/core/src/main/java/bisq/core/app/BisqExecutable.java +++ b/core/src/main/java/bisq/core/app/BisqExecutable.java @@ -74,7 +74,7 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet protected Injector injector; protected AppModule module; protected Config config; - private boolean isShutdownInProgress; + protected volatile boolean isShutdownInProgress; private boolean hasDowngraded; public BisqExecutable(String fullName, String scriptName, String appName, String version) { @@ -281,7 +281,7 @@ public void run() { } } - private void flushAndExit(ResultHandler resultHandler, int status) { + protected void flushAndExit(ResultHandler resultHandler, int status) { if (!hasDowngraded) { // If user tried to downgrade we do not write the persistable data to avoid data corruption log.info("PersistenceManager flushAllDataToDiskAtShutdown started"); diff --git a/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java b/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java index 0022aa7b77d..9efff49e2ca 100644 --- a/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java +++ b/core/src/main/java/bisq/core/dao/burningman/accounting/node/AccountingNodeProvider.java @@ -48,10 +48,10 @@ public AccountingNodeProvider(AccountingLiteNode liteNode, && preferences.getRpcPw() != null && !preferences.getRpcPw().isEmpty() && preferences.getBlockNotifyPort() > 0; - if (BurningManService.isActivated()) { - accountingNode = isBmFullNode && rpcDataSet ? fullNode : liteNode; + if (isBmFullNode && rpcDataSet) { + accountingNode = fullNode; } else { - accountingNode = inActiveAccountingNode; + accountingNode = BurningManService.isActivated() ? liteNode : inActiveAccountingNode; } } } diff --git a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java index f803060a53a..e6915a0c92c 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/BlindVoteStateMonitoringService.java @@ -25,6 +25,7 @@ import bisq.core.dao.monitoring.model.BlindVoteStateBlock; import bisq.core.dao.monitoring.model.BlindVoteStateHash; import bisq.core.dao.monitoring.network.BlindVoteStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewBlindVoteStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -230,6 +231,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) { blindVoteStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress); } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + blindVoteStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java index 1d35f3b2371..ff6205ec715 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/DaoStateMonitoringService.java @@ -23,6 +23,7 @@ import bisq.core.dao.monitoring.model.UtxoMismatch; import bisq.core.dao.monitoring.network.Checkpoint; import bisq.core.dao.monitoring.network.DaoStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewDaoStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -289,6 +290,10 @@ public void setCreateSnapshotHandler(Runnable handler) { createSnapshotHandler = handler; } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + daoStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java index c28f33a2357..a87c1b15185 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/ProposalStateMonitoringService.java @@ -24,6 +24,7 @@ import bisq.core.dao.monitoring.model.ProposalStateBlock; import bisq.core.dao.monitoring.model.ProposalStateHash; import bisq.core.dao.monitoring.network.ProposalStateNetworkService; +import bisq.core.dao.monitoring.network.StateNetworkService; import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; import bisq.core.dao.monitoring.network.messages.NewProposalStateHashMessage; import bisq.core.dao.state.DaoStateListener; @@ -232,6 +233,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) { proposalStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress); } + public void addResponseListener(StateNetworkService.ResponseListener responseListener) { + proposalStateNetworkService.addResponseListener(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners @@ -294,7 +299,9 @@ private boolean maybeUpdateHashChain(int blockHeight) { return true; } - private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional peersNodeAddress, boolean notifyListeners) { + private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, + Optional peersNodeAddress, + boolean notifyListeners) { AtomicBoolean changed = new AtomicBoolean(false); AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode); AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode); diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java index 2df00fc94ff..af3e4eedae3 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java @@ -29,10 +29,16 @@ import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; +import bisq.common.UserThread; import bisq.common.proto.network.NetworkEnvelope; import javax.inject.Inject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +48,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + import javax.annotation.Nullable; @Slf4j @@ -59,6 +67,12 @@ public interface Listener stateHashes, Optional peersNodeAddress); } + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + protected final NetworkNode networkNode; protected final PeerManager peerManager; private final Broadcaster broadcaster; @@ -67,6 +81,7 @@ public interface Listener requestStateHashHandlerMap = new HashMap<>(); private final List> listeners = new CopyOnWriteArrayList<>(); private boolean messageListenerAdded; + private final List responseListeners = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -145,7 +160,20 @@ public void sendGetStateHashesResponse(Connection connection, int nonce, List future = networkNode.sendMessage(connection, getStateHashesResponse); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + UserThread.execute(() -> responseListeners.forEach(listeners -> listeners.onSuccess(getStateHashesResponse.toProtoMessage().getSerializedSize())) + ); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + UserThread.execute(() -> responseListeners.forEach(StateNetworkService.ResponseListener::onFault) + ); + } + }, MoreExecutors.directExecutor()); } public void requestHashesFromAllConnectedSeedNodes(int fromHeight) { @@ -171,6 +199,10 @@ public boolean isSeedNode(NodeAddress nodeAddress) { return peerManager.isSeedNode(nodeAddress); } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Listeners diff --git a/core/src/main/java/bisq/core/dao/node/BsqNode.java b/core/src/main/java/bisq/core/dao/node/BsqNode.java index 2e11ce5d12a..0bebd702dae 100644 --- a/core/src/main/java/bisq/core/dao/node/BsqNode.java +++ b/core/src/main/java/bisq/core/dao/node/BsqNode.java @@ -72,6 +72,7 @@ public abstract class BsqNode implements DaoSetupService { // (not parsed) block. @Getter protected int chainTipHeight; + protected volatile boolean shutdownInProgress; /////////////////////////////////////////////////////////////////////////////////////////// @@ -156,6 +157,7 @@ public void setWarnMessageHandler(@SuppressWarnings("NullableProblems") Consumer } public void shutDown() { + shutdownInProgress = true; exportJsonFilesService.shutDown(); daoStateSnapshotService.shutDown(); } @@ -200,6 +202,10 @@ protected void startReOrgFromLastSnapshot() { protected Optional doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException { + if (shutdownInProgress) { + return Optional.empty(); + } + // We check if we have a block with that height. If so we return. We do not use the chainHeight as with genesis // height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky // to change now as it used in many areas.) diff --git a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java index 1175a264468..8ff811b440d 100644 --- a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java +++ b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -63,8 +64,7 @@ public class ExportJsonFilesService implements DaoSetupService { private final File storageDir; private final boolean dumpBlockchainData; - private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter", - 1, 1, 1200); + private final ListeningExecutorService executor; private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager; @Inject @@ -74,6 +74,9 @@ public ExportJsonFilesService(DaoStateService daoStateService, this.daoStateService = daoStateService; this.storageDir = storageDir; this.dumpBlockchainData = dumpBlockchainData; + + ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("JsonExporter", 1, 1, 20, 60); + executor = MoreExecutors.listeningDecorator(threadPoolExecutor); } diff --git a/core/src/main/java/bisq/core/dao/node/full/FullNode.java b/core/src/main/java/bisq/core/dao/node/full/FullNode.java index 93c18a9a67b..469f19b608f 100644 --- a/core/src/main/java/bisq/core/dao/node/full/FullNode.java +++ b/core/src/main/java/bisq/core/dao/node/full/FullNode.java @@ -97,6 +97,7 @@ public void start() { public void shutDown() { super.shutDown(); + rpcService.shutDown(); fullNodeNetworkService.shutDown(); } @@ -239,6 +240,9 @@ private void parseBlockRecursively(int blockHeight, Consumer newBlockHandler, ResultHandler resultHandler, Consumer errorHandler) { + if (shutdownInProgress) { + return; + } rpcService.requestDtoBlock(blockHeight, rawBlock -> { try { diff --git a/core/src/main/java/bisq/core/dao/node/full/RpcService.java b/core/src/main/java/bisq/core/dao/node/full/RpcService.java index d7e557db389..2412e4e3564 100644 --- a/core/src/main/java/bisq/core/dao/node/full/RpcService.java +++ b/core/src/main/java/bisq/core/dao/node/full/RpcService.java @@ -92,7 +92,7 @@ public class RpcService { // We could use multiple threads, but then we need to support ordering of results in a queue // Keep that for optimization after measuring performance differences private final ListeningExecutorService executor = Utilities.getSingleThreadListeningExecutor("RpcService"); - private volatile boolean isShutDown; + private volatile boolean shutdownInProgress; private final Set setupResultHandlers = new CopyOnWriteArraySet<>(); private final Set> setupErrorHandlers = new CopyOnWriteArraySet<>(); private volatile boolean setupComplete; @@ -139,14 +139,17 @@ private RpcService(Preferences preferences, /////////////////////////////////////////////////////////////////////////////////////////// public void shutDown() { - isShutDown = true; + if (shutdownInProgress) { + return; + } + shutdownInProgress = true; if (daemon != null) { daemon.shutdown(); log.info("daemon shut down"); } // A hard shutdown is justified for the RPC service. - executor.shutdown(); + executor.shutdownNow(); } public void setup(ResultHandler resultHandler, Consumer errorHandler) { @@ -216,11 +219,14 @@ public void onFailure(@NotNull Throwable throwable) { }); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - if (!isShutDown || !(e instanceof RejectedExecutionException)) { - log.warn(e.toString(), e); + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { + log.error(e.toString(), e); throw e; } + } catch (Exception e) { + log.error(e.toString(), e); + throw e; } } @@ -310,11 +316,14 @@ public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> errorHandler.accept(throwable)); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - if (!isShutDown || !(e instanceof RejectedExecutionException)) { + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { log.error("Exception at requestChainHeadHeight", e); throw e; } + } catch (Exception e) { + log.error("Exception at requestChainHeadHeight", e); + throw e; } } @@ -344,12 +353,14 @@ public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> errorHandler.accept(throwable)); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - log.error("Exception at requestDtoBlock", e); - if (!isShutDown || !(e instanceof RejectedExecutionException)) { - log.warn(e.toString(), e); + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { + log.error("Exception at requestDtoBlock", e); throw e; } + } catch (Exception e) { + log.error("Exception at requestDtoBlock", e); + throw e; } } @@ -380,11 +391,14 @@ public void onFailure(@NotNull Throwable throwable) { UserThread.execute(() -> errorHandler.accept(throwable)); } }, MoreExecutors.directExecutor()); - } catch (Exception e) { - if (!isShutDown || !(e instanceof RejectedExecutionException)) { - log.warn(e.toString(), e); + } catch (RejectedExecutionException e) { + if (!shutdownInProgress) { + log.error(e.toString(), e); throw e; } + } catch (Exception e) { + log.error(e.toString(), e); + throw e; } } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java index e8b6a0483f1..20583433cb5 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/FullNodeNetworkService.java @@ -37,7 +37,9 @@ import javax.inject.Inject; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import lombok.extern.slf4j.Slf4j; @@ -51,6 +53,12 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List private static final long CLEANUP_TIMER = 120; + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields @@ -65,6 +73,7 @@ public class FullNodeNetworkService implements MessageListener, PeerManager.List // Key is connection UID private final Map getBlocksRequestHandlers = new HashMap<>(); private boolean stopped; + private final List responseListeners = new CopyOnWriteArrayList<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -107,6 +116,10 @@ public void publishNewBlock(Block block) { broadcaster.broadcast(newBlockBroadcastMessage, networkNode.getNodeAddress()); } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // PeerManager.Listener implementation @@ -166,8 +179,10 @@ private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connectio daoStateService, new GetBlocksRequestHandler.Listener() { @Override - public void onComplete() { + public void onComplete(int serializedSize) { getBlocksRequestHandlers.remove(uid); + + responseListeners.forEach(listener -> listener.onSuccess(serializedSize)); } @Override @@ -179,6 +194,8 @@ public void onFault(String errorMessage, @Nullable Connection connection) { if (connection != null) { peerManager.handleConnectionFault(connection); } + + responseListeners.forEach(ResponseListener::onFault); } else { log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); } diff --git a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java index de8fad9f8be..411d49de4fc 100644 --- a/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java +++ b/core/src/main/java/bisq/core/dao/node/full/network/GetBlocksRequestHandler.java @@ -57,7 +57,7 @@ class GetBlocksRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onComplete(); + void onComplete(int serializedSize); void onFault(String errorMessage, Connection connection); } @@ -120,7 +120,7 @@ public void onSuccess(Connection connection) { log.info("Send DataResponse to {} succeeded. getBlocksResponse.getBlocks().size()={}", connection.getPeersNodeAddressOptional(), getBlocksResponse.getBlocks().size()); cleanup(); - listener.onComplete(); + listener.onComplete(getBlocksResponse.toProtoNetworkEnvelope().getSerializedSize()); } else { log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); } diff --git a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java index 6d4fa40f3c0..68e2769e14b 100644 --- a/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java +++ b/core/src/main/java/bisq/core/dao/node/lite/LiteNode.java @@ -248,6 +248,9 @@ private void onRequestedBlocksReceived(List blockList, Runnable onPars } private void runDelayedBatchProcessing(List blocks, Runnable resultHandler) { + if (shutdownInProgress) { + return; + } UserThread.execute(() -> { if (blocks.isEmpty()) { resultHandler.run(); diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index be0a3390d66..049c690e04f 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -595,7 +595,7 @@ public String printDetails() { public boolean reportInvalidRequest(RuleViolation ruleViolation) { - log.warn("We got reported the ruleViolation {} at connection {}", ruleViolation, this); + log.info("We got reported the ruleViolation {} at connection with address{} and uid {}", ruleViolation, this.getPeersNodeAddressProperty(), this.getUid()); int numRuleViolations; numRuleViolations = ruleViolations.getOrDefault(ruleViolation, 0); @@ -603,11 +603,11 @@ public boolean reportInvalidRequest(RuleViolation ruleViolation) { ruleViolations.put(ruleViolation, numRuleViolations); if (numRuleViolations >= ruleViolation.maxTolerance) { - log.warn("We close connection as we received too many corrupt requests.\n" + - "numRuleViolations={}\n\t" + - "corruptRequest={}\n\t" + - "corruptRequests={}\n\t" + - "connection={}", numRuleViolations, ruleViolation, ruleViolations, this); + log.warn("We close connection as we received too many corrupt requests. " + + "numRuleViolations={} " + + "corruptRequest={} " + + "corruptRequests={} " + + "connection with address{} and uid {}", numRuleViolations, ruleViolation, ruleViolations, this.getPeersNodeAddressProperty(), this.getUid()); this.ruleViolation = ruleViolation; if (ruleViolation == RuleViolation.PEER_BANNED) { log.warn("We close connection due RuleViolation.PEER_BANNED. peersNodeAddress={}", getPeersNodeAddressOptional()); diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index e90fb1513b8..f4f6a5a3216 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -50,6 +50,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -104,12 +105,12 @@ public abstract class NetworkNode implements MessageListener { maxConnections * 2, maxConnections * 3, 30, - 60); + 30); sendMessageExecutor = Utilities.getListeningExecutorService("NetworkNode.sendMessage", maxConnections * 2, maxConnections * 3, 30, - 60); + 30); serverExecutor = Utilities.getSingleThreadExecutor("NetworkNode.server-" + servicePort); } @@ -140,7 +141,7 @@ public SettableFuture sendMessage(@NotNull NodeAddress peersNodeAddr SettableFuture resultFuture = SettableFuture.create(); ListenableFuture future = connectionExecutor.submit(() -> { - Thread.currentThread().setName("NetworkNode:SendMessage-to-" + peersNodeAddress.getFullAddress()); + Thread.currentThread().setName("NetworkNode.connectionExecutor:SendMessage-to-" + peersNodeAddress.getFullAddress()); if (peersNodeAddress.equals(getNodeAddress())) { log.warn("We are sending a message to ourselves"); @@ -288,25 +289,36 @@ public Socks5Proxy getSocksProxy() { return null; } - public SettableFuture sendMessage(Connection connection, NetworkEnvelope networkEnvelope) { - // connection.sendMessage might take a bit (compression, write to stream), so we use a thread to not block - ListenableFuture future = sendMessageExecutor.submit(() -> { - String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid(); - Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id); - connection.sendMessage(networkEnvelope); - return connection; - }); + return sendMessage(connection, networkEnvelope, sendMessageExecutor); + } + + public SettableFuture sendMessage(Connection connection, + NetworkEnvelope networkEnvelope, + ListeningExecutorService executor) { SettableFuture resultFuture = SettableFuture.create(); - Futures.addCallback(future, new FutureCallback<>() { - public void onSuccess(Connection connection) { - UserThread.execute(() -> resultFuture.set(connection)); - } + try { + ListenableFuture future = executor.submit(() -> { + String id = connection.getPeersNodeAddressOptional().isPresent() ? connection.getPeersNodeAddressOptional().get().getFullAddress() : connection.getUid(); + Thread.currentThread().setName("NetworkNode:SendMessage-to-" + id); + connection.sendMessage(networkEnvelope); + return connection; + }); - public void onFailure(@NotNull Throwable throwable) { - UserThread.execute(() -> resultFuture.setException(throwable)); - } - }, MoreExecutors.directExecutor()); + Futures.addCallback(future, new FutureCallback<>() { + public void onSuccess(Connection connection) { + UserThread.execute(() -> resultFuture.set(connection)); + } + + public void onFailure(@NotNull Throwable throwable) { + UserThread.execute(() -> resultFuture.setException(throwable)); + } + }, MoreExecutors.directExecutor()); + + } catch (RejectedExecutionException exception) { + log.error("RejectedExecutionException at sendMessage: ", exception); + resultFuture.setException(exception); + } return resultFuture; } diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index b7b89d25795..5a466a5bee1 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; @@ -94,7 +95,9 @@ public interface Listener { // API /////////////////////////////////////////////////////////////////////////////////////////// - public void broadcast(List broadcastRequests, boolean shutDownRequested) { + public void broadcast(List broadcastRequests, + boolean shutDownRequested, + ListeningExecutorService executor) { List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); @@ -153,7 +156,7 @@ public void broadcast(List broadcastRequests, bool return; } - sendToPeer(connection, broadcastRequestsForConnection); + sendToPeer(connection, broadcastRequestsForConnection, executor); }, minDelay, maxDelay, TimeUnit.MILLISECONDS); } } @@ -235,10 +238,12 @@ private List getBroadcastRequestsForConnection(Con .collect(Collectors.toList()); } - private void sendToPeer(Connection connection, List broadcastRequestsForConnection) { + private void sendToPeer(Connection connection, + List broadcastRequestsForConnection, + ListeningExecutorService executor) { // Can be BundleOfEnvelopes or a single BroadcastMessage BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); - SettableFuture future = networkNode.sendMessage(connection, broadcastMessage); + SettableFuture future = networkNode.sendMessage(connection, broadcastMessage, executor); Futures.addCallback(future, new FutureCallback<>() { @Override diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index 2ddc1d8e79a..f327a31afde 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -23,13 +23,20 @@ import bisq.common.Timer; import bisq.common.UserThread; +import bisq.common.config.Config; +import bisq.common.util.Utilities; import javax.inject.Inject; +import javax.inject.Named; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -49,6 +56,7 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { private Timer timer; private boolean shutDownRequested; private Runnable shutDownResultHandler; + private final ListeningExecutorService executor; /////////////////////////////////////////////////////////////////////////////////////////// @@ -56,9 +64,18 @@ public class Broadcaster implements BroadcastHandler.ResultHandler { /////////////////////////////////////////////////////////////////////////////////////////// @Inject - public Broadcaster(NetworkNode networkNode, PeerManager peerManager) { + public Broadcaster(NetworkNode networkNode, + PeerManager peerManager, + @Named(Config.MAX_CONNECTIONS) int maxConnections) { this.networkNode = networkNode; this.peerManager = peerManager; + + ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster", + maxConnections, + maxConnections * 2, + 30, + 30); + executor = MoreExecutors.listeningDecorator(threadPoolExecutor); } public void shutDown(Runnable resultHandler) { @@ -119,7 +136,7 @@ private void maybeBroadcastBundle() { broadcastRequests.stream().map(e -> e.getMessage().getClass().getSimpleName()).collect(Collectors.toList())); BroadcastHandler broadcastHandler = new BroadcastHandler(networkNode, peerManager, this); broadcastHandlers.add(broadcastHandler); - broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested); + broadcastHandler.broadcast(new ArrayList<>(broadcastRequests), shutDownRequested, executor); broadcastRequests.clear(); if (timer != null) { diff --git a/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java b/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java index ebc581bbe2e..7d9e649b89c 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/PeerManager.java @@ -84,6 +84,7 @@ public final class PeerManager implements ConnectionListener, PersistedDataHost private static final boolean PRINT_REPORTED_PEERS_DETAILS = true; private Timer printStatisticsTimer; private boolean shutDownRequested; + private int numOnConnections; /////////////////////////////////////////////////////////////////////////////////////////// @@ -216,6 +217,8 @@ public void onConnection(Connection connection) { doHouseKeeping(); + numOnConnections++; + if (lostAllConnections) { lostAllConnections = false; stopped = false; @@ -238,7 +241,8 @@ public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection boolean previousLostAllConnections = lostAllConnections; lostAllConnections = networkNode.getAllConnections().isEmpty(); - if (lostAllConnections) { + // At start-up we ignore if we would lose a connection and would fall back to no connections + if (lostAllConnections && numOnConnections > 2) { stopped = true; if (!shutDownRequested) { @@ -562,7 +566,7 @@ boolean checkMaxConnections() { if (!candidates.isEmpty()) { Connection connection = candidates.remove(0); - log.info("checkMaxConnections: Num candidates for shut down={}. We close oldest connection to peer {}", + log.info("checkMaxConnections: Num candidates (inbound/peer) for shut down={}. We close oldest connection to peer {}", candidates.size(), connection.getPeersNodeAddressOptional()); if (!connection.isStopped()) { connection.shutDown(CloseConnectionReason.TOO_MANY_CONNECTIONS_OPEN, diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java index 02aab8e1645..c000449b907 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/GetDataRequestHandler.java @@ -50,7 +50,7 @@ public class GetDataRequestHandler { /////////////////////////////////////////////////////////////////////////////////////////// public interface Listener { - void onComplete(); + void onComplete(int serializedSize); void onFault(String errorMessage, Connection connection); } @@ -126,8 +126,8 @@ public void onSuccess(Connection connection) { if (!stopped) { log.trace("Send DataResponse to {} succeeded. getDataResponse={}", connection.getPeersNodeAddressOptional(), getDataResponse); + listener.onComplete(getDataResponse.toProtoNetworkEnvelope().getSerializedSize()); cleanup(); - listener.onComplete(); } else { log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call."); } @@ -136,7 +136,7 @@ public void onSuccess(Connection connection) { @Override public void onFailure(@NotNull Throwable throwable) { if (!stopped) { - String errorMessage = "Sending getDataRequest to " + connection + + String errorMessage = "Sending getDataResponse to " + connection + " failed. That is expected if the peer is offline. getDataResponse=" + getDataResponse + "." + "Exception: " + throwable.getMessage(); handleFault(errorMessage, CloseConnectionReason.SEND_MSG_FAILURE, connection); diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java index 8225c5cb07a..53fd181dd89 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -63,6 +64,7 @@ public class RequestDataManager implements MessageListener, ConnectionListener, private static int NUM_ADDITIONAL_SEEDS_FOR_UPDATE_REQUEST = 1; private boolean isPreliminaryDataRequest = true; + /////////////////////////////////////////////////////////////////////////////////////////// // Listener /////////////////////////////////////////////////////////////////////////////////////////// @@ -81,6 +83,12 @@ default void onNoSeedNodeAvailable() { } } + public interface ResponseListener { + void onSuccess(int serializedSize); + + void onFault(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Class fields @@ -90,6 +98,7 @@ default void onNoSeedNodeAvailable() { private final P2PDataStorage dataStorage; private final PeerManager peerManager; private final List seedNodeAddresses; + private final List responseListeners = new CopyOnWriteArrayList<>(); // As we use Guice injection we cannot set the listener in our constructor but the P2PService calls the setListener // in it's constructor so we can guarantee it is not null. @@ -205,6 +214,10 @@ public Optional getNodeAddressOfPreliminaryDataRequest() { return nodeAddressOfPreliminaryDataRequest; } + public void addResponseListener(ResponseListener responseListener) { + responseListeners.add(responseListener); + } + /////////////////////////////////////////////////////////////////////////////////////////// // ConnectionListener implementation @@ -276,9 +289,11 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { GetDataRequestHandler getDataRequestHandler = new GetDataRequestHandler(networkNode, dataStorage, new GetDataRequestHandler.Listener() { @Override - public void onComplete() { + public void onComplete(int serializedSize) { getDataRequestHandlers.remove(uid); log.trace("requestDataHandshake completed.\n\tConnection={}", connection); + + responseListeners.forEach(listener -> listener.onSuccess(serializedSize)); } @Override @@ -288,6 +303,8 @@ public void onFault(String errorMessage, @Nullable Connection connection) { log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" + "ErrorMessage={}", connection, errorMessage); peerManager.handleConnectionFault(connection); + + responseListeners.forEach(ResponseListener::onFault); } else { log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call."); } diff --git a/seednode/src/main/java/bisq/seednode/SeedNodeMain.java b/seednode/src/main/java/bisq/seednode/SeedNodeMain.java index 3bfc2c30e84..81f0e28f80f 100644 --- a/seednode/src/main/java/bisq/seednode/SeedNodeMain.java +++ b/seednode/src/main/java/bisq/seednode/SeedNodeMain.java @@ -95,6 +95,19 @@ protected void onApplicationLaunched() { } + /////////////////////////////////////////////////////////////////////////////////////////// + // UncaughtExceptionHandler implementation + /////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public void handleUncaughtException(Throwable throwable, boolean doShutDown) { + if (throwable instanceof OutOfMemoryError || doShutDown) { + log.error("We got an OutOfMemoryError and shut down"); + gracefulShutDown(() -> log.info("gracefulShutDown complete")); + } + } + + /////////////////////////////////////////////////////////////////////////////////////////// // We continue with a series of synchronous execution tasks /////////////////////////////////////////////////////////////////////////////////////////// diff --git a/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java index b4a7486127f..d666a823407 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/DoubleValueReportingItem.java @@ -17,12 +17,14 @@ package bisq.seednode.reporting; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public enum DoubleValueReportingItem implements ReportingItem { - Unspecified("", "Unspecified"), sentBytesPerSec("network", "sentBytesPerSec"), receivedBytesPerSec("network", "receivedBytesPerSec"), receivedMessagesPerSec("network", "receivedMessagesPerSec"), @@ -47,16 +49,15 @@ public DoubleValueReportingItem withValue(double value) { return this; } - public static DoubleValueReportingItem from(String key, double value) { - DoubleValueReportingItem item; + public static Optional from(String key, double value) { try { - item = DoubleValueReportingItem.valueOf(key); + DoubleValueReportingItem item = DoubleValueReportingItem.valueOf(key); + item.setValue(value); + return Optional.of(item); } catch (Throwable t) { - item = Unspecified; + log.warn("No enum value with {}", key); + return Optional.empty(); } - - item.setValue(value); - return item; } @Override @@ -66,8 +67,8 @@ public protobuf.ReportingItem toProtoMessage() { .build(); } - public static DoubleValueReportingItem fromProto(protobuf.ReportingItem baseProto, - protobuf.DoubleValueReportingItem proto) { + public static Optional fromProto(protobuf.ReportingItem baseProto, + protobuf.DoubleValueReportingItem proto) { return DoubleValueReportingItem.from(baseProto.getKey(), proto.getValue()); } diff --git a/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java index a62a2d381cb..59de27e5ac8 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/LongValueReportingItem.java @@ -17,13 +17,16 @@ package bisq.seednode.reporting; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public enum LongValueReportingItem implements ReportingItem { - Unspecified("", "Unspecified"), OfferPayload("data", "OfferPayload"), + BsqSwapOfferPayload("data", "BsqSwapOfferPayload"), MailboxStoragePayload("data", "MailboxStoragePayload"), TradeStatistics3("data", "TradeStatistics3"), AccountAgeWitness("data", "AccountAgeWitness"), @@ -47,6 +50,21 @@ public enum LongValueReportingItem implements ReportingItem { sentBytes("network", "sentBytes"), receivedBytes("network", "receivedBytes"), + PreliminaryGetDataRequest("network", "PreliminaryGetDataRequest"), + GetUpdatedDataRequest("network", "GetUpdatedDataRequest"), + GetBlocksRequest("network", "GetBlocksRequest"), + GetDaoStateHashesRequest("network", "GetDaoStateHashesRequest"), + GetProposalStateHashesRequest("network", "GetProposalStateHashesRequest"), + GetBlindVoteStateHashesRequest("network", "GetBlindVoteStateHashesRequest"), + + GetDataResponse("network", "GetDataResponse"), + GetBlocksResponse("network", "GetBlocksResponse"), + GetDaoStateHashesResponse("network", "GetDaoStateHashesResponse"), + GetProposalStateHashesResponse("network", "GetProposalStateHashesResponse"), + GetBlindVoteStateHashesResponse("network", "GetBlindVoteStateHashesResponse"), + + failedResponseClassName("network", "failedResponseClassName"), + usedMemoryInMB("node", "usedMemoryInMB"), totalMemoryInMB("node", "totalMemoryInMB"), jvmStartTimeInSec("node", "jvmStartTimeInSec"); @@ -69,16 +87,15 @@ public LongValueReportingItem withValue(long value) { return this; } - public static LongValueReportingItem from(String key, long value) { - LongValueReportingItem item; + public static Optional from(String key, long value) { try { - item = LongValueReportingItem.valueOf(key); + LongValueReportingItem item = LongValueReportingItem.valueOf(key); + item.setValue(value); + return Optional.of(item); } catch (Throwable t) { - item = Unspecified; + log.warn("No enum value with {}", key); + return Optional.empty(); } - - item.setValue(value); - return item; } @Override @@ -88,8 +105,8 @@ public protobuf.ReportingItem toProtoMessage() { .build(); } - public static LongValueReportingItem fromProto(protobuf.ReportingItem baseProto, - protobuf.LongValueReportingItem proto) { + public static Optional fromProto(protobuf.ReportingItem baseProto, + protobuf.LongValueReportingItem proto) { return LongValueReportingItem.from(baseProto.getKey(), proto.getValue()); } diff --git a/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java index 828b6585429..57e0020aa0a 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/ReportingItem.java @@ -20,6 +20,8 @@ import bisq.common.proto.ProtobufferRuntimeException; import bisq.common.proto.network.NetworkPayload; +import java.util.Optional; + public interface ReportingItem extends NetworkPayload { String getKey(); @@ -35,7 +37,7 @@ default protobuf.ReportingItem.Builder getBuilder() { protobuf.ReportingItem toProtoMessage(); - static ReportingItem fromProto(protobuf.ReportingItem proto) { + static Optional fromProto(protobuf.ReportingItem proto) { switch (proto.getMessageCase()) { case STRING_VALUE_REPORTING_ITEM: return StringValueReportingItem.fromProto(proto, proto.getStringValueReportingItem()); diff --git a/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java b/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java index d5cb51057dc..66afbc6b544 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java +++ b/seednode/src/main/java/bisq/seednode/reporting/ReportingItems.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.util.ArrayList; +import java.util.Optional; import java.util.stream.Collectors; import lombok.Getter; @@ -50,7 +51,10 @@ public protobuf.ReportingItems toProtoMessage() { public static ReportingItems fromProto(protobuf.ReportingItems proto) { ReportingItems reportingItems = new ReportingItems(proto.getAddress()); reportingItems.addAll(proto.getReportingItemList().stream() - .map(ReportingItem::fromProto).collect(Collectors.toList())); + .map(ReportingItem::fromProto) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList())); return reportingItems; } diff --git a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java index de03faecb38..c775d11bd7e 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java @@ -24,6 +24,12 @@ import bisq.core.dao.monitoring.model.BlindVoteStateBlock; import bisq.core.dao.monitoring.model.DaoStateBlock; import bisq.core.dao.monitoring.model.ProposalStateBlock; +import bisq.core.dao.monitoring.network.StateNetworkService; +import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; +import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; +import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; +import bisq.core.dao.node.full.network.FullNodeNetworkService; +import bisq.core.dao.node.messages.GetBlocksRequest; import bisq.core.dao.state.DaoStateListener; import bisq.core.dao.state.DaoStateService; @@ -31,6 +37,9 @@ import bisq.network.p2p.network.NetworkNode; import bisq.network.p2p.network.Statistic; import bisq.network.p2p.peers.PeerManager; +import bisq.network.p2p.peers.getdata.RequestDataManager; +import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest; +import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest; import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; @@ -103,6 +112,8 @@ public SeedNodeReportingService(P2PService p2PService, DaoStateMonitoringService daoStateMonitoringService, ProposalStateMonitoringService proposalStateMonitoringService, BlindVoteStateMonitoringService blindVoteStateMonitoringService, + RequestDataManager requestDataManager, + FullNodeNetworkService fullNodeNetworkService, @Named(Config.MAX_CONNECTIONS) int maxConnections, @Named(Config.SEED_NODE_REPORTING_SERVER_URL) String seedNodeReportingServerUrl) { this.p2PService = p2PService; @@ -142,6 +153,106 @@ public void onDaoStateBlockCreated() { } }; daoFacade.addBsqStateListener(daoStateListener); + + p2PService.getNetworkNode().addMessageListener((networkEnvelope, connection) -> { + if (networkEnvelope instanceof PreliminaryGetDataRequest || + networkEnvelope instanceof GetUpdatedDataRequest || + networkEnvelope instanceof GetBlocksRequest || + networkEnvelope instanceof GetDaoStateHashesRequest || + networkEnvelope instanceof GetProposalStateHashesRequest || + networkEnvelope instanceof GetBlindVoteStateHashesRequest) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + int serializedSize = networkEnvelope.toProtoNetworkEnvelope().getSerializedSize(); + String simpleName = networkEnvelope.getClass().getSimpleName(); + try { + LongValueReportingItem reportingItem = LongValueReportingItem.valueOf(simpleName); + reportingItems.add(reportingItem.withValue(serializedSize)); + sendReportingItems(reportingItems); + } catch (Throwable t) { + log.warn("Could not find enum for {}. Error={}", simpleName, t); + } + } + }); + + requestDataManager.addResponseListener(new RequestDataManager.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDataResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDataResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + fullNodeNetworkService.addResponseListener(new FullNodeNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlocksResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlocksResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + daoStateMonitoringService.addResponseListener(new StateNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDaoStateHashesResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetDaoStateHashesResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + proposalStateMonitoringService.addResponseListener(new StateNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetProposalStateHashesResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetProposalStateHashesResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); + + blindVoteStateMonitoringService.addResponseListener(new StateNetworkService.ResponseListener() { + @Override + public void onSuccess(int serializedSize) { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlindVoteStateHashesResponse.withValue(serializedSize)); + sendReportingItems(reportingItems); + } + + @Override + public void onFault() { + ReportingItems reportingItems = new ReportingItems(getMyAddress()); + reportingItems.add(LongValueReportingItem.GetBlindVoteStateHashesResponse.withValue(-1)); + sendReportingItems(reportingItems); + } + }); } public void shutDown() { @@ -213,7 +324,7 @@ private void sendDataReport() { numItemsByType.putIfAbsent(className, 0); numItemsByType.put(className, numItemsByType.get(className) + 1); }); - numItemsByType.forEach((key, value) -> reportingItems.add(LongValueReportingItem.from(key, value))); + numItemsByType.forEach((key, value) -> LongValueReportingItem.from(key, value).ifPresent(reportingItems::add)); // Network reportingItems.add(LongValueReportingItem.numConnections.withValue(networkNode.getAllConnections().size())); @@ -233,16 +344,15 @@ private void sendDataReport() { reportingItems.add(LongValueReportingItem.maxConnections.withValue(maxConnections)); reportingItems.add(StringValueReportingItem.version.withValue(Version.VERSION)); - // If no commit hash is found we use 0 in hex format - String commitHash = Version.findCommitHash().orElse("00"); - reportingItems.add(StringValueReportingItem.commitHash.withValue(commitHash)); + Version.findCommitHash().ifPresent(commitHash -> reportingItems.add(StringValueReportingItem.commitHash.withValue(commitHash))); sendReportingItems(reportingItems); } private void sendReportingItems(ReportingItems reportingItems) { + String truncated = Utilities.toTruncatedString(reportingItems.toString()); try { - log.info("Send report to monitor server: {}", reportingItems.toString()); + log.info("Going to send report to monitor server: {}", truncated); // We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system. byte[] protoMessageAsBytes = reportingItems.toProtoMessageAsBytes(); HttpRequest request = HttpRequest.newBuilder() @@ -253,14 +363,16 @@ private void sendReportingItems(ReportingItems reportingItems) { httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).whenComplete((response, throwable) -> { if (throwable != null) { log.warn("Exception at sending reporting data. {}", throwable.getMessage()); - } else if (response.statusCode() != 200) { - log.error("Response error message: {}", response); + } else if (response.statusCode() == 200) { + log.info("Sent successfully report to monitor server with {} items", reportingItems.size()); + } else { + log.warn("Server responded with error. Response={}", response); } }); } catch (RejectedExecutionException t) { - log.warn("Did not send reportingItems {} because of RejectedExecutionException {}", reportingItems, t.toString()); + log.warn("Did not send reportingItems {} because of RejectedExecutionException {}", truncated, t.toString()); } catch (Throwable t) { - log.warn("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); + log.warn("Did not send reportingItems {} because of exception {}", truncated, t.toString()); } } diff --git a/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java b/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java index 65f9edbf1f6..e814fb42f6e 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java +++ b/seednode/src/main/java/bisq/seednode/reporting/StringValueReportingItem.java @@ -17,13 +17,14 @@ package bisq.seednode.reporting; +import java.util.Optional; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public enum StringValueReportingItem implements ReportingItem { - Unspecified("", "Unspecified"), - daoStateHash("dao", "daoStateHash"), proposalHash("dao", "proposalHash"), blindVoteHash("dao", "blindVoteHash"), @@ -49,16 +50,15 @@ public StringValueReportingItem withValue(String value) { return this; } - public static StringValueReportingItem from(String key, String value) { - StringValueReportingItem item; + public static Optional from(String key, String value) { try { - item = StringValueReportingItem.valueOf(key); + StringValueReportingItem item = StringValueReportingItem.valueOf(key); + item.setValue(value); + return Optional.of(item); } catch (Throwable t) { - item = Unspecified; + log.warn("No enum value with {}", key); + return Optional.empty(); } - - item.setValue(value); - return item; } @Override @@ -73,8 +73,8 @@ public protobuf.ReportingItem toProtoMessage() { .build(); } - public static StringValueReportingItem fromProto(protobuf.ReportingItem baseProto, - protobuf.StringValueReportingItem proto) { + public static Optional fromProto(protobuf.ReportingItem baseProto, + protobuf.StringValueReportingItem proto) { return StringValueReportingItem.from(baseProto.getKey(), proto.getValue()); }