Skip to content

Commit

Permalink
Merge pull request #6418 from HenrikJannsen/improve-dao-node-domain
Browse files Browse the repository at this point in the history
Improve dao node domain
  • Loading branch information
sqrrm authored Nov 26, 2022
2 parents 92e4c62 + 671ab1f commit b10881f
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 33 deletions.
10 changes: 6 additions & 4 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr

int heightForNextBlock = daoStateService.getChainHeight() + 1;
if (rawBlock.getHeight() > heightForNextBlock) {
// rawBlock is not at expected next height but further in the future
if (!pendingBlocks.contains(rawBlock)) {
pendingBlocks.add(rawBlock);
log.info("We received a block with a future block height. We store it as pending and try to apply " +
Expand All @@ -240,16 +241,17 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
log.warn("We received a block with a future block height but we had it already added to our pendingBlocks.");
}
} else if (rawBlock.getHeight() >= daoStateService.getGenesisBlockHeight()) {
// rawBlock is not expected next height but either same height as chainHead or in the past
// We received an older block. We compare if we have it in our chain.
Optional<Block> optionalBlock = daoStateService.getBlockAtHeight(rawBlock.getHeight());
if (optionalBlock.isPresent()) {
if (optionalBlock.get().getHash().equals(rawBlock.getPreviousBlockHash())) {
Optional<Block> existingBlockAsSameHeight = daoStateService.getBlockAtHeight(rawBlock.getHeight());
if (existingBlockAsSameHeight.isPresent()) {
if (existingBlockAsSameHeight.get().getHash().equals(rawBlock.getHash())) {
log.info("We received an old block we have already parsed and added. We ignore it.");
} else {
log.info("We received an old block with a different hash. We ignore it. Hash={}", rawBlock.getHash());
}
} else {
log.info("In case we have reset from genesis height we would not find the block");
log.info("In case we have reset from genesis height we would not find the existingBlockAsSameHeight");
}
} else {
log.info("We ignore it as it was before genesis height");
Expand Down
29 changes: 15 additions & 14 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import bisq.core.dao.state.model.blockchain.Block;

import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.ConnectionState;

import bisq.common.UserThread;
import bisq.common.handlers.ResultHandler;
Expand Down Expand Up @@ -58,6 +57,7 @@ public class FullNode extends BsqNode {
private boolean addBlockHandlerAdded;
private int blocksToParseInBatch;
private long parseInBatchStartTime;
private int parseBlocksOnHeadHeightCounter;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -76,7 +76,6 @@ private FullNode(BlockParser blockParser,
this.rpcService = rpcService;

this.fullNodeNetworkService = fullNodeNetworkService;
ConnectionState.setExpectedRequests(5);
}


Expand Down Expand Up @@ -110,8 +109,6 @@ protected void startParseBlocks() {
int startBlockHeight = daoStateService.getChainHeight();
log.info("startParseBlocks: startBlockHeight={}", startBlockHeight);
rpcService.requestChainHeadHeight(chainHeight -> {
// If our persisted block is equal to the chain height we have startBlockHeight 1 block higher,
// so we do not call parseBlocksOnHeadHeight
log.info("startParseBlocks: chainHeight={}", chainHeight);
if (startBlockHeight <= chainHeight) {
parseBlocksOnHeadHeight(startBlockHeight, chainHeight);
Expand Down Expand Up @@ -205,20 +202,24 @@ private void parseBlocksOnHeadHeight(int startBlockHeight, int chainHeight) {
chainHeight,
this::onNewBlock,
() -> {
// We are done but it might be that new blocks have arrived in the meantime,
// We are done, but it might be that new blocks have arrived in the meantime,
// so we try again with startBlockHeight set to current chainHeight
// We also set up the listener in the else main branch where we check
// if we are at chainTip, so do not include here another check as it would
// not trigger the listener registration.
parseBlocksIfNewBlockAvailable(chainHeight);
}, this::handleError);
} else {
log.warn("We are trying to start with a block which is above the chain height of Bitcoin Core. " +
"We need probably wait longer until Bitcoin Core has fully synced. " +
"We try again after a delay of 1 min.");
UserThread.runAfter(() -> rpcService.requestChainHeadHeight(chainHeight1 ->
parseBlocksOnHeadHeight(startBlockHeight, chainHeight1),
this::handleError), 60);
parseBlocksOnHeadHeightCounter++;
if (parseBlocksOnHeadHeightCounter <= 5) {
log.warn("We are trying to start with a block which is above the chain height of Bitcoin Core. " +
"We need to wait longer until Bitcoin Core has fully synced. " +
"We try again after a delay of {} min.", parseBlocksOnHeadHeightCounter * parseBlocksOnHeadHeightCounter);
UserThread.runAfter(() -> rpcService.requestChainHeadHeight(height ->
parseBlocksOnHeadHeight(startBlockHeight, height),
this::handleError), parseBlocksOnHeadHeightCounter * parseBlocksOnHeadHeightCounter * 60L);
} else {
log.warn("We tried {} times to start with startBlockHeight {} which is above the chain height {} of Bitcoin Core. " +
"It might be that Bitcoin Core has not fully synced. We give up now.",
parseBlocksOnHeadHeightCounter, startBlockHeight, chainHeight);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/bisq/core/dao/node/full/RawBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public static RawBlock fromProto(protobuf.BaseBlock proto) {
@Override
public String toString() {
return "RawBlock{" +
"\n rawTxs=" + rawTxs +
"\n} " + super.toString();
"\n height=" + height +
",\n time=" + time +
",\n hash='" + hash + '\'' +
",\n previousBlockHash='" + previousBlockHash + '\'' +
",\n rawTxs=" + rawTxs +
"\n}";
}
}
10 changes: 8 additions & 2 deletions core/src/main/java/bisq/core/dao/node/full/RawTx.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,13 @@ public static RawTx fromProto(protobuf.BaseTx protoBaseTx) {
@Override
public String toString() {
return "RawTx{" +
"\n rawTxOutputs=" + rawTxOutputs +
"\n} " + super.toString();
"\n txVersion='" + txVersion + '\'' +
",\n id='" + id + '\'' +
",\n blockHeight=" + blockHeight +
",\n blockHash='" + blockHash + '\'' +
",\n time=" + time +
",\n txInputs=" + txInputs +
",\n rawTxOutputs=" + rawTxOutputs +
"\n }";
}
}
11 changes: 10 additions & 1 deletion core/src/main/java/bisq/core/dao/node/full/RawTxOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import bisq.core.dao.state.model.blockchain.TxOutput;

import bisq.common.proto.network.NetworkPayload;
import bisq.common.util.Utilities;

import lombok.EqualsAndHashCode;
import lombok.Value;
Expand Down Expand Up @@ -88,6 +89,14 @@ public static RawTxOutput fromProto(protobuf.BaseTxOutput proto) {

@Override
public String toString() {
return "RawTxOutput{} " + super.toString();
return "RawTxOutput{" +
"\n index=" + index +
",\n value=" + value +
",\n txId='" + txId + '\'' +
",\n pubKeyScript=" + pubKeyScript +
",\n address='" + address + '\'' +
",\n opReturnData=" + Utilities.bytesAsHexString(opReturnData) +
",\n blockHeight=" + blockHeight +
"\n }";
}
}
4 changes: 4 additions & 0 deletions core/src/main/java/bisq/core/dao/node/lite/LiteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.ConnectionState;

import bisq.common.Timer;
import bisq.common.UserThread;
Expand Down Expand Up @@ -191,6 +192,9 @@ protected void startParseBlocks() {
return;
}

// If we request blocks we increment the ConnectionState counter.
ConnectionState.incrementExpectedInitialDataResponses();

if (chainHeight == daoStateService.getGenesisBlockHeight()) {
liteNodeNetworkService.requestBlocks(chainHeight);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public TxOutputKey getConnectedTxOutputKey() {
@Override
public String toString() {
return "TxInput{" +
"\n connectedTxOutputTxId='" + connectedTxOutputTxId + '\'' +
",\n connectedTxOutputIndex=" + connectedTxOutputIndex +
",\n pubKey=" + pubKey +
"\n}";
"\n connectedTxOutputTxId='" + connectedTxOutputTxId + '\'' +
",\n connectedTxOutputIndex=" + connectedTxOutputIndex +
",\n pubKey=" + pubKey +
"\n }";
}
}
17 changes: 11 additions & 6 deletions p2p/src/main/java/bisq/network/p2p/network/ConnectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ public class ConnectionState implements MessageListener {
private static final long PEER_RESET_TIMER_DELAY_SEC = TimeUnit.MINUTES.toSeconds(4);
private static final long COMPLETED_TIMER_DELAY_SEC = 10;

// Number of expected requests in standard case. Can be different according to network conditions.
// Is different for LiteDaoNodes and FullDaoNodes
@Setter
private static int expectedRequests = 6;
// We have 2 GetDataResponses and 3 GetHashResponses. If node is a lite node it also has a GetBlocksResponse if
// blocks are missing.
private static final int MIN_EXPECTED_RESPONSES = 5;
private static int expectedInitialDataResponses = MIN_EXPECTED_RESPONSES;

// If app runs in LiteNode mode there is one more expected request for the getBlocks request, so we increment standard value.
public static void incrementExpectedInitialDataResponses() {
expectedInitialDataResponses += 1;
}

private final Connection connection;

Expand Down Expand Up @@ -124,7 +129,7 @@ private void onInitialDataExchange() {
}

private void maybeResetInitialDataExchangeType() {
if (numInitialDataResponses >= expectedRequests) {
if (numInitialDataResponses >= expectedInitialDataResponses) {
// We have received the expected messages from initial data requests. We delay a bit the reset
// to give time for processing the response and more tolerance to edge cases where we expect more responses.
// Reset to PEER does not mean disconnection as well, but just that this connection has lower priority and
Expand Down Expand Up @@ -168,7 +173,7 @@ public String toString() {
",\n numInitialDataResponses=" + numInitialDataResponses +
",\n lastInitialDataMsgTimeStamp=" + lastInitialDataMsgTimeStamp +
",\n isSeedNode=" + isSeedNode +
",\n expectedRequests=" + expectedRequests +
",\n expectedInitialDataResponses=" + expectedInitialDataResponses +
"\n}";
}
}

0 comments on commit b10881f

Please sign in to comment.