Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seednode improvements for monitoring #6478

Merged
merged 16 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions common/src/main/java/bisq/common/setup/CommonSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.nio.file.Paths;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import ch.qos.logback.classic.Level;
Expand Down Expand Up @@ -72,13 +73,19 @@ public static void startPeriodicTasks() {

public static void setupUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
// Might come from another thread
if (throwable.getCause() != null && throwable.getCause().getCause() != null &&
throwable.getCause().getCause() instanceof BlockStoreException) {
log.error(throwable.getMessage());
log.error("Uncaught BlockStoreException ", throwable);
} else if (throwable instanceof OutOfMemoryError) {
Profiler.printSystemLoad();
log.error("OutOfMemoryError occurred. We shut down.", throwable);
// Leave it to the handleUncaughtException to shut down or not.
UserThread.execute(() -> uncaughtExceptionHandler.handleUncaughtException(throwable, false));
} else if (throwable instanceof ClassCastException &&
"sun.awt.image.BufImgSurfaceData cannot be cast to sun.java2d.xr.XRSurfaceData".equals(throwable.getMessage())) {
log.warn(throwable.getMessage());
} else if (throwable instanceof RejectedExecutionException) {
log.error("Uncaught RejectedExecutionException ", throwable);
} else if (throwable instanceof UnsupportedOperationException &&
"The system tray is not supported on the current platform.".equals(throwable.getMessage())) {
log.warn(throwable.getMessage());
Expand Down
57 changes: 0 additions & 57 deletions common/src/main/java/bisq/common/util/CompletableFutureUtil.java

This file was deleted.

4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public abstract class BisqExecutable implements GracefulShutDownHandler, BisqSet
protected Injector injector;
protected AppModule module;
protected Config config;
private boolean isShutdownInProgress;
protected volatile boolean isShutdownInProgress;
private boolean hasDowngraded;

public BisqExecutable(String fullName, String scriptName, String appName, String version) {
Expand Down Expand Up @@ -281,7 +281,7 @@ public void run() {
}
}

private void flushAndExit(ResultHandler resultHandler, int status) {
protected void flushAndExit(ResultHandler resultHandler, int status) {
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
log.info("PersistenceManager flushAllDataToDiskAtShutdown started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ public AccountingNodeProvider(AccountingLiteNode liteNode,
&& preferences.getRpcPw() != null &&
!preferences.getRpcPw().isEmpty() &&
preferences.getBlockNotifyPort() > 0;
if (BurningManService.isActivated()) {
accountingNode = isBmFullNode && rpcDataSet ? fullNode : liteNode;
if (isBmFullNode && rpcDataSet) {
accountingNode = fullNode;
} else {
accountingNode = inActiveAccountingNode;
accountingNode = BurningManService.isActivated() ? liteNode : inActiveAccountingNode;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import bisq.core.dao.monitoring.model.BlindVoteStateBlock;
import bisq.core.dao.monitoring.model.BlindVoteStateHash;
import bisq.core.dao.monitoring.network.BlindVoteStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewBlindVoteStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
Expand Down Expand Up @@ -230,6 +231,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) {
blindVoteStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress);
}

public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
blindVoteStateNetworkService.addResponseListener(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import bisq.core.dao.monitoring.model.UtxoMismatch;
import bisq.core.dao.monitoring.network.Checkpoint;
import bisq.core.dao.monitoring.network.DaoStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewDaoStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
Expand Down Expand Up @@ -289,6 +290,10 @@ public void setCreateSnapshotHandler(Runnable handler) {
createSnapshotHandler = handler;
}

public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
daoStateNetworkService.addResponseListener(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import bisq.core.dao.monitoring.model.ProposalStateBlock;
import bisq.core.dao.monitoring.model.ProposalStateHash;
import bisq.core.dao.monitoring.network.ProposalStateNetworkService;
import bisq.core.dao.monitoring.network.StateNetworkService;
import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest;
import bisq.core.dao.monitoring.network.messages.NewProposalStateHashMessage;
import bisq.core.dao.state.DaoStateListener;
Expand Down Expand Up @@ -232,6 +233,10 @@ public void requestHashesFromGenesisBlockHeight(String peersAddress) {
proposalStateNetworkService.requestHashes(genesisTxInfo.getGenesisBlockHeight(), peersAddress);
}

public void addResponseListener(StateNetworkService.ResponseListener responseListener) {
proposalStateNetworkService.addResponseListener(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down Expand Up @@ -294,7 +299,9 @@ private boolean maybeUpdateHashChain(int blockHeight) {
return true;
}

private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash, Optional<NodeAddress> peersNodeAddress, boolean notifyListeners) {
private boolean processPeersProposalStateHash(ProposalStateHash proposalStateHash,
Optional<NodeAddress> peersNodeAddress,
boolean notifyListeners) {
AtomicBoolean changed = new AtomicBoolean(false);
AtomicBoolean inConflictWithNonSeedNode = new AtomicBoolean(this.isInConflictWithNonSeedNode);
AtomicBoolean inConflictWithSeedNode = new AtomicBoolean(this.isInConflictWithSeedNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;

import bisq.common.UserThread;
import bisq.common.proto.network.NetworkEnvelope;

import javax.inject.Inject;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +48,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;

@Slf4j
Expand All @@ -59,6 +67,12 @@ public interface Listener<Msg extends NewStateHashMessage, Req extends GetStateH
void onPeersStateHashes(List<StH> stateHashes, Optional<NodeAddress> peersNodeAddress);
}

public interface ResponseListener {
void onSuccess(int serializedSize);

void onFault();
}

protected final NetworkNode networkNode;
protected final PeerManager peerManager;
private final Broadcaster broadcaster;
Expand All @@ -67,6 +81,7 @@ public interface Listener<Msg extends NewStateHashMessage, Req extends GetStateH
private final Map<NodeAddress, Han> requestStateHashHandlerMap = new HashMap<>();
private final List<Listener<Msg, Req, StH>> listeners = new CopyOnWriteArrayList<>();
private boolean messageListenerAdded;
private final List<ResponseListener> responseListeners = new CopyOnWriteArrayList<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -145,7 +160,20 @@ public void sendGetStateHashesResponse(Connection connection, int nonce, List<St
Res getStateHashesResponse = getGetStateHashesResponse(nonce, stateHashes);
log.info("Send {} with {} stateHashes to peer {}", getStateHashesResponse.getClass().getSimpleName(),
stateHashes.size(), connection.getPeersNodeAddressOptional());
networkNode.sendMessage(connection, getStateHashesResponse);
SettableFuture<Connection> future = networkNode.sendMessage(connection, getStateHashesResponse);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
UserThread.execute(() -> responseListeners.forEach(listeners -> listeners.onSuccess(getStateHashesResponse.toProtoMessage().getSerializedSize()))
);
}

@Override
public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> responseListeners.forEach(StateNetworkService.ResponseListener::onFault)
);
}
}, MoreExecutors.directExecutor());
}

public void requestHashesFromAllConnectedSeedNodes(int fromHeight) {
Expand All @@ -171,6 +199,10 @@ public boolean isSeedNode(NodeAddress nodeAddress) {
return peerManager.isSeedNode(nodeAddress);
}

public void addResponseListener(ResponseListener responseListener) {
responseListeners.add(responseListener);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Listeners
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class BsqNode implements DaoSetupService {
// (not parsed) block.
@Getter
protected int chainTipHeight;
protected volatile boolean shutdownInProgress;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -156,6 +157,7 @@ public void setWarnMessageHandler(@SuppressWarnings("NullableProblems") Consumer
}

public void shutDown() {
shutdownInProgress = true;
exportJsonFilesService.shutDown();
daoStateSnapshotService.shutDown();
}
Expand Down Expand Up @@ -200,6 +202,10 @@ protected void startReOrgFromLastSnapshot() {


protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException {
if (shutdownInProgress) {
return Optional.empty();
}

// We check if we have a block with that height. If so we return. We do not use the chainHeight as with genesis
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -63,8 +64,7 @@ public class ExportJsonFilesService implements DaoSetupService {
private final File storageDir;
private final boolean dumpBlockchainData;

private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter",
1, 1, 1200);
private final ListeningExecutorService executor;
private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager;

@Inject
Expand All @@ -74,6 +74,9 @@ public ExportJsonFilesService(DaoStateService daoStateService,
this.daoStateService = daoStateService;
this.storageDir = storageDir;
this.dumpBlockchainData = dumpBlockchainData;

ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("JsonExporter", 1, 1, 20, 60);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
}


Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void start() {

public void shutDown() {
super.shutDown();
rpcService.shutDown();
fullNodeNetworkService.shutDown();
}

Expand Down Expand Up @@ -239,6 +240,9 @@ private void parseBlockRecursively(int blockHeight,
Consumer<Block> newBlockHandler,
ResultHandler resultHandler,
Consumer<Throwable> errorHandler) {
if (shutdownInProgress) {
return;
}
rpcService.requestDtoBlock(blockHeight,
rawBlock -> {
try {
Expand Down
Loading