Skip to content

Commit

Permalink
Merge pull request #3773 from stejbac/add-tx-map-to-daostate
Browse files Browse the repository at this point in the history
Add transient tx map to DaoState to speed up getTx queries
  • Loading branch information
ripcurlx authored Dec 16, 2019
2 parents db6c13a + b22e4ad commit 20b56c7
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 39 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/dao/DaoFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,8 @@ public Set<TxOutput> getUnspentTxOutputs() {
return daoStateService.getUnspentTxOutputs();
}

public Set<Tx> getTxs() {
return daoStateService.getTxs();
public int getNumTxs() {
return daoStateService.getNumTxs();
}

public Optional<TxOutput> getLockupTxOutput(String txId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void maybeExportToJson() {
// Access to daoStateService is single threaded, we must not access daoStateService from the thread.
List<JsonTxOutput> allJsonTxOutputs = new ArrayList<>();

List<JsonTx> jsonTxs = daoStateService.getTxStream()
List<JsonTx> jsonTxs = daoStateService.getUnorderedTxStream()
.map(tx -> {
JsonTx jsonTx = getJsonTx(tx);
allJsonTxOutputs.addAll(jsonTx.getOutputs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.Tx;

import bisq.common.app.DevEnv;

Expand All @@ -31,7 +30,6 @@
import javax.inject.Inject;

import java.util.LinkedList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -55,7 +53,6 @@ public class BlockParser {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////

@SuppressWarnings("WeakerAccess")
@Inject
public BlockParser(TxParser txParser,
DaoStateService daoStateService) {
Expand Down Expand Up @@ -106,14 +103,13 @@ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingExceptio
// one get resolved.
// Lately there is a patter with 24 iterations observed
long startTs = System.currentTimeMillis();
List<Tx> txList = block.getTxs();

rawBlock.getRawTxs().forEach(rawTx ->
txParser.findTx(rawTx,
genesisTxId,
genesisBlockHeight,
genesisTotalSupply)
.ifPresent(txList::add));
.ifPresent(tx -> daoStateService.onNewTxForLastBlock(block, tx)));

log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);
Expand Down
69 changes: 42 additions & 27 deletions core/src/main/java/bisq/core/dao/state/DaoStateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import bisq.core.dao.state.model.governance.Issuance;
import bisq.core.dao.state.model.governance.IssuanceType;
import bisq.core.dao.state.model.governance.ParamChange;
import bisq.core.util.coin.BsqFormatter;
import bisq.core.util.ParsingUtils;
import bisq.core.util.coin.BsqFormatter;

import org.bitcoinj.core.Coin;

Expand Down Expand Up @@ -115,6 +115,8 @@ public void applySnapshot(DaoState snapshot) {

daoState.setChainHeight(snapshot.getChainHeight());

daoState.setTxCache(snapshot.getTxCache());

daoState.getBlocks().clear();
daoState.getBlocks().addAll(snapshot.getBlocks());

Expand Down Expand Up @@ -226,7 +228,25 @@ public void onNewBlockWithEmptyTxs(Block block) {
}
}

// Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
// Third we add each successfully parsed BSQ tx to the last block
public void onNewTxForLastBlock(Block block, Tx tx) {
assertDaoStateChange();

getLastBlock().ifPresent(lastBlock -> {
if (block == lastBlock) {
// We need to ensure that the txs in all blocks are in sync with the txs in our txMap (cache).
block.addTx(tx);
daoState.addToTxCache(tx);
} else {
// Not clear if this case can happen but at onNewBlockWithEmptyTxs we handle such a potential edge
// case as well, so we need to reflect that here as well.
log.warn("Block for parsing does not match last block. That might happen in edge cases at reorgs. " +
"Received block={}", block);
}
});
}

// Fourth we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
public void onParseBlockComplete(Block block) {
if (parseBlockChainComplete)
log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size());
Expand Down Expand Up @@ -343,29 +363,24 @@ public Optional<Tx> getGenesisTx() {
// Tx
///////////////////////////////////////////////////////////////////////////////////////////

public Stream<Tx> getTxStream() {
return getBlocks().stream()
.flatMap(block -> block.getTxs().stream());
public Stream<Tx> getUnorderedTxStream() {
return daoState.getTxCache().values().stream();
}

public TreeMap<String, Tx> getTxMap() {
return new TreeMap<>(getTxStream().collect(Collectors.toMap(Tx::getId, tx -> tx)));
}

public Set<Tx> getTxs() {
return getTxStream().collect(Collectors.toSet());
}

public Optional<Tx> getTx(String txId) {
return getTxStream().filter(tx -> tx.getId().equals(txId)).findAny();
public int getNumTxs() {
return daoState.getTxCache().size();
}

public List<Tx> getInvalidTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
}

public List<Tx> getIrregularTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
}

public Optional<Tx> getTx(String txId) {
return Optional.ofNullable(daoState.getTxCache().get(txId));
}

public boolean containsTx(String txId) {
Expand Down Expand Up @@ -395,11 +410,11 @@ public boolean hasTxBurntFee(String txId) {
}

public long getTotalBurntFee() {
return getTxStream().mapToLong(Tx::getBurntFee).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntFee).sum();
}

public Set<Tx> getBurntFeeTxs() {
return getTxStream()
return getUnorderedTxStream()
.filter(tx -> tx.getBurntFee() > 0)
.collect(Collectors.toSet());
}
Expand All @@ -418,17 +433,17 @@ public Optional<TxOutput> getConnectedTxOutput(TxInput txInput) {
// TxOutput
///////////////////////////////////////////////////////////////////////////////////////////

public Stream<TxOutput> getTxOutputStream() {
return getTxStream()
private Stream<TxOutput> getUnorderedTxOutputStream() {
return getUnorderedTxStream()
.flatMap(tx -> tx.getTxOutputs().stream());
}

public boolean existsTxOutput(TxOutputKey key) {
return getTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
return getUnorderedTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
}

public Optional<TxOutput> getTxOutput(TxOutputKey txOutputKey) {
return getTxOutputStream()
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getKey().equals(txOutputKey))
.findAny();
}
Expand Down Expand Up @@ -513,8 +528,8 @@ public boolean isTxOutputSpendable(TxOutputKey key) {
// TxOutputType
///////////////////////////////////////////////////////////////////////////////////////////

public Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getTxOutputStream()
private Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getTxOutputType() == txOutputType)
.collect(Collectors.toSet());
}
Expand Down Expand Up @@ -823,12 +838,12 @@ public long getTotalAmountOfConfiscatedTxOutputs() {
}

public long getTotalAmountOfInvalidatedBsq() {
return getTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
}

// Contains burnt fee and invalidated bsq due invalid txs
public long getTotalAmountOfBurntBsq() {
return getTxStream().mapToLong(Tx::getBurntBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntBsq).sum();
}

// Confiscate bond
Expand Down
30 changes: 29 additions & 1 deletion core/src/main/java/bisq/core/dao/state/model/DaoState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.SpentInfo;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxOutputKey;
import bisq.core.dao.state.model.governance.Cycle;
Expand All @@ -28,22 +29,25 @@
import bisq.core.dao.state.model.governance.ParamChange;

import bisq.common.proto.persistable.PersistablePayload;
import bisq.common.util.JsonExclude;

import com.google.protobuf.Message;

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


/**
* Root class for mutable state of the DAO.
* Holds both blockchain data as well as data derived from the governance process (voting).
Expand Down Expand Up @@ -98,6 +102,10 @@ public static DaoState getClone(DaoState daoState) {
@Getter
private final List<DecryptedBallotsWithMerits> decryptedBallotsWithMeritsList;

// Transient data used only as an index - must be kept in sync with the block list
@JsonExclude
private transient final Map<String, Tx> txCache; // key is txId


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
Expand Down Expand Up @@ -145,6 +153,10 @@ private DaoState(int chainHeight,
this.paramChangeList = paramChangeList;
this.evaluatedProposalList = evaluatedProposalList;
this.decryptedBallotsWithMeritsList = decryptedBallotsWithMeritsList;

txCache = blocks.stream()
.flatMap(block -> block.getTxs().stream())
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new));
}

@Override
Expand Down Expand Up @@ -224,6 +236,21 @@ public byte[] getSerializedStateForHashChain() {
return getBsqStateBuilderExcludingBlocks().addBlocks(getBlocks().getLast().toProtoMessage()).build().toByteArray();
}

public void addToTxCache(Tx tx) {
// We shouldn't get duplicate txIds, but use putIfAbsent instead of put for consistency with the map merge
// function used in the constructor to initialise txCache (and to exactly match the pre-caching behaviour).
txCache.putIfAbsent(tx.getId(), tx);
}

public void setTxCache(Map<String, Tx> txCache) {
this.txCache.clear();
this.txCache.putAll(txCache);
}

public Map<String, Tx> getTxCache() {
return Collections.unmodifiableMap(txCache);
}

@Override
public String toString() {
return "DaoState{" +
Expand All @@ -237,6 +264,7 @@ public String toString() {
",\n paramChangeList=" + paramChangeList +
",\n evaluatedProposalList=" + evaluatedProposalList +
",\n decryptedBallotsWithMeritsList=" + decryptedBallotsWithMeritsList +
",\n txCache=" + txCache +
"\n}";
}
}
15 changes: 13 additions & 2 deletions core/src/main/java/bisq/core/dao/state/model/blockchain/Block.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import lombok.EqualsAndHashCode;
import lombok.Value;

/**
* The Block which gets persisted in the DaoState. During parsing transactions can be
Expand All @@ -44,8 +44,8 @@
*
*/
@EqualsAndHashCode(callSuper = true)
@Value
public final class Block extends BaseBlock implements PersistablePayload, ImmutableDaoStateModel {
// We do not expose txs with a Lombok getter. We cannot make it immutable as we add transactions during parsing.
private final List<Tx> txs;

public Block(int height, long time, String hash, String previousBlockHash) {
Expand Down Expand Up @@ -93,6 +93,17 @@ public static Block fromProto(protobuf.BaseBlock proto) {
txs);
}

public void addTx(Tx tx) {
txs.add(tx);
}

// We want to guarantee that no client can modify the list. We use unmodifiableList and not ImmutableList as
// we want that clients reflect any change to the source list. Also ImmutableList is more expensive as it
// creates a copy.
public List<Tx> getTxs() {
return Collections.unmodifiableList(txs);
}

@Override
public String toString() {
return "Block{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
///////////////////////////////////////////////////////////////////////////////////////////

private void updateWithBsqBlockChainData() {
allTxTextField.setText(String.valueOf(daoFacade.getTxs().size()));
allTxTextField.setText(String.valueOf(daoFacade.getNumTxs()));
utxoTextField.setText(String.valueOf(daoFacade.getUnspentTxOutputs().size()));
compensationIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.COMPENSATION)));
reimbursementIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.REIMBURSEMENT)));
Expand Down

0 comments on commit 20b56c7

Please sign in to comment.