Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run processAccountingBlocks async in forkjoinpool thread #6505

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,15 @@ public static Sha256Hash getSha256Hash(AccountingBlock block) {

@Nullable
public static Sha256Hash getSha256Hash(Collection<AccountingBlock> blocks) {
long ts = System.currentTimeMillis();
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
for (AccountingBlock accountingBlock : blocks) {
outputStream.write(accountingBlock.toProtoMessage().toByteArray());
}
return Sha256Hash.of(outputStream.toByteArray());
Sha256Hash hash = Sha256Hash.of(outputStream.toByteArray());
// 2833 blocks takes about 23 ms
log.info("getSha256Hash for {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts);
return hash;
} catch (IOException e) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -191,35 +193,44 @@ protected void applyReOrg() {
///////////////////////////////////////////////////////////////////////////////////////////

private void processAccountingBlocks(List<AccountingBlock> blocks) {
log.info("We received blocks from height {} to {}",
blocks.get(0).getHeight(),
blocks.get(blocks.size() - 1).getHeight());

boolean requiresReOrg = false;
for (AccountingBlock block : blocks) {
try {
burningManAccountingService.addBlock(block);
} catch (BlockHeightNotConnectingException e) {
log.info("Height not connecting. This could happen if we received multiple responses and had already applied a previous one. {}", e.toString());
} catch (BlockHashNotConnectingException e) {
log.warn("Interrupt loop because a reorg is required. {}", e.toString());
requiresReOrg = true;
break;
CompletableFuture.runAsync(() -> {
long ts = System.currentTimeMillis();
log.info("We received blocks from height {} to {}",
blocks.get(0).getHeight(),
blocks.get(blocks.size() - 1).getHeight());

AtomicBoolean requiresReOrg = new AtomicBoolean(false);
for (AccountingBlock block : blocks) {
try {
burningManAccountingService.addBlock(block);
} catch (BlockHeightNotConnectingException e) {
log.info("Height not connecting. This could happen if we received multiple responses and had already applied a previous one. {}", e.toString());
} catch (BlockHashNotConnectingException e) {
log.warn("Interrupt loop because a reorg is required. {}", e.toString());
requiresReOrg.set(true);
break;
}
}
}
if (requiresReOrg) {
applyReOrg();
return;
}

int heightOfLastBlock = burningManAccountingService.getBlockHeightOfLastBlock();
if (walletsSetup.isDownloadComplete() && heightOfLastBlock < bsqWalletService.getBestChainHeight()) {
accountingLiteNodeNetworkService.requestBlocks(heightOfLastBlock + 1);
} else {
if (!initialBlockRequestsComplete) {
onInitialBlockRequestsComplete();
}
}
UserThread.execute(() -> {
if (requiresReOrg.get()) {
applyReOrg();
return;
}

int heightOfLastBlock = burningManAccountingService.getBlockHeightOfLastBlock();
if (walletsSetup.isDownloadComplete() && heightOfLastBlock < bsqWalletService.getBestChainHeight()) {
accountingLiteNodeNetworkService.requestBlocks(heightOfLastBlock + 1);
} else {
if (!initialBlockRequestsComplete) {
onInitialBlockRequestsComplete();
}
}

// 2833 blocks takes about 24 sec
log.info("processAccountingBlocksAsync for {} blocks took {} ms", blocks.size(), System.currentTimeMillis() - ts);
});
});
}

private void processNewAccountingBlock(AccountingBlock accountingBlock) {
Expand Down