Skip to content

Commit

Permalink
Merge pull request #1 from bisq-network/master
Browse files Browse the repository at this point in the history
Wednesday update
  • Loading branch information
craigsailor authored Dec 6, 2018
2 parents de54518 + 806fcbe commit 1b86fcc
Show file tree
Hide file tree
Showing 77 changed files with 8,079 additions and 5,457 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ desktop.ini
*/target/*
*.class
deploy
releases/*
*/releases/*
8 changes: 6 additions & 2 deletions core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@

import static bisq.core.app.BisqEnvironment.DEFAULT_APP_NAME;
import static bisq.core.app.BisqEnvironment.DEFAULT_USER_DATA_DIR;
import static bisq.core.btc.BaseCurrencyNetwork.BTC_MAINNET;
import static bisq.core.btc.BaseCurrencyNetwork.BTC_REGTEST;
import static bisq.core.btc.BaseCurrencyNetwork.BTC_TESTNET;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;

@Slf4j
public abstract class BisqExecutable implements GracefulShutDownHandler {
Expand Down Expand Up @@ -481,10 +485,10 @@ protected void customizeOptionParsing(OptionParser parser) {

//BtcOptionKeys
parser.accepts(BtcOptionKeys.BASE_CURRENCY_NETWORK,
"Base currency network")
format("Base currency network (default: %s)", BisqEnvironment.getDefaultBaseCurrencyNetwork().name()))
.withRequiredArg()
.ofType(String.class)
.defaultsTo(BisqEnvironment.getDefaultBaseCurrencyNetwork().name());
.describedAs(format("%s|%s|%s", BTC_MAINNET, BTC_TESTNET, BTC_REGTEST));

parser.accepts(BtcOptionKeys.REG_TEST_HOST)
.withRequiredArg()
Expand Down
92 changes: 85 additions & 7 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import bisq.core.dao.DaoSetupService;
import bisq.core.dao.node.full.RawBlock;
import bisq.core.dao.node.parser.BlockParser;
import bisq.core.dao.node.parser.exceptions.BlockHashNotConnectingException;
import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException;
import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.DaoStateSnapshotService;
import bisq.core.dao.state.model.blockchain.Block;

import bisq.network.p2p.P2PService;
import bisq.network.p2p.P2PServiceListener;
Expand All @@ -30,6 +34,11 @@

import com.google.inject.Inject;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
Expand All @@ -51,6 +60,7 @@ public abstract class BsqNode implements DaoSetupService {
protected boolean p2pNetworkReady;
@Nullable
protected ErrorMessageHandler errorMessageHandler;
protected List<RawBlock> pendingBlocks = new ArrayList<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -179,19 +189,87 @@ protected void onParseBlockChainComplete() {
log.info("onParseBlockChainComplete");
parseBlockchainComplete = true;
daoStateService.onParseBlockChainComplete();

// log.error("COMPLETED: sb1={}\nsb2={}", BlockParser.sb1.toString(), BlockParser.sb2.toString());
// log.error("equals? " + BlockParser.sb1.toString().equals(BlockParser.sb2.toString()));
// Utilities.copyToClipboard(BlockParser.sb1.toString() + "\n\n\n" + BlockParser.sb2.toString());
}

@SuppressWarnings("WeakerAccess")
protected void startReOrgFromLastSnapshot() {
daoStateSnapshotService.applySnapshot(true);
startParseBlocks();
}

protected boolean isBlockAlreadyAdded(RawBlock rawBlock) {
return daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent();

protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException {
// We check if we have a block with that height. If so we return. We do not use the chainHeight as with genesis
// height we have no block but chainHeight is initially set to genesis height (bad design ;-( but a bit tricky
// to change now as it used in many areas.)
if (daoStateService.getBlockAtHeight(rawBlock.getHeight()).isPresent()) {
log.info("We have already a block with the height of the new block. Height of new block={}", rawBlock.getHeight());
return Optional.empty();
}

try {
Block block = blockParser.parseBlock(rawBlock);

if (pendingBlocks.contains(rawBlock))
pendingBlocks.remove(rawBlock);

// After parsing we check if we have pending blocks we might have received earlier but which have been
// not connecting from the latest height we had. The list is sorted by height
if (!pendingBlocks.isEmpty()) {
// To avoid ConcurrentModificationException we copy the list. It might be altered in the method call
ArrayList<RawBlock> tempPendingBlocks = new ArrayList<>(pendingBlocks);
for (RawBlock tempPendingBlock : tempPendingBlocks) {
try {
doParseBlock(tempPendingBlock);
} catch (RequiredReorgFromSnapshotException e1) {
// In case we got a reorg we break the iteration
break;
}
}
}

return Optional.of(block);
} catch (BlockHeightNotConnectingException e) {
// There is no guaranteed order how we receive blocks. We could have received block 102 before 101.
// If block is in future we move the block to teh pendingBlocks list. At next block we look up the
// list if there is any potential candidate with the correct height and if so we remove that from that list.

int heightForNextBlock = daoStateService.getChainHeight() + 1;
if (rawBlock.getHeight() > heightForNextBlock) {
pendingBlocks.add(rawBlock);
pendingBlocks.sort(Comparator.comparing(RawBlock::getHeight));
log.info("We received an block with a future block height. We store it as pending and try to apply " +
"it at the next block. rawBlock: height/hash={}/{}", rawBlock.getHeight(), rawBlock.getHash());
} else if (rawBlock.getHeight() >= daoStateService.getGenesisBlockHeight()) {
// We received an older block. We compare if we have it in our chain.
Optional<Block> optionalBlock = daoStateService.getBlockAtHeight(rawBlock.getHeight());
if (optionalBlock.isPresent()) {
if (optionalBlock.get().getHash().equals(rawBlock.getPreviousBlockHash())) {
log.info("We received an old block we have already parsed and added. We ignore it.");
} else {
log.info("We received an old block with a different hash. We ignore it. Hash={}", rawBlock.getHash());
}
} else {
log.info("In case we have reset from genesis height we would not find the block");
}
} else {
log.info("We ignore it as it was before genesis height");
}
} catch (BlockHashNotConnectingException throwable) {
Optional<Block> lastBlock = daoStateService.getLastBlock();
log.warn("Block not connecting:\n" +
"New block height/hash/previousBlockHash={}/{}/{}, latest block height/hash={}/{}",
rawBlock.getHeight(),
rawBlock.getHash(),
rawBlock.getPreviousBlockHash(),
lastBlock.isPresent() ? lastBlock.get().getHeight() : "lastBlock not present",
lastBlock.isPresent() ? lastBlock.get().getHash() : "lastBlock not present");

pendingBlocks.clear();
startReOrgFromLastSnapshot();
throw new RequiredReorgFromSnapshotException(rawBlock);
}


return Optional.empty();
}
}
76 changes: 34 additions & 42 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import bisq.core.dao.node.explorer.ExportJsonFilesService;
import bisq.core.dao.node.full.network.FullNodeNetworkService;
import bisq.core.dao.node.parser.BlockParser;
import bisq.core.dao.node.parser.exceptions.BlockNotConnectingException;
import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.DaoStateSnapshotService;
import bisq.core.dao.state.model.blockchain.Block;
Expand Down Expand Up @@ -106,6 +106,15 @@ protected void startParseBlocks() {
requestChainHeadHeightAndParseBlocks(getStartBlockHeight());
}

@Override
protected void startReOrgFromLastSnapshot() {
super.startReOrgFromLastSnapshot();

int startBlockHeight = getStartBlockHeight();
rpcService.requestChainHeadHeight(chainHeight -> parseBlocksOnHeadHeight(startBlockHeight, chainHeight),
this::handleError);
}

@Override
protected void onP2PNetworkReady() {
super.onP2PNetworkReady();
Expand Down Expand Up @@ -137,13 +146,9 @@ private void addBlockHandler() {
if (!addBlockHandlerAdded) {
addBlockHandlerAdded = true;
rpcService.addNewBtcBlockHandler(rawBlock -> {
if (!isBlockAlreadyAdded(rawBlock)) {
try {
Block block = blockParser.parseBlock(rawBlock);
onNewBlock(block);
} catch (BlockNotConnectingException throwable) {
handleError(throwable);
}
try {
doParseBlock(rawBlock).ifPresent(this::onNewBlock);
} catch (RequiredReorgFromSnapshotException ignore) {
}
},
this::handleError);
Expand Down Expand Up @@ -190,13 +195,7 @@ private void parseBlocksOnHeadHeight(int startBlockHeight, int chainHeight) {
// if we are at chainTip, so do not include here another check as it would
// not trigger the listener registration.
parseBlocksIfNewBlockAvailable(chainHeight);
}, throwable -> {
if (throwable instanceof BlockNotConnectingException) {
startReOrgFromLastSnapshot();
} else {
handleError(throwable);
}
});
}, this::handleError);
} else {
log.warn("We are trying to start with a block which is above the chain height of bitcoin core. " +
"We need probably wait longer until bitcoin core has fully synced. " +
Expand All @@ -210,33 +209,29 @@ private void parseBlocks(int startBlockHeight,
Consumer<Block> newBlockHandler,
ResultHandler resultHandler,
Consumer<Throwable> errorHandler) {
parseBlock(startBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler);
parseBlockRecursively(startBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler);
}

// Recursively request and parse all blocks
private void parseBlock(int blockHeight, int chainHeight,
Consumer<Block> newBlockHandler, ResultHandler resultHandler,
Consumer<Throwable> errorHandler) {
private void parseBlockRecursively(int blockHeight,
int chainHeight,
Consumer<Block> newBlockHandler,
ResultHandler resultHandler,
Consumer<Throwable> errorHandler) {
rpcService.requestBtcBlock(blockHeight,
rawBlock -> {
if (!isBlockAlreadyAdded(rawBlock)) {
try {
Block block = blockParser.parseBlock(rawBlock);
newBlockHandler.accept(block);

// Increment blockHeight and recursively call parseBlockAsync until we reach chainHeight
if (blockHeight < chainHeight) {
final int newBlockHeight = blockHeight + 1;
parseBlock(newBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler);
} else {
// We are done
resultHandler.handleResult();
}
} catch (BlockNotConnectingException e) {
errorHandler.accept(e);
try {
doParseBlock(rawBlock).ifPresent(newBlockHandler);

// Increment blockHeight and recursively call parseBlockAsync until we reach chainHeight
if (blockHeight < chainHeight) {
int newBlockHeight = blockHeight + 1;
parseBlockRecursively(newBlockHeight, chainHeight, newBlockHandler, resultHandler, errorHandler);
} else {
// We are done
resultHandler.handleResult();
}
} else {
log.info("Block was already added height=", rawBlock.getHeight());
} catch (RequiredReorgFromSnapshotException ignore) {
// If we get a reorg we don't continue to call parseBlockRecursively
}
},
errorHandler);
Expand All @@ -245,16 +240,13 @@ private void parseBlock(int blockHeight, int chainHeight,
private void handleError(Throwable throwable) {
String errorMessage = "An error occurred: Error=" + throwable.toString();
log.error(errorMessage);

if (throwable instanceof BlockNotConnectingException) {
startReOrgFromLastSnapshot();
} else if (throwable instanceof RpcException &&
if (throwable instanceof RpcException &&
throwable.getCause() != null &&
throwable.getCause() instanceof HttpLayerException &&
((HttpLayerException) throwable.getCause()).getCode() == 1004004) {
errorMessage = "You have configured Bisq to run as DAO full node but there is not " +
"localhost Bitcoin Core node detected. You need to have Bitcoin Core started and synced before " +
"starting Bisq.";
"starting Bisq. Please restart Bisq with proper DAO full node setup or switch to lite node mode.";
}

if (errorMessageHandler != null)
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/bisq/core/dao/node/full/RpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,12 @@ private RawTx getTxFromRawTransaction(RawTransaction rawBtcTx, com.neemre.btcdcl
try {
opReturnData = Utils.HEX.decode(chunks[1]);
} catch (Throwable t) {
// We get sometimes exceptions, seems BitcoinJ
// cannot handle all existing OP_RETURN data, but we ignore them
// anyway as our OP_RETURN data is valid in BitcoinJ
log.warn("Error at Utils.HEX.decode(chunks[1]): " + t.toString() + " / chunks[1]=" + chunks[1]);
log.warn("Error at Utils.HEX.decode(chunks[1]): " + t.toString() +
" / chunks[1]=" + chunks[1] +
"\nWe get sometimes exceptions with opReturn data, seems BitcoinJ " +
"cannot handle all " +
"existing OP_RETURN data, but we ignore them anyway as the OP_RETURN " +
"data used for DAO transactions are all valid in BitcoinJ");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connecti
Log.traceCall(getBlocksRequest + "\n\tconnection=" + connection);
List<Block> blocks = new LinkedList<>(daoStateService.getBlocksFromBlockHeight(getBlocksRequest.getFromBlockHeight()));
List<RawBlock> rawBlocks = blocks.stream().map(RawBlock::fromBlock).collect(Collectors.toList());
final GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.debug("getBlocksResponse " + getBlocksResponse.getRequestNonce());
log.info("Received getBlocksResponse from {} for blocks from height {}",
GetBlocksResponse getBlocksResponse = new GetBlocksResponse(rawBlocks, getBlocksRequest.getNonce());
log.info("Received GetBlocksRequest from {} for blocks from height {}",
connection.getPeersNodeAddressOptional(), getBlocksRequest.getFromBlockHeight());
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
Expand All @@ -108,7 +107,7 @@ public void onGetBlocksRequest(GetBlocksRequest getBlocksRequest, final Connecti
}

SettableFuture<Connection> future = networkNode.sendMessage(connection, getBlocksResponse);
Futures.addCallback(future, new FutureCallback<Connection>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
Expand Down
Loading

0 comments on commit 1b86fcc

Please sign in to comment.