Skip to content

Commit

Permalink
Merge pull request #4851 from chimp1984/improve-bsq-get-block-request…
Browse files Browse the repository at this point in the history
…-handling

Improve getBlocks request handling
  • Loading branch information
sqrrm authored Dec 29, 2020
2 parents 785c76e + 862b12f commit 9cc3f68
Show file tree
Hide file tree
Showing 17 changed files with 380 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void sendRepublishRequest(NodeAddress nodeAddress) {
connectToNextNode();
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
Expand All @@ -118,7 +118,7 @@ public void onSuccess(Connection connection) {
stop();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}

Expand All @@ -133,7 +133,7 @@ public void onFailure(@NotNull Throwable throwable) {
connectToNextNode();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void requestStateHashes(int fromHeight) {
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
Expand All @@ -134,7 +134,7 @@ public void onSuccess(Connection connection) {
nodeAddress.getFullAddress());
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}

Expand All @@ -149,7 +149,7 @@ public void onFailure(@NotNull Throwable throwable) {
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected int getStartBlockHeight() {
if (chainHeight > genesisBlockHeight)
startBlockHeight = chainHeight + 1;

log.info("Start parse blocks:\n" +
log.info("getStartBlockHeight:\n" +
" Start block height={}\n" +
" Genesis txId={}\n" +
" Genesis block height={}\n" +
Expand Down Expand Up @@ -223,15 +223,14 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) {
log.debug("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
return Optional.empty();
}

try {
Block block = blockParser.parseBlock(rawBlock);

if (pendingBlocks.contains(rawBlock))
pendingBlocks.remove(rawBlock);
pendingBlocks.remove(rawBlock);

// After parsing we check if we have pending blocks we might have received earlier but which have been
// not connecting from the latest height we had. The list is sorted by height
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum JsonTxOutputType {
INVALID_OUTPUT("Invalid");

@Getter
private String displayString;
private final String displayString;

JsonTxOutputType(String displayString) {
this.displayString = displayString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ enum JsonTxType {
IRREGULAR("Irregular");

@Getter
private String displayString;
private final String displayString;

JsonTxType(String displayString) {
this.displayString = displayString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,51 +136,62 @@ public void onAwakeFromStandby() {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksRequest) {
// We received a GetBlocksRequest from a liteNode
if (!stopped) {
final String uid = connection.getUid();
if (!getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}

@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");

UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
}
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
handleGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else if (networkEnvelope instanceof RepublishGovernanceDataRequest) {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
handleRepublishGovernanceDataRequest();
}
}

private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
if (stopped) {
log.warn("We have stopped already. We ignore that onMessage call.");
return;
}

String uid = connection.getUid();
if (getBlocksRequestHandlers.containsKey(uid)) {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");

UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
return;
}

GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}

@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
if (connection != null) {
peerManager.handleConnectionFault(connection);
}
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest(getBlocksRequest, connection);
}

private void handleRepublishGovernanceDataRequest() {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
*/
@Slf4j
class GetBlocksRequestHandler {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -89,22 +89,28 @@ public GetBlocksRequestHandler(NetworkNode networkNode, DaoStateService daoState
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connection connection) {
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 6000 which is about 1.5 month.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 6000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT, TimeUnit.SECONDS);
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight(),
rawBlocks.size(), System.currentTimeMillis() - ts);

if (timeoutTimer != null) {
timeoutTimer.stop();
log.warn("Timeout was already running. We stopped it.");
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection: " + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT_MIN, TimeUnit.MINUTES);

SettableFuture<Connection> future = networkNode.sendMessage(connection, getBlocksResponse);
Futures.addCallback(future, new FutureCallback<>() {
Expand Down Expand Up @@ -145,7 +151,7 @@ public void stop() {

private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
if (!stopped) {
log.debug(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason);
log.warn("{}, closeConnectionReason={}", errorMessage, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
} else {
Expand Down
Loading

0 comments on commit 9cc3f68

Please sign in to comment.