Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve dao node domain #6418

Merged
merged 5 commits into from
Nov 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr

int heightForNextBlock = daoStateService.getChainHeight() + 1;
if (rawBlock.getHeight() > heightForNextBlock) {
// rawBlock is not at expected next height but further in the future
if (!pendingBlocks.contains(rawBlock)) {
pendingBlocks.add(rawBlock);
log.info("We received a block with a future block height. We store it as pending and try to apply " +
Expand All @@ -240,16 +241,17 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
log.warn("We received a block with a future block height but we had it already added to our pendingBlocks.");
}
} else if (rawBlock.getHeight() >= daoStateService.getGenesisBlockHeight()) {
// rawBlock is not expected next height but either same height as chainHead or in the past
// We received an older block. We compare if we have it in our chain.
Optional<Block> optionalBlock = daoStateService.getBlockAtHeight(rawBlock.getHeight());
if (optionalBlock.isPresent()) {
if (optionalBlock.get().getHash().equals(rawBlock.getPreviousBlockHash())) {
Optional<Block> existingBlockAsSameHeight = daoStateService.getBlockAtHeight(rawBlock.getHeight());
if (existingBlockAsSameHeight.isPresent()) {
if (existingBlockAsSameHeight.get().getHash().equals(rawBlock.getHash())) {
log.info("We received an old block we have already parsed and added. We ignore it.");
} else {
log.info("We received an old block with a different hash. We ignore it. Hash={}", rawBlock.getHash());
}
} else {
log.info("In case we have reset from genesis height we would not find the block");
log.info("In case we have reset from genesis height we would not find the existingBlockAsSameHeight");
}
} else {
log.info("We ignore it as it was before genesis height");
Expand Down
29 changes: 15 additions & 14 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import bisq.core.dao.state.model.blockchain.Block;

import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.ConnectionState;

import bisq.common.UserThread;
import bisq.common.handlers.ResultHandler;
Expand Down Expand Up @@ -58,6 +57,7 @@ public class FullNode extends BsqNode {
private boolean addBlockHandlerAdded;
private int blocksToParseInBatch;
private long parseInBatchStartTime;
private int parseBlocksOnHeadHeightCounter;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -76,7 +76,6 @@ private FullNode(BlockParser blockParser,
this.rpcService = rpcService;

this.fullNodeNetworkService = fullNodeNetworkService;
ConnectionState.setExpectedRequests(5);
}


Expand Down Expand Up @@ -110,8 +109,6 @@ protected void startParseBlocks() {
int startBlockHeight = daoStateService.getChainHeight();
log.info("startParseBlocks: startBlockHeight={}", startBlockHeight);
rpcService.requestChainHeadHeight(chainHeight -> {
// If our persisted block is equal to the chain height we have startBlockHeight 1 block higher,
// so we do not call parseBlocksOnHeadHeight
log.info("startParseBlocks: chainHeight={}", chainHeight);
if (startBlockHeight <= chainHeight) {
parseBlocksOnHeadHeight(startBlockHeight, chainHeight);
Expand Down Expand Up @@ -205,20 +202,24 @@ private void parseBlocksOnHeadHeight(int startBlockHeight, int chainHeight) {
chainHeight,
this::onNewBlock,
() -> {
// We are done but it might be that new blocks have arrived in the meantime,
// We are done, but it might be that new blocks have arrived in the meantime,
// so we try again with startBlockHeight set to current chainHeight
// We also set up the listener in the else main branch where we check
// if we are at chainTip, so do not include here another check as it would
// not trigger the listener registration.
parseBlocksIfNewBlockAvailable(chainHeight);
}, this::handleError);
} else {
log.warn("We are trying to start with a block which is above the chain height of Bitcoin Core. " +
"We need probably wait longer until Bitcoin Core has fully synced. " +
"We try again after a delay of 1 min.");
UserThread.runAfter(() -> rpcService.requestChainHeadHeight(chainHeight1 ->
parseBlocksOnHeadHeight(startBlockHeight, chainHeight1),
this::handleError), 60);
parseBlocksOnHeadHeightCounter++;
if (parseBlocksOnHeadHeightCounter <= 5) {
log.warn("We are trying to start with a block which is above the chain height of Bitcoin Core. " +
"We need to wait longer until Bitcoin Core has fully synced. " +
"We try again after a delay of {} min.", parseBlocksOnHeadHeightCounter * parseBlocksOnHeadHeightCounter);
UserThread.runAfter(() -> rpcService.requestChainHeadHeight(height ->
parseBlocksOnHeadHeight(startBlockHeight, height),
this::handleError), parseBlocksOnHeadHeightCounter * parseBlocksOnHeadHeightCounter * 60L);
} else {
log.warn("We tried {} times to start with startBlockHeight {} which is above the chain height {} of Bitcoin Core. " +
"It might be that Bitcoin Core has not fully synced. We give up now.",
parseBlocksOnHeadHeightCounter, startBlockHeight, chainHeight);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/bisq/core/dao/node/full/RawBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public static RawBlock fromProto(protobuf.BaseBlock proto) {
@Override
public String toString() {
return "RawBlock{" +
"\n rawTxs=" + rawTxs +
"\n} " + super.toString();
"\n height=" + height +
",\n time=" + time +
",\n hash='" + hash + '\'' +
",\n previousBlockHash='" + previousBlockHash + '\'' +
",\n rawTxs=" + rawTxs +
"\n}";
}
}
10 changes: 8 additions & 2 deletions core/src/main/java/bisq/core/dao/node/full/RawTx.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,13 @@ public static RawTx fromProto(protobuf.BaseTx protoBaseTx) {
@Override
public String toString() {
return "RawTx{" +
"\n rawTxOutputs=" + rawTxOutputs +
"\n} " + super.toString();
"\n txVersion='" + txVersion + '\'' +
",\n id='" + id + '\'' +
",\n blockHeight=" + blockHeight +
",\n blockHash='" + blockHash + '\'' +
",\n time=" + time +
",\n txInputs=" + txInputs +
",\n rawTxOutputs=" + rawTxOutputs +
"\n }";
}
}
11 changes: 10 additions & 1 deletion core/src/main/java/bisq/core/dao/node/full/RawTxOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import bisq.core.dao.state.model.blockchain.TxOutput;

import bisq.common.proto.network.NetworkPayload;
import bisq.common.util.Utilities;

import lombok.EqualsAndHashCode;
import lombok.Value;
Expand Down Expand Up @@ -88,6 +89,14 @@ public static RawTxOutput fromProto(protobuf.BaseTxOutput proto) {

@Override
public String toString() {
return "RawTxOutput{} " + super.toString();
return "RawTxOutput{" +
"\n index=" + index +
",\n value=" + value +
",\n txId='" + txId + '\'' +
",\n pubKeyScript=" + pubKeyScript +
",\n address='" + address + '\'' +
",\n opReturnData=" + Utilities.bytesAsHexString(opReturnData) +
",\n blockHeight=" + blockHeight +
"\n }";
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/bisq/core/dao/node/lite/LiteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.ConnectionState;

import bisq.common.Timer;
import bisq.common.UserThread;
Expand Down Expand Up @@ -191,6 +192,9 @@ protected void startParseBlocks() {
return;
}

// If we request blocks we increment the ConnectionState counter.
ConnectionState.incrementExpectedInitialDataResponses();

if (chainHeight == daoStateService.getGenesisBlockHeight()) {
liteNodeNetworkService.requestBlocks(chainHeight);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public TxOutputKey getConnectedTxOutputKey() {
@Override
public String toString() {
return "TxInput{" +
"\n connectedTxOutputTxId='" + connectedTxOutputTxId + '\'' +
",\n connectedTxOutputIndex=" + connectedTxOutputIndex +
",\n pubKey=" + pubKey +
"\n}";
"\n connectedTxOutputTxId='" + connectedTxOutputTxId + '\'' +
",\n connectedTxOutputIndex=" + connectedTxOutputIndex +
",\n pubKey=" + pubKey +
"\n }";
}
}
17 changes: 11 additions & 6 deletions p2p/src/main/java/bisq/network/p2p/network/ConnectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ public class ConnectionState implements MessageListener {
private static final long PEER_RESET_TIMER_DELAY_SEC = TimeUnit.MINUTES.toSeconds(4);
private static final long COMPLETED_TIMER_DELAY_SEC = 10;

// Number of expected requests in standard case. Can be different according to network conditions.
// Is different for LiteDaoNodes and FullDaoNodes
@Setter
private static int expectedRequests = 6;
// We have 2 GetDataResponses and 3 GetHashResponses. If node is a lite node it also has a GetBlocksResponse if
// blocks are missing.
private static final int MIN_EXPECTED_RESPONSES = 5;
private static int expectedInitialDataResponses = MIN_EXPECTED_RESPONSES;

// If app runs in LiteNode mode there is one more expected request for the getBlocks request, so we increment standard value.
public static void incrementExpectedInitialDataResponses() {
expectedInitialDataResponses += 1;
}

private final Connection connection;

Expand Down Expand Up @@ -124,7 +129,7 @@ private void onInitialDataExchange() {
}

private void maybeResetInitialDataExchangeType() {
if (numInitialDataResponses >= expectedRequests) {
if (numInitialDataResponses >= expectedInitialDataResponses) {
// We have received the expected messages from initial data requests. We delay a bit the reset
// to give time for processing the response and more tolerance to edge cases where we expect more responses.
// Reset to PEER does not mean disconnection as well, but just that this connection has lower priority and
Expand Down Expand Up @@ -168,7 +173,7 @@ public String toString() {
",\n numInitialDataResponses=" + numInitialDataResponses +
",\n lastInitialDataMsgTimeStamp=" + lastInitialDataMsgTimeStamp +
",\n isSeedNode=" + isSeedNode +
",\n expectedRequests=" + expectedRequests +
",\n expectedInitialDataResponses=" + expectedInitialDataResponses +
"\n}";
}
}