Skip to content

Commit

Permalink
Merge pull request #2499 from Taraxa-project/issue-2469/remove-test_sate
Browse files Browse the repository at this point in the history
Issue 2469/remove test sate
  • Loading branch information
JakubFornadel authored Jun 5, 2023
2 parents dc36160 + a4e385e commit fa13b07
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 555 deletions.
2 changes: 0 additions & 2 deletions libraries/core_libs/network/include/network/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ class Network {

void setPendingPeersToReady();
dev::p2p::NodeID getNodeId() const;
int getReceivedBlocksCount() const;
int getReceivedTransactionsCount() const;
std::shared_ptr<network::tarcap::TaraxaPeer> getPeer(dev::p2p::NodeID const &id) const;
// END METHODS USED IN TESTS ONLY

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler {
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<DbStorage> db,
std::shared_ptr<TestState> test_state, const addr_t &node_addr,
const std::string &logs_prefix = "");
const addr_t &node_addr, const std::string &logs_prefix = "");

void sendBlock(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs);
void onNewBlockReceived(DagBlock &&block, const std::shared_ptr<TaraxaPeer> &peer = nullptr);
Expand All @@ -32,7 +31,6 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler {
virtual void process(const threadpool::PacketData &packet_data, const std::shared_ptr<TaraxaPeer> &peer) override;

protected:
std::shared_ptr<TestState> test_state_;
std::shared_ptr<TransactionManager> trx_mgr_{nullptr};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TransactionPacketHandler : public PacketHandler {
public:
TransactionPacketHandler(const FullNodeConfig& conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<TestState> test_state,
const addr_t& node_addr, const std::string& logs_prefix = "TRANSACTION_PH");
std::shared_ptr<TransactionManager> trx_mgr, const addr_t& node_addr,
const std::string& logs_prefix = "TRANSACTION_PH");

/**
* @brief Send transactions
Expand All @@ -43,19 +43,13 @@ class TransactionPacketHandler : public PacketHandler {
// 2 items: hashes and transactions
static constexpr uint32_t kTransactionPacketItemCount = 2;

// Used only for unit tests
void onNewTransactions(const SharedTransactions& transactions);

private:
virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override;
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

protected:
std::shared_ptr<TransactionManager> trx_mgr_;

// FOR TESTING ONLY
std::shared_ptr<TestState> test_state_;

std::atomic<uint64_t> received_trx_count_{0};
std::atomic<uint64_t> unique_received_trx_count_{0};
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace taraxa::network::tarcap::v1 {
static const TaraxaCapability::InitPacketsHandlers kInitV1Handlers =
[](const std::string &logs_prefix, const FullNodeConfig &config, const h256 &genesis_hash,
const std::shared_ptr<PeersState> &peers_state, const std::shared_ptr<PbftSyncingState> &pbft_syncing_state,
const std::shared_ptr<TestState> &test_state,
const std::shared_ptr<tarcap::TimePeriodPacketsStats> &packets_stats, const std::shared_ptr<DbStorage> &db,
const std::shared_ptr<PbftManager> &pbft_mgr, const std::shared_ptr<PbftChain> &pbft_chain,
const std::shared_ptr<VoteManager> &vote_mgr, const std::shared_ptr<DagManager> &dag_mgr,
Expand All @@ -36,12 +35,12 @@ static const TaraxaCapability::InitPacketsHandlers kInitV1Handlers =
config, peers_state, packets_stats, pbft_mgr, pbft_chain, vote_mgr, node_addr, logs_prefix);

// Standard packets with mid processing priority
packets_handlers->registerHandler<tarcap::DagBlockPacketHandler>(
config, peers_state, packets_stats, pbft_syncing_state, pbft_chain, pbft_mgr, dag_mgr, trx_mgr, db,
test_state, node_addr, logs_prefix);
packets_handlers->registerHandler<tarcap::DagBlockPacketHandler>(config, peers_state, packets_stats,
pbft_syncing_state, pbft_chain, pbft_mgr,
dag_mgr, trx_mgr, db, node_addr, logs_prefix);

packets_handlers->registerHandler<tarcap::TransactionPacketHandler>(config, peers_state, packets_stats, trx_mgr,
test_state, node_addr, logs_prefix);
node_addr, logs_prefix);

// Non critical packets with low processing priority
packets_handlers->registerHandler<tarcap::StatusPacketHandler>(config, peers_state, packets_stats,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "config/config.hpp"
#include "network/tarcap/packets_handler.hpp"
#include "network/tarcap/shared_states/peers_state.hpp"
#include "network/tarcap/shared_states/test_state.hpp"
#include "network/tarcap/tarcap_version.hpp"
#include "network/threadpool/tarcap_thread_pool.hpp"
#include "pbft/pbft_chain.hpp"
Expand Down Expand Up @@ -40,7 +39,7 @@ class TaraxaCapability final : public dev::p2p::CapabilityFace {
using InitPacketsHandlers = std::function<std::shared_ptr<PacketsHandler>(
const std::string &logs_prefix, const FullNodeConfig &config, const h256 &genesis_hash,
const std::shared_ptr<PeersState> &peers_state, const std::shared_ptr<PbftSyncingState> &pbft_syncing_state,
const std::shared_ptr<TestState> &test_state,

const std::shared_ptr<tarcap::TimePeriodPacketsStats> &packets_stats, const std::shared_ptr<DbStorage> &db,
const std::shared_ptr<PbftManager> &pbft_mgr, const std::shared_ptr<PbftChain> &pbft_chain,
const std::shared_ptr<VoteManager> &vote_mgr, const std::shared_ptr<DagManager> &dag_mgr,
Expand Down Expand Up @@ -82,19 +81,9 @@ class TaraxaCapability final : public dev::p2p::CapabilityFace {
template <typename PacketHandlerType>
std::shared_ptr<PacketHandlerType> getSpecificHandler() const;

// METHODS USED IN TESTS ONLY
size_t getReceivedBlocksCount() const;
size_t getReceivedTransactionsCount() const;
// END METHODS USED IN TESTS ONLY

private:
bool filterSyncIrrelevantPackets(SubprotocolPacketType packet_type) const;

public:
// TODO: Remove in future when tests are refactored
// Test state
std::shared_ptr<TestState> test_state_;

private:
// Capability version
unsigned version_;
Expand Down
4 changes: 0 additions & 4 deletions libraries/core_libs/network/src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,6 @@ void Network::setPendingPeersToReady() {

dev::p2p::NodeID Network::getNodeId() const { return host_->id(); }

int Network::getReceivedBlocksCount() const { return tarcaps_.begin()->second->getReceivedBlocksCount(); }

int Network::getReceivedTransactionsCount() const { return tarcaps_.begin()->second->getReceivedTransactionsCount(); }

std::shared_ptr<network::tarcap::TaraxaPeer> Network::getPeer(dev::p2p::NodeID const &id) const {
return tarcaps_.begin()->second->getPeersState()->getPeer(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "dag/dag_manager.hpp"
#include "network/tarcap/packets_handlers/latest/transaction_packet_handler.hpp"
#include "network/tarcap/shared_states/pbft_syncing_state.hpp"
#include "network/tarcap/shared_states/test_state.hpp"
#include "transaction/transaction_manager.hpp"

namespace taraxa::network::tarcap {
Expand All @@ -14,12 +13,10 @@ DagBlockPacketHandler::DagBlockPacketHandler(const FullNodeConfig &conf, std::sh
std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<DbStorage> db,
std::shared_ptr<TestState> test_state, const addr_t &node_addr,
const std::string &logs_prefix)
const addr_t &node_addr, const std::string &logs_prefix)
: ExtSyncingPacketHandler(conf, std::move(peers_state), std::move(packets_stats), std::move(pbft_syncing_state),
std::move(pbft_chain), std::move(pbft_mgr), std::move(dag_mgr), std::move(db), node_addr,
logs_prefix + "DAG_BLOCK_PH"),
test_state_(std::move(test_state)),
trx_mgr_(std::move(trx_mgr)) {}

void DagBlockPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const {
Expand All @@ -40,12 +37,10 @@ void DagBlockPacketHandler::process(const threadpool::PacketData &packet_data,
peer->dag_level_ = block.getLevel();
}

if (dag_mgr_) [[likely]] {
// Do not process this block in case we already have it
if (dag_mgr_->isDagBlockKnown(block.getHash())) {
LOG(log_tr_) << "Received known DagBlockPacket " << hash << "from: " << peer->getId();
return;
}
// Do not process this block in case we already have it
if (dag_mgr_->isDagBlockKnown(block.getHash())) {
LOG(log_tr_) << "Received known DagBlockPacket " << hash << "from: " << peer->getId();
return;
}

onNewBlockReceived(std::move(block), peer);
Expand Down Expand Up @@ -97,96 +92,88 @@ void DagBlockPacketHandler::sendBlock(dev::p2p::NodeID const &peer_id, taraxa::D
}

void DagBlockPacketHandler::onNewBlockReceived(DagBlock &&block, const std::shared_ptr<TaraxaPeer> &peer) {
if (dag_mgr_) [[likely]] {
const auto block_hash = block.getHash();
const auto verified = dag_mgr_->verifyBlock(block);
switch (verified) {
case DagManager::VerifyBlockReturnType::IncorrectTransactionsEstimation:
case DagManager::VerifyBlockReturnType::BlockTooBig:
case DagManager::VerifyBlockReturnType::FailedVdfVerification:
case DagManager::VerifyBlockReturnType::NotEligible:
case DagManager::VerifyBlockReturnType::FailedTipsVerification: {
const auto block_hash = block.getHash();
const auto verified = dag_mgr_->verifyBlock(block);
switch (verified) {
case DagManager::VerifyBlockReturnType::IncorrectTransactionsEstimation:
case DagManager::VerifyBlockReturnType::BlockTooBig:
case DagManager::VerifyBlockReturnType::FailedVdfVerification:
case DagManager::VerifyBlockReturnType::NotEligible:
case DagManager::VerifyBlockReturnType::FailedTipsVerification: {
std::ostringstream err_msg;
err_msg << "DagBlock " << block_hash << " failed verification with error code "
<< static_cast<uint32_t>(verified);
throw MaliciousPeerException(err_msg.str());
}
case DagManager::VerifyBlockReturnType::MissingTransaction:
if (peer->dagSyncingAllowed()) {
if (trx_mgr_->transactionsDropped()) [[unlikely]] {
LOG(log_nf_) << "NewBlock " << block_hash.toString() << " from peer " << peer->getId()
<< " is missing transaction, our pool recently dropped transactions, requesting dag sync";
} else {
LOG(log_wr_) << "NewBlock " << block_hash.toString() << " from peer " << peer->getId()
<< " is missing transaction, requesting dag sync";
}
peer->peer_dag_synced_ = false;
requestPendingDagBlocks(peer);
} else {
if (trx_mgr_->transactionsDropped()) [[unlikely]] {
// Disconnecting since anything after will also contain missing pivot/tips ...
LOG(log_nf_) << "NewBlock " << block_hash.toString() << " from peer " << peer->getId()
<< " is missing transaction, but our pool recently dropped transactions, disconnecting";
disconnect(peer->getId(), dev::p2p::UserReason);
} else {
std::ostringstream err_msg;
err_msg << "DagBlock" << block_hash << " is missing a transaction while in a dag synced state";
throw MaliciousPeerException(err_msg.str());
}
}
break;
case DagManager::VerifyBlockReturnType::MissingTip:
if (peer->peer_dag_synced_) {
std::ostringstream err_msg;
err_msg << "DagBlock " << block_hash << " failed verification with error code "
<< static_cast<uint32_t>(verified);
err_msg << "DagBlock has missing tip";
throw MaliciousPeerException(err_msg.str());
} else {
// peer_dag_synced_ flag ensures that this can only be performed once for a peer
requestPendingDagBlocks(peer);
}
case DagManager::VerifyBlockReturnType::MissingTransaction:
if (peer->dagSyncingAllowed()) {
if (trx_mgr_->transactionsDropped()) [[unlikely]] {
LOG(log_nf_) << "NewBlock " << block_hash.toString() << " from peer " << peer->getId()
<< " is missing transaction, our pool recently dropped transactions, requesting dag sync";
} else {
LOG(log_wr_) << "NewBlock " << block_hash.toString() << " from peer " << peer->getId()
<< " is missing transaction, requesting dag sync";
}
peer->peer_dag_synced_ = false;
requestPendingDagBlocks(peer);
break;
case DagManager::VerifyBlockReturnType::AheadBlock:
case DagManager::VerifyBlockReturnType::FutureBlock:
if (peer->peer_dag_synced_) {
LOG(log_er_) << "DagBlock" << block_hash << " is an ahead/future block. Peer " << peer->getId()
<< " will be disconnected";
disconnect(peer->getId(), dev::p2p::UserReason);
}
break;
case DagManager::VerifyBlockReturnType::Verified: {
auto transactions = trx_mgr_->getPoolTransactions(block.getTrxs()).first;
auto status = dag_mgr_->addDagBlock(std::move(block), std::move(transactions));
if (!status.first) {
LOG(log_dg_) << "Received DagBlockPacket " << block_hash << "from: " << peer->getId();
// Ignore new block packets when pbft syncing
if (pbft_syncing_state_->isPbftSyncing()) {
LOG(log_dg_) << "Ignore new dag block " << block_hash << ", pbft syncing is on";
} else if (peer->peer_dag_syncing_) {
LOG(log_dg_) << "Ignore new dag block " << block_hash << ", dag syncing is on";
} else {
if (trx_mgr_->transactionsDropped()) [[unlikely]] {
// Disconnecting since anything after will also contain missing pivot/tips ...
LOG(log_nf_) << "NewBlock " << block_hash.toString() << " from peer " << peer->getId()
<< " is missing transaction, but our pool recently dropped transactions, disconnecting";
disconnect(peer->getId(), dev::p2p::UserReason);
} else {
if (peer->peer_dag_synced_) {
std::ostringstream err_msg;
err_msg << "DagBlock" << block_hash << " is missing a transaction while in a dag synced state";
if (status.second.size() > 0)
err_msg << "DagBlock" << block.getHash() << " has missing pivot or/and tips " << status.second;
else
err_msg << "DagBlock" << block.getHash() << " could not be added to DAG";
throw MaliciousPeerException(err_msg.str());
}
}
break;
case DagManager::VerifyBlockReturnType::MissingTip:
if (peer->peer_dag_synced_) {
std::ostringstream err_msg;
err_msg << "DagBlock has missing tip";
throw MaliciousPeerException(err_msg.str());
} else {
// peer_dag_synced_ flag ensures that this can only be performed once for a peer
requestPendingDagBlocks(peer);
}
break;
case DagManager::VerifyBlockReturnType::AheadBlock:
case DagManager::VerifyBlockReturnType::FutureBlock:
if (peer->peer_dag_synced_) {
LOG(log_er_) << "DagBlock" << block_hash << " is an ahead/future block. Peer " << peer->getId()
<< " will be disconnected";
disconnect(peer->getId(), dev::p2p::UserReason);
}
break;
case DagManager::VerifyBlockReturnType::Verified: {
auto transactions = trx_mgr_->getPoolTransactions(block.getTrxs()).first;
auto status = dag_mgr_->addDagBlock(std::move(block), std::move(transactions));
if (!status.first) {
LOG(log_dg_) << "Received DagBlockPacket " << block_hash << "from: " << peer->getId();
// Ignore new block packets when pbft syncing
if (pbft_syncing_state_->isPbftSyncing()) {
LOG(log_dg_) << "Ignore new dag block " << block_hash << ", pbft syncing is on";
} else if (peer->peer_dag_syncing_) {
LOG(log_dg_) << "Ignore new dag block " << block_hash << ", dag syncing is on";
} else {
if (peer->peer_dag_synced_) {
std::ostringstream err_msg;
if (status.second.size() > 0)
err_msg << "DagBlock" << block.getHash() << " has missing pivot or/and tips " << status.second;
else
err_msg << "DagBlock" << block.getHash() << " could not be added to DAG";
throw MaliciousPeerException(err_msg.str());
} else {
// peer_dag_synced_ flag ensures that this can only be performed once for a peer
requestPendingDagBlocks(peer);
}
// peer_dag_synced_ flag ensures that this can only be performed once for a peer
requestPendingDagBlocks(peer);
}
}
} break;
case DagManager::VerifyBlockReturnType::ExpiredBlock:
break;
}
} else if (!test_state_->hasBlock(block.getHash())) {
test_state_->insertBlock(block);
onNewBlockVerified(block, false, {});
} else {
LOG(log_tr_) << "Received NewBlock " << block.getHash() << "that is already known";
return;
}
} break;
case DagManager::VerifyBlockReturnType::ExpiredBlock:
break;
}
}

Expand Down
Loading

0 comments on commit fa13b07

Please sign in to comment.