Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve getBlocks request handling #4851

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void sendRepublishRequest(NodeAddress nodeAddress) {
connectToNextNode();
} else {
log.warn("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
Expand All @@ -118,7 +118,7 @@ public void onSuccess(Connection connection) {
stop();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}

Expand All @@ -133,7 +133,7 @@ public void onFailure(@NotNull Throwable throwable) {
connectToNextNode();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void requestStateHashes(int fromHeight) {
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_TIMEOUT);
} else {
log.trace("We have stopped already. We ignore that timeoutTimer.run call. " +
"Might be caused by an previous networkNode.sendMessage.onFailure.");
"Might be caused by a previous networkNode.sendMessage.onFailure.");
}
},
TIMEOUT);
Expand All @@ -134,7 +134,7 @@ public void onSuccess(Connection connection) {
nodeAddress.getFullAddress());
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}

Expand All @@ -149,7 +149,7 @@ public void onFailure(@NotNull Throwable throwable) {
handleFault(errorMessage, nodeAddress, CloseConnectionReason.SEND_MSG_FAILURE);
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onFailure call. " +
"Might be caused by an previous timeout.");
"Might be caused by a previous timeout.");
}
}
}, MoreExecutors.directExecutor());
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected int getStartBlockHeight() {
if (chainHeight > genesisBlockHeight)
startBlockHeight = chainHeight + 1;

log.info("Start parse blocks:\n" +
log.info("getStartBlockHeight:\n" +
" Start block height={}\n" +
" Genesis txId={}\n" +
" Genesis block height={}\n" +
Expand Down Expand Up @@ -223,15 +223,14 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) {
log.debug("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
return Optional.empty();
}

try {
Block block = blockParser.parseBlock(rawBlock);

if (pendingBlocks.contains(rawBlock))
pendingBlocks.remove(rawBlock);
pendingBlocks.remove(rawBlock);

// After parsing we check if we have pending blocks we might have received earlier but which have been
// not connecting from the latest height we had. The list is sorted by height
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum JsonTxOutputType {
INVALID_OUTPUT("Invalid");

@Getter
private String displayString;
private final String displayString;

JsonTxOutputType(String displayString) {
this.displayString = displayString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ enum JsonTxType {
IRREGULAR("Irregular");

@Getter
private String displayString;
private final String displayString;

JsonTxType(String displayString) {
this.displayString = displayString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,51 +136,62 @@ public void onAwakeFromStandby() {
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof GetBlocksRequest) {
// We received a GetBlocksRequest from a liteNode
if (!stopped) {
final String uid = connection.getUid();
if (!getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}

@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
peerManager.handleConnectionFault(connection);
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");

UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
}
} else {
log.warn("We have stopped already. We ignore that onMessage call.");
}
handleGetBlocksRequest((GetBlocksRequest) networkEnvelope, connection);
} else if (networkEnvelope instanceof RepublishGovernanceDataRequest) {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
handleRepublishGovernanceDataRequest();
}
}

private void handleGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
if (stopped) {
log.warn("We have stopped already. We ignore that onMessage call.");
return;
}

String uid = connection.getUid();
if (getBlocksRequestHandlers.containsKey(uid)) {
log.warn("We have already a GetDataRequestHandler for that connection started. " +
"We start a cleanup timer if the handler has not closed by itself in between 2 minutes.");

UserThread.runAfter(() -> {
if (getBlocksRequestHandlers.containsKey(uid)) {
GetBlocksRequestHandler handler = getBlocksRequestHandlers.get(uid);
handler.stop();
getBlocksRequestHandlers.remove(uid);
}
}, CLEANUP_TIMER);
return;
}

GetBlocksRequestHandler requestHandler = new GetBlocksRequestHandler(networkNode,
daoStateService,
new GetBlocksRequestHandler.Listener() {
@Override
public void onComplete() {
getBlocksRequestHandlers.remove(uid);
}

@Override
public void onFault(String errorMessage, @Nullable Connection connection) {
getBlocksRequestHandlers.remove(uid);
if (!stopped) {
log.trace("GetDataRequestHandler failed.\n\tConnection={}\n\t" +
"ErrorMessage={}", connection, errorMessage);
if (connection != null) {
peerManager.handleConnectionFault(connection);
}
} else {
log.warn("We have stopped already. We ignore that getDataRequestHandler.handle.onFault call.");
}
}
});
getBlocksRequestHandlers.put(uid, requestHandler);
requestHandler.onGetBlocksRequest(getBlocksRequest, connection);
}

private void handleRepublishGovernanceDataRequest() {
log.warn("We received a RepublishGovernanceDataRequest and re-published all proposalPayloads and " +
"blindVotePayloads to the P2P network.");
missingDataRequestService.reRepublishAllGovernanceData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
*/
@Slf4j
class GetBlocksRequestHandler {
private static final long TIMEOUT = 120;
private static final long TIMEOUT_MIN = 3;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -89,22 +89,28 @@ public GetBlocksRequestHandler(NetworkNode networkNode, DaoStateService daoState
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connection connection) {
public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, Connection connection) {
long ts = System.currentTimeMillis();
// We limit number of blocks to 6000 which is about 1.5 month.
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight(), 6000));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection:" + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT, TimeUnit.SECONDS);
log.info("Received GetBlocksRequest from {} for blocks from height {}. " +
"Building GetBlocksResponse with {} blocks took {} ms.",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight(),
rawBlocks.size(), System.currentTimeMillis() - ts);

if (timeoutTimer != null) {
timeoutTimer.stop();
log.warn("Timeout was already running. We stopped it.");
}
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
String errorMessage = "A timeout occurred for getBlocksResponse.requestNonce:" +
getBlocksResponse.getRequestNonce() +
" on connection: " + connection;
handleFault(errorMessage, CloseConnectionReason.SEND_MSG_TIMEOUT, connection);
},
TIMEOUT_MIN, TimeUnit.MINUTES);

SettableFuture<Connection> future = networkNode.sendMessage(connection, getBlocksResponse);
Futures.addCallback(future, new FutureCallback<>() {
Expand Down Expand Up @@ -145,7 +151,7 @@ public void stop() {

private void handleFault(String errorMessage, CloseConnectionReason closeConnectionReason, Connection connection) {
if (!stopped) {
log.debug(errorMessage + "\n\tcloseConnectionReason=" + closeConnectionReason);
log.warn("{}, closeConnectionReason={}", errorMessage, closeConnectionReason);
cleanup();
listener.onFault(errorMessage, connection);
} else {
Expand Down
Loading