Skip to content

Commit

Permalink
Merge pull request #6478 from HenrikJannsen/seednodes
Browse files Browse the repository at this point in the history
Seednode improvements for monitoring
  • Loading branch information
alejandrogarcia83 authored Dec 29, 2022
2 parents 4c237dc + 42dc0b5 commit 2298d12
Show file tree
Hide file tree
Showing 29 changed files with 418 additions and 168 deletions.
11 changes: 9 additions & 2 deletions common/src/main/java/bisq/common/setup/CommonSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.nio.file.Paths;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import ch.qos.logback.classic.Level;
Expand Down Expand Up @@ -72,13 +73,19 @@ public static void startPeriodicTasks() {

public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
// Might come from another thread
if (throwable.getCause() != null && throwable.getCause().getCause() != null &&
throwable.getCause().getCause() instanceof BlockStoreException) {
log.error(throwable.getMessage());
log.error("Uncaught BlockStoreException ", throwable);
} else if (throwable instanceof OutOfMemoryError) {
Profiler.printSystemLoad();
log.error("OutOfMemoryError occurred. We shut down.", throwable);
// Leave it to the handleUncaughtException to shut down or not.
UserThread.execute(() -> uncaughtExceptionHandler.handleUncaughtException(throwable, false));
} else if (throwable instanceof ClassCastException &&
"sun.awt.image.BufImgSurfaceData cannot be cast to sun.java2d.xr.XRSurfaceData".equals(throwable.getMessage())) {
log.warn(throwable.getMessage());
} else if (throwable instanceof RejectedExecutionException) {
log.error("Uncaught RejectedExecutionException ", throwable);
} else if (throwable instanceof UnsupportedOperationException &&
"The system tray is not supported on the current platform.".equals(throwable.getMessage())) {
log.warn(throwable.getMessage());
Expand Down
57 changes: 0 additions & 57 deletions common/src/main/java/bisq/common/util/CompletableFutureUtil.java

This file was deleted.

4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
protected Injector injector;
protected AppModule module;
protected Config config;
private boolean isShutdownInProgress;
protected volatile boolean isShutdownInProgress;
private boolean hasDowngraded;

public BisqExecutable(String fullName, String scriptName, String appName, String version) {
Expand Down Expand Up @@ -281,7 +281,7 @@ public void run() {
}
}

private void flushAndExit(ResultHandler resultHandler, int status) {
protected void flushAndExit(ResultHandler resultHandler, int status) {
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
log.info("PersistenceManager flushAllDataToDiskAtShutdown started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public AccountingNodeProvider(AccountingLiteNode liteNode,
&& preferences.getRpcPw() != null &&
!preferences.getRpcPw().isEmpty() &&
preferences.getBlockNotifyPort() > 0;
if (BurningManService.isActivated()) {
accountingNode = isBmFullNode && rpcDataSet ? fullNode : liteNode;
if (isBmFullNode && rpcDataSet) {
accountingNode = fullNode;
} else {
accountingNode = inActiveAccountingNode;
accountingNode = BurningManService.isActivated() ? liteNode : inActiveAccountingNode;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import bisq.core.dao.monitoring.model.BlindVoteStateBlock;
import bisq.core.dao.monitoring.model.BlindVoteStateHash;
import bisq.core.dao.monitoring.network.BlindVoteStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewBlindVoteStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
Expand Down Expand Up @@ -230,6 +231,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) {
blindVoteStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress);
}

public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
blindVoteStateNetworkService.addResponseListener(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import bisq.core.dao.monitoring.model.UtxoMismatch;
import bisq.core.dao.monitoring.network.Checkpoint;
import bisq.core.dao.monitoring.network.DaoStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewDaoStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
Expand Down Expand Up @@ -289,6 +290,10 @@ public void setCreateSnapshotHandler(Runnable handler) {
createSnapshotHandler = handler;
}

public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
daoStateNetworkService.addResponseListener(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import bisq.core.dao.monitoring.model.ProposalStateBlock;
import bisq.core.dao.monitoring.model.ProposalStateHash;
import bisq.core.dao.monitoring.network.ProposalStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewProposalStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
Expand Down Expand Up @@ -232,6 +233,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) {
proposalStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress);
}

public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
proposalStateNetworkService.addResponseListener(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down Expand Up @@ -294,7 +299,9 @@ private boolean maybeUpdateHashChain(int blockHeight) {
return true;
}

private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) {
private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash,
Optional<NodeAddress> peersNodeAddress,
boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;

import bisq.common.UserThread;
import bisq.common.proto.network.NetworkEnvelope;

import javax.inject.Inject;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +48,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;

@Slf4j
Expand All @@ -59,6 +67,12 @@ public interface Listener<Msg extends NewStateHashMessage, Req extends GetStateH
void onPeersStateHashes(List<StH> stateHashes, Optional<NodeAddress> peersNodeAddress);
}

public interface ResponseListener {
void onSuccess(int serializedSize);

void onFault();
}

protected final NetworkNode networkNode;
protected final PeerManager peerManager;
private final Broadcaster broadcaster;
Expand All @@ -67,6 +81,7 @@ public interface Listener<Msg extends NewStateHashMessage, Req extends GetStateH
private final Map<NodeAddress, Han> requestStateHashHandlerMap = new HashMap<>();
private final List<Listener<Msg, Req, StH>> listeners = new CopyOnWriteArrayList<>();
private boolean messageListenerAdded;
private final List<ResponseListener> responseListeners = new CopyOnWriteArrayList<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -145,7 +160,20 @@ public void sendGetStateHashesResponse(Connection connection, int nonce, List<St
Res getStateHashesResponse = getGetStateHashesResponse(nonce, stateHashes);
log.info("Send {} with {} stateHashes to peer {}", getStateHashesResponse.getClass().getSimpleName(),
stateHashes.size(), connection.getPeersNodeAddressOptional());
networkNode.sendMessage(connection, getStateHashesResponse);
SettableFuture<Connection> future = networkNode.sendMessage(connection, getStateHashesResponse);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
UserThread.execute(() -> responseListeners.forEach(listeners -> listeners.onSuccess(getStateHashesResponse.toProtoMessage().getSerializedSize()))
);
}

@Override
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> responseListeners.forEach(StateNetworkService.ResponseListener::onFault)
);
}
}, MoreExecutors.directExecutor());
}

public void requestHashesFromAllConnectedSeedNodes(int fromHeight) {
Expand All @@ -171,6 +199,10 @@ public boolean isSeedNode(NodeAddress nodeAddress) {
return peerManager.isSeedNode(nodeAddress);
}

public void addResponseListener(ResponseListener responseListener) {
responseListeners.add(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class BsqNode implements DaoSetupService {
// (not parsed) block.
@Getter
protected int chainTipHeight;
protected volatile boolean shutdownInProgress;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -156,6 +157,7 @@ public void setWarnMessageHandler(@SuppressWarnings("NullableProblems") Consumer
}

public void shutDown() {
shutdownInProgress = true;
exportJsonFilesService.shutDown();
daoStateSnapshotService.shutDown();
}
Expand Down Expand Up @@ -200,6 +202,10 @@ protected void startReOrgFromLastSnapshot() {


protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException {
if (shutdownInProgress) {
return Optional.empty();
}

// We check if we have a block with that height. If so we return. We do not use the chainHeight as with genesis
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -63,8 +64,7 @@ public class ExportJsonFilesService implements DaoSetupService {
private final File storageDir;
private final boolean dumpBlockchainData;

private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter",
1, 1, 1200);
private final ListeningExecutorService executor;
private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager;

@Inject
Expand All @@ -74,6 +74,9 @@ public ExportJsonFilesService(DaoStateService daoStateService,
this.daoStateService = daoStateService;
this.storageDir = storageDir;
this.dumpBlockchainData = dumpBlockchainData;

ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("JsonExporter", 1, 1, 20, 60);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
}


Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void start() {

public void shutDown() {
super.shutDown();
rpcService.shutDown();
fullNodeNetworkService.shutDown();
}

Expand Down Expand Up @@ -239,6 +240,9 @@ private void parseBlockRecursively(int blockHeight,
Consumer<Block> newBlockHandler,
ResultHandler resultHandler,
Consumer<Throwable> errorHandler) {
if (shutdownInProgress) {
return;
}
rpcService.requestDtoBlock(blockHeight,
rawBlock -> {
try {
Expand Down
Loading

0 comments on commit 2298d12

Please sign in to comment.