Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reorg handling #5819

Merged
merged 4 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,17 @@ public void shutDown() {

@Override
protected void startParseBlocks() {
requestChainHeadHeightAndParseBlocks(getStartBlockHeight());
}

@Override
protected void startReOrgFromLastSnapshot() {
super.startReOrgFromLastSnapshot();

int startBlockHeight = getStartBlockHeight();
rpcService.requestChainHeadHeight(chainHeight -> parseBlocksOnHeadHeight(startBlockHeight, chainHeight),

log.info("startParseBlocks: startBlockHeight={}", startBlockHeight);
rpcService.requestChainHeadHeight(chainHeight -> {
// If our persisted block is equal to the chain height we have startBlockHeight 1 block higher,
// so we do not call parseBlocksOnHeadHeight
log.info("startParseBlocks: chainHeight={}", chainHeight);
if (startBlockHeight <= chainHeight) {
parseBlocksOnHeadHeight(startBlockHeight, chainHeight);
}
},
this::handleError);
}

Expand Down Expand Up @@ -194,12 +196,6 @@ private void parseBlocksIfNewBlockAvailable(int chainHeight) {
this::handleError);
}

private void requestChainHeadHeightAndParseBlocks(int startBlockHeight) {
log.info("requestChainHeadHeightAndParseBlocks with startBlockHeight={}", startBlockHeight);
rpcService.requestChainHeadHeight(chainHeight -> parseBlocksOnHeadHeight(startBlockHeight, chainHeight),
this::handleError);
}

private void parseBlocksOnHeadHeight(int startBlockHeight, int chainHeight) {
if (startBlockHeight <= chainHeight) {
blocksToParseInBatch = chainHeight - startBlockHeight;
Expand All @@ -221,7 +217,9 @@ private void parseBlocksOnHeadHeight(int startBlockHeight, int chainHeight) {
log.warn("We are trying to start with a block which is above the chain height of Bitcoin Core. " +
"We need probably wait longer until Bitcoin Core has fully synced. " +
"We try again after a delay of 1 min.");
UserThread.runAfter(() -> requestChainHeadHeightAndParseBlocks(startBlockHeight), 60);
UserThread.runAfter(() -> rpcService.requestChainHeadHeight(chainHeight1 ->
parseBlocksOnHeadHeight(startBlockHeight, chainHeight1),
this::handleError), 60);
}
}

Expand Down
11 changes: 1 addition & 10 deletions core/src/main/java/bisq/core/dao/node/lite/LiteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,6 @@ protected void startParseBlocks() {
liteNodeNetworkService.requestBlocks(getStartBlockHeight());
}

@Override
protected void startReOrgFromLastSnapshot() {
super.startReOrgFromLastSnapshot();

int startBlockHeight = getStartBlockHeight();
liteNodeNetworkService.reset();
liteNodeNetworkService.requestBlocks(startBlockHeight);
}


///////////////////////////////////////////////////////////////////////////////////////////
// Private
Expand Down Expand Up @@ -252,7 +243,7 @@ private void runDelayedBatchProcessing(List<RawBlock> blocks, Runnable resultHan
doParseBlock(block);
runDelayedBatchProcessing(blocks, resultHandler);
} catch (RequiredReorgFromSnapshotException e) {
resultHandler.run();
log.warn("Interrupt batch processing because if a blockchain reorg. {}", e.toString());
}
});
}
Expand Down
31 changes: 29 additions & 2 deletions seednode/src/main/java/bisq/seednode/SeedNodeMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import bisq.core.user.CookieKey;
import bisq.core.user.User;

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.P2PServiceListener;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.seed.SeedNodeRepository;

import bisq.common.Timer;
import bisq.common.UserThread;
Expand All @@ -42,6 +44,9 @@
import com.google.inject.Key;
import com.google.inject.name.Names;

import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -108,9 +113,26 @@ protected void applyInjector() {
seedNode.setInjector(injector);

if (DevEnv.isDaoActivated()) {
injector.getInstance(DaoStateSnapshotService.class).setDaoRequiresRestartHandler(() -> gracefulShutDown(() -> {
}));
injector.getInstance(DaoStateSnapshotService.class).setDaoRequiresRestartHandler(
// We shut down with a deterministic delay per seed to avoid that all seeds shut down at the
// same time in case of a reorg. We use 30 sec. as distance delay between the seeds to be on the
// safe side. We have 12 seeds so that's 6 minutes.
() -> UserThread.runAfter(this::gracefulShutDown, 1 + (getMyIndex() * 30L))
);
}
}

private int getMyIndex() {
P2PService p2PService = injector.getInstance(P2PService.class);
SeedNodeRepository seedNodeRepository = injector.getInstance(SeedNodeRepository.class);
List<NodeAddress> seedNodes = new ArrayList<>(seedNodeRepository.getSeedNodeAddresses());
NodeAddress myAddress = p2PService.getAddress();
for (int i = 0; i < seedNodes.size(); i++) {
if (seedNodes.get(i).equals(myAddress)) {
return i;
}
}
return 0;
}

@Override
Expand Down Expand Up @@ -197,6 +219,11 @@ private void setupConnectionLossCheck() {

}

private void gracefulShutDown() {
gracefulShutDown(() -> {
});
}

@Override
public void gracefulShutDown(ResultHandler resultHandler) {
seedNode.shutDown();
Expand Down