diff --git a/core/src/main/java/bisq/core/dao/DaoFacade.java b/core/src/main/java/bisq/core/dao/DaoFacade.java index d8d4ec72021..15712f5ab7f 100644 --- a/core/src/main/java/bisq/core/dao/DaoFacade.java +++ b/core/src/main/java/bisq/core/dao/DaoFacade.java @@ -624,8 +624,8 @@ public Set getUnspentTxOutputs() { return daoStateService.getUnspentTxOutputs(); } - public Set getTxs() { - return daoStateService.getTxs(); + public int getNumTxs() { + return daoStateService.getNumTxs(); } public Optional getLockupTxOutput(String txId) { diff --git a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java index 2d0632435c9..2ddcd5355de 100644 --- a/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java +++ b/core/src/main/java/bisq/core/dao/node/explorer/ExportJsonFilesService.java @@ -140,7 +140,7 @@ public void maybeExportToJson() { // Access to daoStateService is single threaded, we must not access daoStateService from the thread. List allJsonTxOutputs = new ArrayList<>(); - List jsonTxs = daoStateService.getTxStream() + List jsonTxs = daoStateService.getUnorderedTxStream() .map(tx -> { JsonTx jsonTx = getJsonTx(tx); allJsonTxOutputs.addAll(jsonTx.getOutputs()); diff --git a/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java b/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java index 859ad34cfa4..900634bfb07 100644 --- a/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java +++ b/core/src/main/java/bisq/core/dao/node/parser/BlockParser.java @@ -22,7 +22,6 @@ import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException; import bisq.core.dao.state.DaoStateService; import bisq.core.dao.state.model.blockchain.Block; -import bisq.core.dao.state.model.blockchain.Tx; import bisq.common.app.DevEnv; @@ -31,7 +30,6 @@ import javax.inject.Inject; import java.util.LinkedList; -import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -55,7 +53,6 @@ public class BlockParser { // Constructor /////////////////////////////////////////////////////////////////////////////////////////// - @SuppressWarnings("WeakerAccess") @Inject public BlockParser(TxParser txParser, DaoStateService daoStateService) { @@ -106,14 +103,13 @@ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingExceptio // one get resolved. // Lately there is a patter with 24 iterations observed long startTs = System.currentTimeMillis(); - List txList = block.getTxs(); rawBlock.getRawTxs().forEach(rawTx -> txParser.findTx(rawTx, genesisTxId, genesisBlockHeight, genesisTotalSupply) - .ifPresent(txList::add)); + .ifPresent(tx -> daoStateService.onNewTxForLastBlock(block, tx))); log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(), blockHeight, System.currentTimeMillis() - startTs); diff --git a/core/src/main/java/bisq/core/dao/state/DaoStateService.java b/core/src/main/java/bisq/core/dao/state/DaoStateService.java index 8cbfc5553d6..5ad729e25d5 100644 --- a/core/src/main/java/bisq/core/dao/state/DaoStateService.java +++ b/core/src/main/java/bisq/core/dao/state/DaoStateService.java @@ -35,8 +35,8 @@ import bisq.core.dao.state.model.governance.Issuance; import bisq.core.dao.state.model.governance.IssuanceType; import bisq.core.dao.state.model.governance.ParamChange; -import bisq.core.util.coin.BsqFormatter; import bisq.core.util.ParsingUtils; +import bisq.core.util.coin.BsqFormatter; import org.bitcoinj.core.Coin; @@ -115,6 +115,8 @@ public void applySnapshot(DaoState snapshot) { daoState.setChainHeight(snapshot.getChainHeight()); + daoState.setTxCache(snapshot.getTxCache()); + daoState.getBlocks().clear(); daoState.getBlocks().addAll(snapshot.getBlocks()); @@ -226,7 +228,25 @@ public void onNewBlockWithEmptyTxs(Block block) { } } - // Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed + // Third we add each successfully parsed BSQ tx to the last block + public void onNewTxForLastBlock(Block block, Tx tx) { + assertDaoStateChange(); + + getLastBlock().ifPresent(lastBlock -> { + if (block == lastBlock) { + // We need to ensure that the txs in all blocks are in sync with the txs in our txMap (cache). + block.addTx(tx); + daoState.addToTxCache(tx); + } else { + // Not clear if this case can happen but at onNewBlockWithEmptyTxs we handle such a potential edge + // case as well, so we need to reflect that here as well. + log.warn("Block for parsing does not match last block. That might happen in edge cases at reorgs. " + + "Received block={}", block); + } + }); + } + + // Fourth we get the onParseBlockComplete called after all rawTxs of blocks have been parsed public void onParseBlockComplete(Block block) { if (parseBlockChainComplete) log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size()); @@ -343,29 +363,24 @@ public Optional getGenesisTx() { // Tx /////////////////////////////////////////////////////////////////////////////////////////// - public Stream getTxStream() { - return getBlocks().stream() - .flatMap(block -> block.getTxs().stream()); + public Stream getUnorderedTxStream() { + return daoState.getTxCache().values().stream(); } - public TreeMap getTxMap() { - return new TreeMap<>(getTxStream().collect(Collectors.toMap(Tx::getId, tx -> tx))); - } - - public Set getTxs() { - return getTxStream().collect(Collectors.toSet()); - } - - public Optional getTx(String txId) { - return getTxStream().filter(tx -> tx.getId().equals(txId)).findAny(); + public int getNumTxs() { + return daoState.getTxCache().size(); } public List getInvalidTxs() { - return getTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList()); + return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList()); } public List getIrregularTxs() { - return getTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList()); + return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList()); + } + + public Optional getTx(String txId) { + return Optional.ofNullable(daoState.getTxCache().get(txId)); } public boolean containsTx(String txId) { @@ -395,11 +410,11 @@ public boolean hasTxBurntFee(String txId) { } public long getTotalBurntFee() { - return getTxStream().mapToLong(Tx::getBurntFee).sum(); + return getUnorderedTxStream().mapToLong(Tx::getBurntFee).sum(); } public Set getBurntFeeTxs() { - return getTxStream() + return getUnorderedTxStream() .filter(tx -> tx.getBurntFee() > 0) .collect(Collectors.toSet()); } @@ -418,17 +433,17 @@ public Optional getConnectedTxOutput(TxInput txInput) { // TxOutput /////////////////////////////////////////////////////////////////////////////////////////// - public Stream getTxOutputStream() { - return getTxStream() + private Stream getUnorderedTxOutputStream() { + return getUnorderedTxStream() .flatMap(tx -> tx.getTxOutputs().stream()); } public boolean existsTxOutput(TxOutputKey key) { - return getTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key)); + return getUnorderedTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key)); } public Optional getTxOutput(TxOutputKey txOutputKey) { - return getTxOutputStream() + return getUnorderedTxOutputStream() .filter(txOutput -> txOutput.getKey().equals(txOutputKey)) .findAny(); } @@ -513,8 +528,8 @@ public boolean isTxOutputSpendable(TxOutputKey key) { // TxOutputType /////////////////////////////////////////////////////////////////////////////////////////// - public Set getTxOutputsByTxOutputType(TxOutputType txOutputType) { - return getTxOutputStream() + private Set getTxOutputsByTxOutputType(TxOutputType txOutputType) { + return getUnorderedTxOutputStream() .filter(txOutput -> txOutput.getTxOutputType() == txOutputType) .collect(Collectors.toSet()); } @@ -823,12 +838,12 @@ public long getTotalAmountOfConfiscatedTxOutputs() { } public long getTotalAmountOfInvalidatedBsq() { - return getTxStream().mapToLong(Tx::getInvalidatedBsq).sum(); + return getUnorderedTxStream().mapToLong(Tx::getInvalidatedBsq).sum(); } // Contains burnt fee and invalidated bsq due invalid txs public long getTotalAmountOfBurntBsq() { - return getTxStream().mapToLong(Tx::getBurntBsq).sum(); + return getUnorderedTxStream().mapToLong(Tx::getBurntBsq).sum(); } // Confiscate bond diff --git a/core/src/main/java/bisq/core/dao/state/model/DaoState.java b/core/src/main/java/bisq/core/dao/state/model/DaoState.java index 0d98cbe996a..c18338b033a 100644 --- a/core/src/main/java/bisq/core/dao/state/model/DaoState.java +++ b/core/src/main/java/bisq/core/dao/state/model/DaoState.java @@ -19,6 +19,7 @@ import bisq.core.dao.state.model.blockchain.Block; import bisq.core.dao.state.model.blockchain.SpentInfo; +import bisq.core.dao.state.model.blockchain.Tx; import bisq.core.dao.state.model.blockchain.TxOutput; import bisq.core.dao.state.model.blockchain.TxOutputKey; import bisq.core.dao.state.model.governance.Cycle; @@ -28,22 +29,25 @@ import bisq.core.dao.state.model.governance.ParamChange; import bisq.common.proto.persistable.PersistablePayload; +import bisq.common.util.JsonExclude; import com.google.protobuf.Message; import javax.inject.Inject; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.function.Function; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; - /** * Root class for mutable state of the DAO. * Holds both blockchain data as well as data derived from the governance process (voting). @@ -98,6 +102,10 @@ public static DaoState getClone(DaoState daoState) { @Getter private final List decryptedBallotsWithMeritsList; + // Transient data used only as an index - must be kept in sync with the block list + @JsonExclude + private transient final Map txCache; // key is txId + /////////////////////////////////////////////////////////////////////////////////////////// // Constructor @@ -145,6 +153,10 @@ private DaoState(int chainHeight, this.paramChangeList = paramChangeList; this.evaluatedProposalList = evaluatedProposalList; this.decryptedBallotsWithMeritsList = decryptedBallotsWithMeritsList; + + txCache = blocks.stream() + .flatMap(block -> block.getTxs().stream()) + .collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new)); } @Override @@ -224,6 +236,21 @@ public byte[] getSerializedStateForHashChain() { return getBsqStateBuilderExcludingBlocks().addBlocks(getBlocks().getLast().toProtoMessage()).build().toByteArray(); } + public void addToTxCache(Tx tx) { + // We shouldn't get duplicate txIds, but use putIfAbsent instead of put for consistency with the map merge + // function used in the constructor to initialise txCache (and to exactly match the pre-caching behaviour). + txCache.putIfAbsent(tx.getId(), tx); + } + + public void setTxCache(Map txCache) { + this.txCache.clear(); + this.txCache.putAll(txCache); + } + + public Map getTxCache() { + return Collections.unmodifiableMap(txCache); + } + @Override public String toString() { return "DaoState{" + @@ -237,6 +264,7 @@ public String toString() { ",\n paramChangeList=" + paramChangeList + ",\n evaluatedProposalList=" + evaluatedProposalList + ",\n decryptedBallotsWithMeritsList=" + decryptedBallotsWithMeritsList + + ",\n txCache=" + txCache + "\n}"; } } diff --git a/core/src/main/java/bisq/core/dao/state/model/blockchain/Block.java b/core/src/main/java/bisq/core/dao/state/model/blockchain/Block.java index 2763b74c69f..e438197e932 100644 --- a/core/src/main/java/bisq/core/dao/state/model/blockchain/Block.java +++ b/core/src/main/java/bisq/core/dao/state/model/blockchain/Block.java @@ -24,11 +24,11 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import lombok.EqualsAndHashCode; -import lombok.Value; /** * The Block which gets persisted in the DaoState. During parsing transactions can be @@ -44,8 +44,8 @@ * */ @EqualsAndHashCode(callSuper = true) -@Value public final class Block extends BaseBlock implements PersistablePayload, ImmutableDaoStateModel { + // We do not expose txs with a Lombok getter. We cannot make it immutable as we add transactions during parsing. private final List txs; public Block(int height, long time, String hash, String previousBlockHash) { @@ -93,6 +93,17 @@ public static Block fromProto(protobuf.BaseBlock proto) { txs); } + public void addTx(Tx tx) { + txs.add(tx); + } + + // We want to guarantee that no client can modify the list. We use unmodifiableList and not ImmutableList as + // we want that clients reflect any change to the source list. Also ImmutableList is more expensive as it + // creates a copy. + public List getTxs() { + return Collections.unmodifiableList(txs); + } + @Override public String toString() { return "Block{" + diff --git a/desktop/src/main/java/bisq/desktop/main/dao/economy/transactions/BSQTransactionsView.java b/desktop/src/main/java/bisq/desktop/main/dao/economy/transactions/BSQTransactionsView.java index 95effae2877..36cd962320b 100644 --- a/desktop/src/main/java/bisq/desktop/main/dao/economy/transactions/BSQTransactionsView.java +++ b/desktop/src/main/java/bisq/desktop/main/dao/economy/transactions/BSQTransactionsView.java @@ -143,7 +143,7 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) { /////////////////////////////////////////////////////////////////////////////////////////// private void updateWithBsqBlockChainData() { - allTxTextField.setText(String.valueOf(daoFacade.getTxs().size())); + allTxTextField.setText(String.valueOf(daoFacade.getNumTxs())); utxoTextField.setText(String.valueOf(daoFacade.getUnspentTxOutputs().size())); compensationIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.COMPENSATION))); reimbursementIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.REIMBURSEMENT)));