diff --git a/CMakeLists.txt b/CMakeLists.txt index f6614691c1..2ce8bc7515 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ set(TARAXA_PATCH_VERSION 3) set(TARAXA_VERSION ${TARAXA_MAJOR_VERSION}.${TARAXA_MINOR_VERSION}.${TARAXA_PATCH_VERSION}) # Any time a change in the network protocol is introduced this version should be increased -set(TARAXA_NET_VERSION 3) +set(TARAXA_NET_VERSION 4) # Major version is modified when DAG blocks, pbft blocks and any basic building blocks of our blockchain is modified # in the db set(TARAXA_DB_MAJOR_VERSION 1) diff --git a/libraries/common/include/common/constants.hpp b/libraries/common/include/common/constants.hpp index 2fa857ee91..e9ba096485 100644 --- a/libraries/common/include/common/constants.hpp +++ b/libraries/common/include/common/constants.hpp @@ -30,7 +30,7 @@ constexpr uint32_t kMinTransactionPoolSize{30000}; constexpr uint32_t kDefaultTransactionPoolSize{200000}; constexpr uint32_t kMaxNonFinalizedTransactions{1000000}; -const size_t kV2NetworkVersion = 2; +const size_t kV3NetworkVersion = 3; const uint32_t kRecentlyFinalizedTransactionsFactor = 2; diff --git a/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/dag_block_packet_handler.hpp b/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/dag_block_packet_handler.hpp index c2e39296c0..4552b072af 100644 --- a/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/dag_block_packet_handler.hpp +++ b/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/dag_block_packet_handler.hpp @@ -17,9 +17,8 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler { std::shared_ptr pbft_syncing_state, std::shared_ptr pbft_chain, std::shared_ptr pbft_mgr, std::shared_ptr dag_mgr, std::shared_ptr trx_mgr, std::shared_ptr db, - bool trxs_in_dag_packet, const addr_t &node_addr, const std::string &logs_prefix = ""); + const addr_t &node_addr, const std::string &logs_prefix = ""); - void sendBlock(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs); void sendBlockWithTransactions(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs); void onNewBlockReceived(DagBlock &&block, const std::shared_ptr &peer = nullptr, const std::unordered_map> &trxs = {}); @@ -34,7 +33,6 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler { protected: std::shared_ptr trx_mgr_{nullptr}; - const bool kTrxsInDagPacket; }; } // namespace taraxa::network::tarcap diff --git a/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/transaction_packet_handler.hpp b/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/transaction_packet_handler.hpp index 47ba929275..208b511174 100644 --- a/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/transaction_packet_handler.hpp +++ b/libraries/core_libs/network/include/network/tarcap/packets_handlers/latest/transaction_packet_handler.hpp @@ -16,7 +16,7 @@ class TransactionPacketHandler : public PacketHandler { public: TransactionPacketHandler(const FullNodeConfig& conf, std::shared_ptr peers_state, std::shared_ptr packets_stats, - std::shared_ptr trx_mgr, const addr_t& node_addr, bool hash_gossip, + std::shared_ptr trx_mgr, const addr_t& node_addr, const std::string& logs_prefix = "TRANSACTION_PH"); /** @@ -48,15 +48,6 @@ class TransactionPacketHandler : public PacketHandler { virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr& 
peer) override; protected: - /** - * @brief Sends batch of transactions to all connected peers - * @note Support of the old V2 version, remove once most of the network is updated or after a hardfork. This method is - * used as periodic event to broadcast transactions to the other peers in network - * - * @param transactions to be sent - */ - void periodicSendTransactionsWithoutHashGossip(std::vector&& transactions); - /** * @brief select which transactions and hashes to send to which connected peer * @@ -82,7 +73,6 @@ class TransactionPacketHandler : public PacketHandler { std::atomic received_trx_count_{0}; std::atomic unique_received_trx_count_{0}; - const bool kHashGossip = true; }; } // namespace taraxa::network::tarcap diff --git a/libraries/core_libs/network/include/network/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.hpp b/libraries/core_libs/network/include/network/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.hpp new file mode 100644 index 0000000000..d30c50c649 --- /dev/null +++ b/libraries/core_libs/network/include/network/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include "../latest/common/packet_handler.hpp" + +namespace taraxa { +class PbftChain; +class DbStorage; +class VoteManager; +} // namespace taraxa + +namespace taraxa::network::tarcap { +class PbftSyncingState; +} + +namespace taraxa::network::tarcap::v3 { +class GetPbftSyncPacketHandler : public PacketHandler { + public: + GetPbftSyncPacketHandler(const FullNodeConfig& conf, std::shared_ptr peers_state, + std::shared_ptr packets_stats, + std::shared_ptr pbft_syncing_state, std::shared_ptr pbft_chain, + std::shared_ptr vote_mgr, std::shared_ptr db, + const addr_t& node_addr, const std::string& logs_prefix = "GET_PBFT_SYNC_PH"); + + // Packet type that is processed by this handler + static constexpr SubprotocolPacketType kPacketType_ = SubprotocolPacketType::GetPbftSyncPacket; + + private: + virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override; + virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr& peer) override; + + virtual void sendPbftBlocks(const std::shared_ptr& peer, PbftPeriod from_period, + size_t blocks_to_transfer, bool pbft_chain_synced); + + protected: + std::shared_ptr pbft_syncing_state_; + std::shared_ptr pbft_chain_; + std::shared_ptr vote_mgr_; + std::shared_ptr db_; +}; + +} // namespace taraxa::network::tarcap::v3 diff --git a/libraries/core_libs/network/include/network/tarcap/packets_handlers/v3/pbft_sync_packet_handler.hpp b/libraries/core_libs/network/include/network/tarcap/packets_handlers/v3/pbft_sync_packet_handler.hpp new file mode 100644 index 0000000000..e2459081a1 --- /dev/null +++ b/libraries/core_libs/network/include/network/tarcap/packets_handlers/v3/pbft_sync_packet_handler.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include "../latest/common/ext_syncing_packet_handler.hpp" +#include "common/thread_pool.hpp" +#include "vote_manager/vote_manager.hpp" + +namespace taraxa::network::tarcap::v3 { + +class PbftSyncPacketHandler : public ExtSyncingPacketHandler { + public: + PbftSyncPacketHandler(const FullNodeConfig& conf, std::shared_ptr peers_state, + std::shared_ptr packets_stats, + std::shared_ptr pbft_syncing_state, std::shared_ptr pbft_chain, + std::shared_ptr pbft_mgr, std::shared_ptr dag_mgr, + std::shared_ptr vote_mgr, std::shared_ptr db, const addr_t& node_addr, + const std::string& logs_prefix = "PBFT_SYNC_PH"); + + void 
handleMaliciousSyncPeer(const dev::p2p::NodeID& id); + + // Packet type that is processed by this handler + static constexpr SubprotocolPacketType kPacketType_ = SubprotocolPacketType::PbftSyncPacket; + + private: + virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override; + virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr& peer) override; + + protected: + virtual PeriodData decodePeriodData(const dev::RLP& period_data_rlp) const; + virtual std::vector> decodeVotesBundle(const dev::RLP& votes_bundle_rlp) const; + + void pbftSyncComplete(); + void delayedPbftSync(int counter); + + static constexpr uint32_t kDelayedPbftSyncDelayMs = 10; + + std::shared_ptr vote_mgr_; + util::ThreadPool periodic_events_tp_; + + static constexpr size_t kStandardPacketSize = 2; + static constexpr size_t kChainSyncedPacketSize = 3; +}; + +} // namespace taraxa::network::tarcap::v3 diff --git a/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp b/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp index 8a1f50f969..0fa960f748 100644 --- a/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp +++ b/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp @@ -59,6 +59,9 @@ class TaraxaCapability final : public dev::p2p::CapabilityFace { */ static const InitPacketsHandlers kInitLatestVersionHandlers; + // TODO: remove this once we pass HF + static const InitPacketsHandlers kInitV3Handlers; + public: TaraxaCapability(TarcapVersion version, const FullNodeConfig &conf, const h256 &genesis_hash, std::weak_ptr host, const dev::KeyPair &key, diff --git a/libraries/core_libs/network/src/network.cpp b/libraries/core_libs/network/src/network.cpp index 07ad0d602e..53f51eabec 100644 --- a/libraries/core_libs/network/src/network.cpp +++ b/libraries/core_libs/network/src/network.cpp @@ -73,15 +73,16 @@ Network::Network(const FullNodeConfig &config, const h256 &genesis_hash, std::fi dev::p2p::Host::CapabilitiesFactory constructCapabilities = [&](std::weak_ptr host) { assert(!host.expired()); - assert(kV2NetworkVersion < TARAXA_NET_VERSION); + assert(kV3NetworkVersion < TARAXA_NET_VERSION); dev::p2p::Host::CapabilityList capabilities; // Register old version (V2) of taraxa capability - auto v2_tarcap = std::make_shared( - kV2NetworkVersion, config, genesis_hash, host, key, packets_tp_, all_packets_stats_, pbft_syncing_state_, db, - pbft_mgr, pbft_chain, vote_mgr, dag_mgr, trx_mgr, slashing_manager, pillar_chain_mgr); - capabilities.emplace_back(v2_tarcap); + auto v3_tarcap = std::make_shared( + kV3NetworkVersion, config, genesis_hash, host, key, packets_tp_, all_packets_stats_, pbft_syncing_state_, db, + pbft_mgr, pbft_chain, vote_mgr, dag_mgr, trx_mgr, slashing_manager, pillar_chain_mgr, + network::tarcap::TaraxaCapability::kInitV3Handlers); + capabilities.emplace_back(v3_tarcap); // Register latest version of taraxa capability auto latest_tarcap = std::make_shared( diff --git a/libraries/core_libs/network/src/tarcap/packets_handlers/latest/dag_block_packet_handler.cpp b/libraries/core_libs/network/src/tarcap/packets_handlers/latest/dag_block_packet_handler.cpp index e856b8f209..a0b22ea34b 100644 --- a/libraries/core_libs/network/src/tarcap/packets_handlers/latest/dag_block_packet_handler.cpp +++ b/libraries/core_libs/network/src/tarcap/packets_handlers/latest/dag_block_packet_handler.cpp @@ -13,22 +13,17 @@ DagBlockPacketHandler::DagBlockPacketHandler(const FullNodeConfig &conf, 
std::sh std::shared_ptr pbft_chain, std::shared_ptr pbft_mgr, std::shared_ptr dag_mgr, std::shared_ptr trx_mgr, std::shared_ptr db, - bool trxs_in_dag_packet, const addr_t &node_addr, - const std::string &logs_prefix) + const addr_t &node_addr, const std::string &logs_prefix) : ExtSyncingPacketHandler(conf, std::move(peers_state), std::move(packets_stats), std::move(pbft_syncing_state), std::move(pbft_chain), std::move(pbft_mgr), std::move(dag_mgr), std::move(db), node_addr, logs_prefix + "DAG_BLOCK_PH"), - trx_mgr_(std::move(trx_mgr)), - kTrxsInDagPacket(trxs_in_dag_packet) {} + trx_mgr_(std::move(trx_mgr)) {} void DagBlockPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const { - constexpr size_t required_size_v2 = 8; constexpr size_t required_size = 2; // Only one dag block can be received - if (kTrxsInDagPacket && packet_data.rlp_.itemCount() != required_size) { + if (packet_data.rlp_.itemCount() != required_size) { throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size); - } else if (!kTrxsInDagPacket && packet_data.rlp_.itemCount() != required_size_v2) { - throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size_v2); } } @@ -69,51 +64,6 @@ void DagBlockPacketHandler::process(const threadpool::PacketData &packet_data, onNewBlockReceived(std::move(block), peer, transactions); } -void DagBlockPacketHandler::sendBlock(dev::p2p::NodeID const &peer_id, taraxa::DagBlock block, - const SharedTransactions &trxs) { - std::shared_ptr peer = peers_state_->getPeer(peer_id); - if (!peer) { - LOG(log_wr_) << "Send dag block " << block.getHash() << ". Failed to obtain peer " << peer_id; - return; - } - - // This lock prevents race condition between syncing and gossiping dag blocks - std::unique_lock lock(peer->mutex_for_sending_dag_blocks_); - - // Transactions are first sent in transactions packet before sending the block - uint32_t index = 0; - while (index < trxs.size()) { - const uint32_t trx_count_to_send = std::min(static_cast(kMaxTransactionsInPacket), trxs.size() - index); - - dev::RLPStream s(TransactionPacketHandler::kTransactionPacketItemCount); - s.appendList(trx_count_to_send); - - taraxa::bytes trx_bytes; - for (uint32_t i = index; i < index + trx_count_to_send; i++) { - auto trx_data = trxs[i]->rlp(); - s << trxs[i]->getHash(); - trx_bytes.insert(trx_bytes.end(), std::begin(trx_data), std::end(trx_data)); - } - - s.appendList(trx_count_to_send); - s.appendRaw(trx_bytes, trx_count_to_send); - sealAndSend(peer_id, TransactionPacket, std::move(s)); - - index += trx_count_to_send; - } - - if (!sealAndSend(peer_id, DagBlockPacket, block.streamRLP(true))) { - LOG(log_wr_) << "Sending DagBlock " << block.getHash() << " failed to " << peer_id; - return; - } - - // Mark data as known if sending was successful - peer->markDagBlockAsKnown(block.getHash()); - for (const auto &trx : trxs) { - peer->markTransactionAsKnown(trx->getHash()); - } -} - void DagBlockPacketHandler::sendBlockWithTransactions(dev::p2p::NodeID const &peer_id, taraxa::DagBlock block, const SharedTransactions &trxs) { std::shared_ptr peer = peers_state_->getPeer(peer_id); @@ -290,11 +240,8 @@ void DagBlockPacketHandler::onNewBlockVerified(const DagBlock &block, bool propo transactions_to_send.push_back(trx); peer_and_transactions_to_log += trx_hash.abridged(); } - if (kTrxsInDagPacket) { - sendBlockWithTransactions(peer_id, block, transactions_to_send); - } else { - sendBlock(peer_id, block, 
transactions_to_send); - } + + sendBlockWithTransactions(peer_id, block, transactions_to_send); peer->markDagBlockAsKnown(block_hash); } } diff --git a/libraries/core_libs/network/src/tarcap/packets_handlers/latest/transaction_packet_handler.cpp b/libraries/core_libs/network/src/tarcap/packets_handlers/latest/transaction_packet_handler.cpp index 63cb41449d..adf7178eaa 100644 --- a/libraries/core_libs/network/src/tarcap/packets_handlers/latest/transaction_packet_handler.cpp +++ b/libraries/core_libs/network/src/tarcap/packets_handlers/latest/transaction_packet_handler.cpp @@ -10,10 +10,9 @@ namespace taraxa::network::tarcap { TransactionPacketHandler::TransactionPacketHandler(const FullNodeConfig &conf, std::shared_ptr peers_state, std::shared_ptr packets_stats, std::shared_ptr trx_mgr, const addr_t &node_addr, - bool hash_gossip, const std::string &logs_prefix) + const std::string &logs_prefix) : PacketHandler(conf, std::move(peers_state), std::move(packets_stats), node_addr, logs_prefix + "TRANSACTION_PH"), - trx_mgr_(std::move(trx_mgr)), - kHashGossip(hash_gossip) {} + trx_mgr_(std::move(trx_mgr)) {} void TransactionPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const { auto items = packet_data.rlp_.itemCount(); @@ -100,46 +99,6 @@ inline void TransactionPacketHandler::process(const threadpool::PacketData &pack } } -void TransactionPacketHandler::periodicSendTransactionsWithoutHashGossip( - std::vector &&transactions) { - std::vector>>> - peers_with_transactions_to_send; - - auto peers = peers_state_->getAllPeers(); - for (const auto &peer : peers) { - // Confirm that status messages were exchanged otherwise message might be ignored and node would - // incorrectly markTransactionAsKnown - if (!peer.second->syncing_) { - SharedTransactions peer_trxs; - for (auto const &account_trx : transactions) { - for (auto const &trx : account_trx) { - auto trx_hash = trx->getHash(); - if (peer.second->isTransactionKnown(trx_hash)) { - continue; - } - peer_trxs.push_back(trx); - if (peer_trxs.size() == kMaxTransactionsInPacket) { - peers_with_transactions_to_send.push_back({peer.first, {peer_trxs, {}}}); - peer_trxs.clear(); - }; - } - } - if (peer_trxs.size() > 0) { - peers_with_transactions_to_send.push_back({peer.first, {peer_trxs, {}}}); - } - } - } - const auto peers_to_send_count = peers_with_transactions_to_send.size(); - if (peers_to_send_count > 0) { - // Sending it in same order favours some peers over others, always start with a different position - uint32_t start_with = rand() % peers_to_send_count; - for (uint32_t i = 0; i < peers_to_send_count; i++) { - auto peer_to_send = peers_with_transactions_to_send[(start_with + i) % peers_to_send_count]; - sendTransactions(peers[peer_to_send.first], std::move(peer_to_send.second)); - } - } -} - std::pair>> TransactionPacketHandler::transactionsToSendToPeer(std::shared_ptr peer, const std::vector &transactions, @@ -216,12 +175,6 @@ TransactionPacketHandler::transactionsToSendToPeers(std::vector &&transactions) { - // Support of old v2 net version. 
Remove once network is fully updated - if (!kHashGossip) { - periodicSendTransactionsWithoutHashGossip(std::move(transactions)); - return; - } - auto peers_with_transactions_to_send = transactionsToSendToPeers(std::move(transactions)); const auto peers_to_send_count = peers_with_transactions_to_send.size(); if (peers_to_send_count > 0) { diff --git a/libraries/core_libs/network/src/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.cpp b/libraries/core_libs/network/src/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.cpp new file mode 100644 index 0000000000..a9cd4ec9f6 --- /dev/null +++ b/libraries/core_libs/network/src/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.cpp @@ -0,0 +1,117 @@ +#include "network/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.hpp" + +#include "network/tarcap/shared_states/pbft_syncing_state.hpp" +#include "pbft/pbft_chain.hpp" +#include "pbft/period_data.hpp" +#include "storage/storage.hpp" +#include "vote/pbft_vote.hpp" +#include "vote/votes_bundle_rlp.hpp" +#include "vote_manager/vote_manager.hpp" + +namespace taraxa::network::tarcap::v3 { + +GetPbftSyncPacketHandler::GetPbftSyncPacketHandler(const FullNodeConfig &conf, std::shared_ptr peers_state, + std::shared_ptr packets_stats, + std::shared_ptr pbft_syncing_state, + std::shared_ptr pbft_chain, + std::shared_ptr vote_mgr, std::shared_ptr db, + const addr_t &node_addr, const std::string &logs_prefix) + : PacketHandler(conf, std::move(peers_state), std::move(packets_stats), node_addr, + logs_prefix + "GET_PBFT_SYNC_PH"), + pbft_syncing_state_(std::move(pbft_syncing_state)), + pbft_chain_(std::move(pbft_chain)), + vote_mgr_(std::move(vote_mgr)), + db_(std::move(db)) {} + +void GetPbftSyncPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const { + if (constexpr size_t required_size = 1; packet_data.rlp_.itemCount() != required_size) { + throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size); + } +} + +void GetPbftSyncPacketHandler::process(const threadpool::PacketData &packet_data, + const std::shared_ptr &peer) { + LOG(log_tr_) << "Received GetPbftSyncPacket Block"; + + const size_t height_to_sync = packet_data.rlp_[0].toInt(); + // Here we need the PBFT chain size, not the synced period, since synced blocks have not been verified yet. + const size_t my_chain_size = pbft_chain_->getPbftChainSize(); + if (height_to_sync > my_chain_size) { + // Nodes update peers' PBFT chain size in the status packet, so a peer should not request a syncing period bigger than the local PBFT chain size + std::ostringstream err_msg; + err_msg << "Peer " << packet_data.from_node_id_ << " request syncing period start at " << height_to_sync + << ". That's bigger than own PBFT chain size " << my_chain_size; + throw MaliciousPeerException(err_msg.str()); + } + + if (kConf.is_light_node && height_to_sync + kConf.light_node_history <= my_chain_size) { + std::ostringstream err_msg; + err_msg << "Peer " << packet_data.from_node_id_ << " request syncing period start at " << height_to_sync + << ". 
Light node does not have the data " << my_chain_size; + throw MaliciousPeerException(err_msg.str()); + } + + size_t blocks_to_transfer = 0; + auto pbft_chain_synced = false; + const auto total_period_data_size = my_chain_size - height_to_sync + 1; + if (total_period_data_size <= kConf.network.sync_level_size) { + blocks_to_transfer = total_period_data_size; + pbft_chain_synced = true; + } else { + blocks_to_transfer = kConf.network.sync_level_size; + } + LOG(log_tr_) << "Will send " << blocks_to_transfer << " PBFT blocks to " << packet_data.from_node_id_; + + sendPbftBlocks(peer, height_to_sync, blocks_to_transfer, pbft_chain_synced); +} + +// api for pbft syncing +void GetPbftSyncPacketHandler::sendPbftBlocks(const std::shared_ptr &peer, PbftPeriod from_period, + size_t blocks_to_transfer, bool pbft_chain_synced) { + const auto &peer_id = peer->getId(); + LOG(log_tr_) << "sendPbftBlocks: peer want to sync from pbft chain height " << from_period << ", will send at most " + << blocks_to_transfer << " pbft blocks to " << peer_id; + + for (auto block_period = from_period; block_period < from_period + blocks_to_transfer; block_period++) { + bool last_block = (block_period == from_period + blocks_to_transfer - 1); + auto data = db_->getPeriodDataRaw(block_period); + + if (data.size() == 0) { + // This can happen when switching from light node to full node setting + LOG(log_er_) << "DB corrupted. Cannot find period " << block_period << " PBFT block in db"; + return; + } + + data = PeriodData::ToOldPeriodData(data); + + dev::RLPStream s; + if (pbft_chain_synced && last_block) { + // Latest finalized block cert votes are saved in db as reward votes for new blocks + const auto reward_votes = vote_mgr_->getRewardVotes(); + assert(!reward_votes.empty()); + // It is possible that the node pushed another block to the chain in the meantime + if (reward_votes[0]->getPeriod() == block_period) { + s.appendList(3); + s << last_block; + s.appendRaw(data); + s.appendRaw(encodePbftVotesBundleRlp(reward_votes)); + } else { + s.appendList(2); + s << last_block; + s.appendRaw(data); + } + } else { + s.appendList(2); + s << last_block; + s.appendRaw(data); + } + + LOG(log_dg_) << "Sending PbftSyncPacket period " << block_period << " to " << peer_id; + sealAndSend(peer_id, SubprotocolPacketType::PbftSyncPacket, std::move(s)); + if (pbft_chain_synced && last_block) { + peer->syncing_ = false; + } + } +} + +} // namespace taraxa::network::tarcap::v3 diff --git a/libraries/core_libs/network/src/tarcap/packets_handlers/v3/pbft_sync_packet_handler.cpp b/libraries/core_libs/network/src/tarcap/packets_handlers/v3/pbft_sync_packet_handler.cpp new file mode 100644 index 0000000000..277fec6ad6 --- /dev/null +++ b/libraries/core_libs/network/src/tarcap/packets_handlers/v3/pbft_sync_packet_handler.cpp @@ -0,0 +1,296 @@ +#include "network/tarcap/packets_handlers/v3/pbft_sync_packet_handler.hpp" + +#include "network/tarcap/shared_states/pbft_syncing_state.hpp" +#include "pbft/pbft_chain.hpp" +#include "pbft/pbft_manager.hpp" +#include "transaction/transaction_manager.hpp" +#include "vote/pbft_vote.hpp" +#include "vote/votes_bundle_rlp.hpp" + +namespace taraxa::network::tarcap::v3 { + +PbftSyncPacketHandler::PbftSyncPacketHandler(const FullNodeConfig &conf, std::shared_ptr peers_state, + std::shared_ptr packets_stats, + std::shared_ptr pbft_syncing_state, + std::shared_ptr pbft_chain, + std::shared_ptr pbft_mgr, std::shared_ptr dag_mgr, + std::shared_ptr vote_mgr, std::shared_ptr db, + const addr_t &node_addr, const std::string 
&logs_prefix) + : ExtSyncingPacketHandler(conf, std::move(peers_state), std::move(packets_stats), std::move(pbft_syncing_state), + std::move(pbft_chain), std::move(pbft_mgr), std::move(dag_mgr), std::move(db), node_addr, + logs_prefix + "PBFT_SYNC_PH"), + vote_mgr_(std::move(vote_mgr)), + periodic_events_tp_(1, true) {} + +void PbftSyncPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const { + if (packet_data.rlp_.itemCount() != kStandardPacketSize && packet_data.rlp_.itemCount() != kChainSyncedPacketSize) { + throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), kStandardPacketSize); + } + + // PeriodData rlp parsing cannot be done through util::rlp_tuple, which automatically checks the rlp size so it is + // checked here manually + if (packet_data.rlp_[1].itemCount() != PeriodData::kBaseRlpItemCount && + packet_data.rlp_[1].itemCount() != PeriodData::kExtendedRlpItemCount) { + throw InvalidRlpItemsCountException(packet_data.type_str_ + ":PeriodData", packet_data.rlp_[1].itemCount(), + PeriodData::kBaseRlpItemCount); + } +} + +void PbftSyncPacketHandler::process(const threadpool::PacketData &packet_data, + const std::shared_ptr &peer) { + // Note: no need to consider possible race conditions due to concurrent processing as it is + // disabled on priority_queue blocking dependencies level + const auto syncing_peer = pbft_syncing_state_->syncingPeer(); + if (!syncing_peer) { + LOG(log_wr_) << "PbftSyncPacket received from unexpected peer " << packet_data.from_node_id_.abridged() + << " but there is no current syncing peer set"; + return; + } + + if (syncing_peer->getId() != packet_data.from_node_id_) { + LOG(log_wr_) << "PbftSyncPacket received from unexpected peer " << packet_data.from_node_id_.abridged() + << " current syncing peer " << syncing_peer->getId().abridged(); + return; + } + + // Process received pbft blocks + // pbft_chain_synced is the flag to indicate own PBFT chain has synced with the peer's PBFT chain + const bool pbft_chain_synced = packet_data.rlp_.itemCount() == kChainSyncedPacketSize; + // last_block is the flag to indicate this is the last block in each syncing round, doesn't mean PBFT chain has synced + const bool last_block = packet_data.rlp_[0].toInt(); + PeriodData period_data; + try { + period_data = decodePeriodData(packet_data.rlp_[1]); + } catch (const std::runtime_error &e) { + throw MaliciousPeerException("Unable to parse PeriodData: " + std::string(e.what())); + } + + std::vector> current_block_cert_votes; + if (pbft_chain_synced) { + current_block_cert_votes = decodeVotesBundle(packet_data.rlp_[2]); + } + const auto pbft_blk_hash = period_data.pbft_blk->getBlockHash(); + + std::string received_dag_blocks_str; // This is just log related stuff + for (auto const &block : period_data.dag_blocks) { + received_dag_blocks_str += block.getHash().toString() + " "; + if (peer->dag_level_ < block.getLevel()) { + peer->dag_level_ = block.getLevel(); + } + } + + const auto pbft_block_period = period_data.pbft_blk->getPeriod(); + LOG(log_dg_) << "PbftSyncPacket received. 
Period: " << pbft_block_period + << ", dag Blocks: " << received_dag_blocks_str << " from " << packet_data.from_node_id_; + + peer->markPbftBlockAsKnown(pbft_blk_hash); + // Update peer's pbft period if outdated + if (peer->pbft_chain_size_ < pbft_block_period) { + peer->pbft_chain_size_ = pbft_block_period; + } + + LOG(log_tr_) << "Processing pbft block: " << pbft_blk_hash; + + if (pbft_chain_->findPbftBlockInChain(pbft_blk_hash)) { + LOG(log_wr_) << "PBFT block " << pbft_blk_hash << ", period: " << period_data.pbft_blk->getPeriod() << " from " + << packet_data.from_node_id_ << " already present in chain"; + } else { + if (pbft_block_period != pbft_mgr_->pbftSyncingPeriod() + 1) { + // This can happen if we just got synced and block was cert voted + if (pbft_chain_synced && pbft_block_period == pbft_mgr_->pbftSyncingPeriod()) { + pbftSyncComplete(); + return; + } + + LOG(log_er_) << "Block " << pbft_blk_hash << " period unexpected: " << pbft_block_period + << ". Expected period: " << pbft_mgr_->pbftSyncingPeriod() + 1; + return; + } + + // Check cert vote matches if final synced block + if (pbft_chain_synced) { + for (auto const &vote : current_block_cert_votes) { + if (vote->getBlockHash() != pbft_blk_hash) { + LOG(log_er_) << "Invalid cert votes block hash " << vote->getBlockHash() << " instead of " << pbft_blk_hash + << " from peer " << packet_data.from_node_id_.abridged() << " received, stop syncing."; + handleMaliciousSyncPeer(packet_data.from_node_id_); + return; + } + } + } + + // Check votes match the hash of previous block in the queue + auto last_pbft_block_hash = pbft_mgr_->lastPbftBlockHashFromQueueOrChain(); + // Check cert vote matches + for (auto const &vote : period_data.previous_block_cert_votes) { + if (vote->getBlockHash() != last_pbft_block_hash) { + LOG(log_er_) << "Invalid cert votes block hash " << vote->getBlockHash() << " instead of " + << last_pbft_block_hash << " from peer " << packet_data.from_node_id_.abridged() + << " received, stop syncing."; + handleMaliciousSyncPeer(packet_data.from_node_id_); + return; + } + } + + if (!pbft_mgr_->validatePillarDataInPeriodData(period_data)) { + handleMaliciousSyncPeer(packet_data.from_node_id_); + return; + } + + auto order_hash = PbftManager::calculateOrderHash(period_data.dag_blocks); + if (order_hash != period_data.pbft_blk->getOrderHash()) { + { // This is just log related stuff + std::vector trx_order; + trx_order.reserve(period_data.transactions.size()); + std::vector blk_order; + blk_order.reserve(period_data.dag_blocks.size()); + for (auto t : period_data.transactions) { + trx_order.push_back(t->getHash()); + } + for (auto b : period_data.dag_blocks) { + blk_order.push_back(b.getHash()); + } + LOG(log_er_) << "Order hash incorrect in period data " << pbft_blk_hash << " expected: " << order_hash + << " received " << period_data.pbft_blk->getOrderHash() << "; Dag order: " << blk_order + << "; Trx order: " << trx_order << "; from " << packet_data.from_node_id_.abridged() + << ", stop syncing."; + } + handleMaliciousSyncPeer(packet_data.from_node_id_); + return; + } + + // This is special case when queue is empty and we can not say for sure that all votes that are part of this block + // have been verified before + if (pbft_mgr_->periodDataQueueEmpty()) { + for (const auto &v : period_data.previous_block_cert_votes) { + if (auto vote_is_valid = vote_mgr_->validateVote(v); vote_is_valid.first == false) { + LOG(log_er_) << "Invalid reward votes in block " << period_data.pbft_blk->getBlockHash() << " from peer " + << 
packet_data.from_node_id_.abridged() + << " received, stop syncing. Validation failed. Err: " << vote_is_valid.second; + handleMaliciousSyncPeer(packet_data.from_node_id_); + return; + } + + vote_mgr_->addVerifiedVote(v); + } + + // And now we need to replace it with verified votes + if (auto votes = vote_mgr_->checkRewardVotes(period_data.pbft_blk, true); votes.first) { + period_data.previous_block_cert_votes = std::move(votes.second); + } else { + // checkRewardVotes could fail because we just cert voted this block and moved to next period, + // in that case we are probably fully synced + if (pbft_block_period <= vote_mgr_->getRewardVotesPbftBlockPeriod()) { + pbft_syncing_state_->setPbftSyncing(false); + return; + } + + LOG(log_er_) << "Invalid reward votes in block " << period_data.pbft_blk->getBlockHash() << " from peer " + << packet_data.from_node_id_.abridged() << " received, stop syncing."; + handleMaliciousSyncPeer(packet_data.from_node_id_); + return; + } + } + + LOG(log_tr_) << "Synced PBFT block hash " << pbft_blk_hash << " with " + << period_data.previous_block_cert_votes.size() << " cert votes"; + LOG(log_tr_) << "Synced PBFT block " << period_data; + pbft_mgr_->periodDataQueuePush(std::move(period_data), packet_data.from_node_id_, + std::move(current_block_cert_votes)); + } + + auto pbft_sync_period = pbft_mgr_->pbftSyncingPeriod(); + + // Reset last sync packet received time + pbft_syncing_state_->setLastSyncPacketTime(); + + if (pbft_chain_synced) { + pbftSyncComplete(); + return; + } + + if (last_block) { + // If current sync period is actually bigger than the block we just received we are probably synced + if (pbft_sync_period > pbft_block_period) { + pbft_syncing_state_->setPbftSyncing(false); + return; + } + if (pbft_syncing_state_->isPbftSyncing()) { + if (pbft_sync_period > pbft_chain_->getPbftChainSize() + (10 * kConf.network.sync_level_size)) { + LOG(log_tr_) << "Syncing pbft blocks too fast than processing. Has synced period " << pbft_sync_period + << ", PBFT chain size " << pbft_chain_->getPbftChainSize(); + periodic_events_tp_.post(kDelayedPbftSyncDelayMs, [this] { delayedPbftSync(1); }); + } else { + if (!syncPeerPbft(pbft_sync_period + 1)) { + pbft_syncing_state_->setPbftSyncing(false); + return; + } + } + } + } +} + +PeriodData PbftSyncPacketHandler::decodePeriodData(const dev::RLP &period_data_rlp) const { + return PeriodData::FromOldPeriodData(period_data_rlp); +} + +std::vector> PbftSyncPacketHandler::decodeVotesBundle( + const dev::RLP &votes_bundle_rlp) const { + return decodePbftVotesBundleRlp(votes_bundle_rlp); +} + +void PbftSyncPacketHandler::pbftSyncComplete() { + if (pbft_mgr_->periodDataQueueSize()) { + LOG(log_tr_) << "Syncing pbft blocks faster than processing. 
Remaining sync size " + << pbft_mgr_->periodDataQueueSize(); + periodic_events_tp_.post(kDelayedPbftSyncDelayMs, [this] { pbftSyncComplete(); }); + } else { + LOG(log_dg_) << "Syncing PBFT is completed"; + // We are pbft synced with the node we are connected to but + // calling startSyncingPbft will check if some nodes have + // greater pbft chain size and we should continue syncing with + // them, Or sync pending DAG blocks + pbft_syncing_state_->setPbftSyncing(false); + startSyncingPbft(); + if (!pbft_syncing_state_->isPbftSyncing()) { + requestPendingDagBlocks(); + } + } +} + +void PbftSyncPacketHandler::delayedPbftSync(int counter) { + const uint32_t max_delayed_pbft_sync_count = 60000 / kDelayedPbftSyncDelayMs; + auto pbft_sync_period = pbft_mgr_->pbftSyncingPeriod(); + if (counter > max_delayed_pbft_sync_count) { + LOG(log_er_) << "Pbft blocks stuck in queue, no new block processed in 60 seconds " << pbft_sync_period << " " + << pbft_chain_->getPbftChainSize(); + pbft_syncing_state_->setPbftSyncing(false); + LOG(log_tr_) << "Syncing PBFT is stopping"; + return; + } + + if (pbft_syncing_state_->isPbftSyncing()) { + if (pbft_sync_period > pbft_chain_->getPbftChainSize() + (10 * kConf.network.sync_level_size)) { + LOG(log_tr_) << "Syncing pbft blocks faster than processing " << pbft_sync_period << " " + << pbft_chain_->getPbftChainSize(); + periodic_events_tp_.post(kDelayedPbftSyncDelayMs, [this, counter] { delayedPbftSync(counter + 1); }); + } else { + if (!syncPeerPbft(pbft_sync_period + 1)) { + pbft_syncing_state_->setPbftSyncing(false); + } + } + } +} + +void PbftSyncPacketHandler::handleMaliciousSyncPeer(const dev::p2p::NodeID &id) { + peers_state_->set_peer_malicious(id); + + if (auto host = peers_state_->host_.lock(); host) { + LOG(log_nf_) << "Disconnect peer " << id; + host->disconnect(id, dev::p2p::UserReason); + } else { + LOG(log_er_) << "Unable to handleMaliciousSyncPeer, host == nullptr"; + } +} + +} // namespace taraxa::network::tarcap::v3 diff --git a/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp b/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp index 568bcfbce6..0b61b58ba9 100644 --- a/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp +++ b/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp @@ -16,6 +16,8 @@ #include "network/tarcap/packets_handlers/latest/transaction_packet_handler.hpp" #include "network/tarcap/packets_handlers/latest/vote_packet_handler.hpp" #include "network/tarcap/packets_handlers/latest/votes_bundle_packet_handler.hpp" +#include "network/tarcap/packets_handlers/v3/get_pbft_sync_packet_handler.hpp" +#include "network/tarcap/packets_handlers/v3/pbft_sync_packet_handler.hpp" #include "network/tarcap/shared_states/pbft_syncing_state.hpp" #include "node/node.hpp" #include "pbft/pbft_chain.hpp" @@ -257,13 +259,11 @@ const TaraxaCapability::InitPacketsHandlers TaraxaCapability::kInitLatestVersion // Standard packets with mid processing priority packets_handlers->registerHandler(config, peers_state, packets_stats, pbft_syncing_state, - pbft_chain, pbft_mgr, dag_mgr, trx_mgr, db, - version > kV2NetworkVersion, node_addr, logs_prefix); + pbft_chain, pbft_mgr, dag_mgr, trx_mgr, db, node_addr, + logs_prefix); - // Support for transaition from V2 to V3, once all nodes update to V3 post next hardfork, V2 support can be - // removed packets_handlers->registerHandler(config, peers_state, packets_stats, trx_mgr, - node_addr, version > kV2NetworkVersion, logs_prefix); + node_addr, logs_prefix); // Non critical packets 
with low processing priority packets_handlers->registerHandler(config, peers_state, packets_stats, pbft_syncing_state, @@ -292,4 +292,57 @@ const TaraxaCapability::InitPacketsHandlers TaraxaCapability::kInitLatestVersion return packets_handlers; }; +const TaraxaCapability::InitPacketsHandlers TaraxaCapability::kInitV3Handlers = + [](const std::string &logs_prefix, const FullNodeConfig &config, const h256 &genesis_hash, + const std::shared_ptr &peers_state, const std::shared_ptr &pbft_syncing_state, + const std::shared_ptr &packets_stats, const std::shared_ptr &db, + const std::shared_ptr &pbft_mgr, const std::shared_ptr &pbft_chain, + const std::shared_ptr &vote_mgr, const std::shared_ptr &dag_mgr, + const std::shared_ptr &trx_mgr, const std::shared_ptr &slashing_manager, + const std::shared_ptr &pillar_chain_mgr, TarcapVersion version, + const addr_t &node_addr) { + auto packets_handlers = std::make_shared(); + // Consensus packets with high processing priority + packets_handlers->registerHandler(config, peers_state, packets_stats, pbft_mgr, pbft_chain, + vote_mgr, slashing_manager, node_addr, logs_prefix); + packets_handlers->registerHandler( + config, peers_state, packets_stats, pbft_mgr, pbft_chain, vote_mgr, slashing_manager, node_addr, logs_prefix); + packets_handlers->registerHandler( + config, peers_state, packets_stats, pbft_mgr, pbft_chain, vote_mgr, slashing_manager, node_addr, logs_prefix); + + // Standard packets with mid processing priority + packets_handlers->registerHandler(config, peers_state, packets_stats, pbft_syncing_state, + pbft_chain, pbft_mgr, dag_mgr, trx_mgr, db, node_addr, + logs_prefix); + + packets_handlers->registerHandler(config, peers_state, packets_stats, trx_mgr, + node_addr, logs_prefix); + + // Non critical packets with low processing priority + packets_handlers->registerHandler(config, peers_state, packets_stats, pbft_syncing_state, + pbft_chain, pbft_mgr, dag_mgr, db, genesis_hash, node_addr, + logs_prefix); + packets_handlers->registerHandler(config, peers_state, packets_stats, trx_mgr, dag_mgr, + db, node_addr, logs_prefix); + + packets_handlers->registerHandler(config, peers_state, packets_stats, pbft_syncing_state, + pbft_chain, pbft_mgr, dag_mgr, trx_mgr, db, node_addr, + logs_prefix); + + packets_handlers->registerHandler( + config, peers_state, packets_stats, pbft_syncing_state, pbft_chain, vote_mgr, db, node_addr, logs_prefix); + + packets_handlers->registerHandler(config, peers_state, packets_stats, + pbft_syncing_state, pbft_chain, pbft_mgr, dag_mgr, + vote_mgr, db, node_addr, logs_prefix); + packets_handlers->registerHandler(config, peers_state, packets_stats, pillar_chain_mgr, + node_addr, logs_prefix); + packets_handlers->registerHandler(config, peers_state, packets_stats, + pillar_chain_mgr, node_addr, logs_prefix); + packets_handlers->registerHandler(config, peers_state, packets_stats, + pillar_chain_mgr, node_addr, logs_prefix); + + return packets_handlers; + }; + } // namespace taraxa::network::tarcap diff --git a/libraries/core_libs/node/src/node.cpp b/libraries/core_libs/node/src/node.cpp index f7f854c705..0ee48d9c94 100644 --- a/libraries/core_libs/node/src/node.cpp +++ b/libraries/core_libs/node/src/node.cpp @@ -93,7 +93,6 @@ void FullNode::init() { if (conf_.db_config.fix_trx_period) { migration_manager.applyTransactionPeriod(); } - if (db_->getDagBlocksCount() == 0) { db_->setGenesisHash(conf_.genesis.genesisHash()); } diff --git a/libraries/core_libs/storage/include/storage/migration/dag_block_period_migration.hpp 
b/libraries/core_libs/storage/include/storage/migration/dag_block_period_migration.hpp new file mode 100644 index 0000000000..59173cf862 --- /dev/null +++ b/libraries/core_libs/storage/include/storage/migration/dag_block_period_migration.hpp @@ -0,0 +1,12 @@ +#pragma once +#include "storage/migration/migration_base.hpp" + +namespace taraxa::storage::migration { +class PeriodDataDagBlockMigration : public migration::Base { + public: + PeriodDataDagBlockMigration(std::shared_ptr db); + std::string id() override; + uint32_t dbVersion() override; + void migrate(logger::Logger& log) override; +}; +} // namespace taraxa::storage::migration \ No newline at end of file diff --git a/libraries/core_libs/storage/include/storage/storage.hpp b/libraries/core_libs/storage/include/storage/storage.hpp index 6be77acc4f..7247c562c7 100644 --- a/libraries/core_libs/storage/include/storage/storage.hpp +++ b/libraries/core_libs/storage/include/storage/storage.hpp @@ -456,6 +456,11 @@ class DbStorage : public std::enable_shared_from_this { checkStatus(batch.Put(handle(col), toSlice(k), toSlice(v))); } + template + void insert(Batch& batch, rocksdb::ColumnFamilyHandle* col, K const& k, V const& v) { + checkStatus(batch.Put(col, toSlice(k), toSlice(v))); + } + template void remove(Column const& col, K const& k) { checkStatus(db_->Delete(write_options_, handle(col), toSlice(k))); diff --git a/libraries/core_libs/storage/src/migration/dag_block_period_migration.cpp b/libraries/core_libs/storage/src/migration/dag_block_period_migration.cpp new file mode 100644 index 0000000000..2cdade2db4 --- /dev/null +++ b/libraries/core_libs/storage/src/migration/dag_block_period_migration.cpp @@ -0,0 +1,70 @@ +#include "storage/migration/dag_block_period_migration.hpp" + +#include + +#include + +#include "common/thread_pool.hpp" +#include "common/util.hpp" +#include "pbft/period_data.hpp" + +namespace taraxa::storage::migration { + +PeriodDataDagBlockMigration::PeriodDataDagBlockMigration(std::shared_ptr db) : migration::Base(db) {} + +std::string PeriodDataDagBlockMigration::id() { return "PeriodDataDagBlockMigration"; } + +uint32_t PeriodDataDagBlockMigration::dbVersion() { return 1; } + +void PeriodDataDagBlockMigration::migrate(logger::Logger& log) { + auto orig_col = DbStorage::Columns::period_data; + auto copied_col = db_->copyColumn(db_->handle(orig_col), orig_col.name() + "-copy"); + + if (copied_col == nullptr) { + LOG(log) << "Migration " << id() << " skipped: Unable to copy " << orig_col.name() << " column"; + return; + } + + auto it = db_->getColumnIterator(orig_col); + it->SeekToFirst(); + if (!it->Valid()) { + return; + } + + uint64_t start_period, end_period; + memcpy(&start_period, it->key().data(), sizeof(uint64_t)); + + it->SeekToLast(); + if (!it->Valid()) { + it->Prev(); + } + memcpy(&end_period, it->key().data(), sizeof(uint64_t)); + const auto diff = (end_period - start_period) ? 
(end_period - start_period) : 1; + uint64_t curr_progress = 0; + auto batch = db_->createWriteBatch(); + const size_t max_size = 500000000; + it->SeekToFirst(); + // Get and save data in new format for all blocks + for (; it->Valid(); it->Next()) { + uint64_t period; + memcpy(&period, it->key().data(), sizeof(uint64_t)); + std::string raw = it->value().ToString(); + const auto period_data_old_rlp = dev::RLP(raw); + auto period_data = ::taraxa::PeriodData::FromOldPeriodData(period_data_old_rlp); + db_->insert(batch, copied_col.get(), period, period_data.rlp()); + + if (batch.GetDataSize() > max_size) { + db_->commitWriteBatch(batch); + } + + auto percentage = (period - start_period) * 100 / diff; + if (percentage > curr_progress) { + curr_progress = percentage; + LOG(log) << "Migration " << id() << " progress " << curr_progress << "%"; + } + } + db_->commitWriteBatch(batch); + + db_->replaceColumn(orig_col, std::move(copied_col)); +} +} // namespace taraxa::storage::migration \ No newline at end of file diff --git a/libraries/core_libs/storage/src/migration/migration_manager.cpp b/libraries/core_libs/storage/src/migration/migration_manager.cpp index a7d2ea6924..b0587b1639 100644 --- a/libraries/core_libs/storage/src/migration/migration_manager.cpp +++ b/libraries/core_libs/storage/src/migration/migration_manager.cpp @@ -1,9 +1,12 @@ #include "storage/migration/migration_manager.hpp" +#include "storage/migration/dag_block_period_migration.hpp" #include "storage/migration/transaction_period.hpp" - namespace taraxa::storage::migration { -Manager::Manager(std::shared_ptr db, const addr_t& node_addr) : db_(db) { LOG_OBJECTS_CREATE("MIGRATIONS"); } +Manager::Manager(std::shared_ptr db, const addr_t& node_addr) : db_(db) { + LOG_OBJECTS_CREATE("MIGRATIONS"); + registerMigration(); +} void Manager::applyMigration(std::shared_ptr m) { if (m->isApplied()) { LOG(log_si_) << "Skip \"" << m->id() << "\" migration. 
It was already applied"; diff --git a/libraries/core_libs/storage/src/storage.cpp b/libraries/core_libs/storage/src/storage.cpp index 1f91d616e8..ec9a6edeac 100644 --- a/libraries/core_libs/storage/src/storage.cpp +++ b/libraries/core_libs/storage/src/storage.cpp @@ -7,6 +7,7 @@ #include #include "config/version.hpp" +#include "dag/dag_block_bundle_rlp.hpp" #include "dag/sortition_params_manager.hpp" #include "final_chain/data.hpp" #include "pillar_chain/pillar_block.hpp" @@ -150,7 +151,7 @@ std::unique_ptr DbStorage::copyColumn(rocksdb::Colu return nullptr; } - rocksdb::Checkpoint* checkpoint_raw; + rocksdb::Checkpoint* checkpoint_raw = nullptr; auto status = rocksdb::Checkpoint::Create(db_.get(), &checkpoint_raw); std::unique_ptr checkpoint(checkpoint_raw); checkStatus(status); @@ -161,7 +162,7 @@ std::unique_ptr DbStorage::copyColumn(rocksdb::Colu // Export dir should not exist before exporting the column family fs::remove_all(export_dir); - rocksdb::ExportImportFilesMetaData* metadata_raw; + rocksdb::ExportImportFilesMetaData* metadata_raw = nullptr; status = checkpoint->ExportColumnFamily(orig_column, export_dir, &metadata_raw); std::unique_ptr metadata(metadata_raw); checkStatus(status); @@ -175,7 +176,7 @@ std::unique_ptr DbStorage::copyColumn(rocksdb::Colu rocksdb::ImportColumnFamilyOptions import_options; import_options.move_files = move_data; - rocksdb::ColumnFamilyHandle* copied_column_raw; + rocksdb::ColumnFamilyHandle* copied_column_raw = nullptr; status = db_->CreateColumnFamilyWithImport(options, new_col_name, import_options, *metadata, &copied_column_raw); std::unique_ptr copied_column(copied_column_raw); checkStatus(status); @@ -439,7 +440,7 @@ std::shared_ptr DbStorage::getDagBlock(blk_hash_t const& hash) { if (period_data.size() > 0) { auto period_data_rlp = dev::RLP(period_data); auto dag_blocks_data = period_data_rlp[DAG_BLOCKS_POS_IN_PERIOD_DATA]; - return std::make_shared(dag_blocks_data[data->second]); + return decodeDAGBlockBundleRlp(data->second, dag_blocks_data); } } return nullptr; @@ -1247,9 +1248,10 @@ std::vector DbStorage::getFinalizedDagBlockHashesByPeriod(PbftPeriod std::vector ret; if (auto period_data = getPeriodDataRaw(period); period_data.size() > 0) { auto dag_blocks_data = dev::RLP(period_data)[DAG_BLOCKS_POS_IN_PERIOD_DATA]; - ret.reserve(dag_blocks_data.size()); - std::transform(dag_blocks_data.begin(), dag_blocks_data.end(), std::back_inserter(ret), - [](const auto& dag_block) { return DagBlock(dag_block).getHash(); }); + const auto dag_blocks = decodeDAGBlocksBundleRlp(dag_blocks_data); + ret.reserve(dag_blocks.size()); + std::transform(dag_blocks.begin(), dag_blocks.end(), std::back_inserter(ret), + [](const auto& dag_block) { return dag_block.getHash(); }); } return ret; @@ -1259,9 +1261,10 @@ std::vector> DbStorage::getFinalizedDagBlockByPeriod(P std::vector> ret; if (auto period_data = getPeriodDataRaw(period); period_data.size() > 0) { auto dag_blocks_data = dev::RLP(period_data)[DAG_BLOCKS_POS_IN_PERIOD_DATA]; - ret.reserve(dag_blocks_data.size()); - for (auto const block : dag_blocks_data) { - ret.emplace_back(std::make_shared(block)); + auto dag_blocks = decodeDAGBlocksBundleRlp(dag_blocks_data); + ret.reserve(dag_blocks.size()); + for (auto const block : dag_blocks) { + ret.emplace_back(std::make_shared(std::move(block))); } } return ret; @@ -1274,9 +1277,10 @@ DbStorage::getLastPbftBlockHashAndFinalizedDagBlockByPeriod(PbftPeriod period) { if (auto period_data = getPeriodDataRaw(period); period_data.size() > 0) { auto const 
period_data_rlp = dev::RLP(period_data); auto dag_blocks_data = period_data_rlp[DAG_BLOCKS_POS_IN_PERIOD_DATA]; - ret.reserve(dag_blocks_data.size()); - for (auto const block : dag_blocks_data) { - ret.emplace_back(std::make_shared(block)); + auto dag_blocks = decodeDAGBlocksBundleRlp(dag_blocks_data); + ret.reserve(dag_blocks.size()); + for (auto const block : dag_blocks) { + ret.emplace_back(std::make_shared(std::move(block))); } last_pbft_block_hash = period_data_rlp[PBFT_BLOCK_POS_IN_PERIOD_DATA][PREV_BLOCK_HASH_POS_IN_PBFT_BLOCK].toHash(); diff --git a/libraries/types/dag_block/CMakeLists.txt b/libraries/types/dag_block/CMakeLists.txt index c3f2271ca6..95192fed27 100644 --- a/libraries/types/dag_block/CMakeLists.txt +++ b/libraries/types/dag_block/CMakeLists.txt @@ -1,8 +1,10 @@ set(HEADERS include/dag/dag_block.hpp + include/dag/dag_block_bundle_rlp.hpp ) set(SOURCES src/dag_block.cpp + src/dag_block_bundle_rlp.cpp ) add_library(dag_block ${SOURCES} ${HEADERS}) diff --git a/libraries/types/dag_block/include/dag/dag_block.hpp b/libraries/types/dag_block/include/dag/dag_block.hpp index 91fa27fb58..376507e786 100644 --- a/libraries/types/dag_block/include/dag/dag_block.hpp +++ b/libraries/types/dag_block/include/dag/dag_block.hpp @@ -50,6 +50,7 @@ class DagBlock { explicit DagBlock(Json::Value const &doc); explicit DagBlock(string const &json); explicit DagBlock(dev::RLP const &_rlp); + explicit DagBlock(dev::RLP const &_rlp, vec_trx_t &&trxs); explicit DagBlock(dev::bytes const &_rlp) : DagBlock(dev::RLP(_rlp)) {} /** @@ -102,7 +103,7 @@ class DagBlock { bool verifySig() const; void verifyVdf(const SortitionParams &vdf_config, const h256 &proposal_period_hash, const vrf_wrapper::vrf_pk_t &pk, uint64_t vote_count, uint64_t total_vote_count) const; - bytes rlp(bool include_sig) const; + bytes rlp(bool include_sig, bool include_trxs = true) const; /** * @brief Returns dag block data rlp stream * * @param include_sig * @return dev::RLPStream */ - dev::RLPStream streamRLP(bool include_sig) const; + dev::RLPStream streamRLP(bool include_sig, bool include_trxs = true) const; private: blk_hash_t sha3(bool include_sig) const; diff --git a/libraries/types/dag_block/include/dag/dag_block_bundle_rlp.hpp b/libraries/types/dag_block/include/dag/dag_block_bundle_rlp.hpp new file mode 100644 index 0000000000..828399ec7d --- /dev/null +++ b/libraries/types/dag_block/include/dag/dag_block_bundle_rlp.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include +#include + +#include + +namespace taraxa { + +class DagBlock; + +/** @addtogroup DAG + * @{ + */ + +constexpr static size_t kDAGBlocksBundleRlpSize{3}; + +/** + * @brief Encodes DAG blocks into an optimized blocks bundle RLP + * + * @param blocks + * @return blocks bundle rlp bytes + */ +dev::bytes encodeDAGBlocksBundleRlp(const std::vector& blocks); + +/** + * @brief Decodes DAG blocks from an optimized blocks bundle RLP + * + * @param blocks_bundle_rlp + * @return blocks + */ +std::vector decodeDAGBlocksBundleRlp(const dev::RLP& blocks_bundle_rlp); + +/** + * @brief Decodes a single DAG block from an optimized blocks bundle RLP + * + * @param blocks_bundle_rlp + * @return block + */ +std::shared_ptr decodeDAGBlockBundleRlp(uint64_t index, const dev::RLP& blocks_bundle_rlp); + +/** @}*/ + +} // namespace taraxa diff --git a/libraries/types/dag_block/src/dag_block.cpp b/libraries/types/dag_block/src/dag_block.cpp index fb05d352db..5f9c0f2e57 100644 --- a/libraries/types/dag_block/src/dag_block.cpp +++ 
b/libraries/types/dag_block/src/dag_block.cpp @@ -75,6 +75,13 @@ DagBlock::DagBlock(dev::RLP const &rlp) { vdf_ = vdf_sortition::VdfSortition(vdf_bytes); } +DagBlock::DagBlock(dev::RLP const &rlp, vec_trx_t &&trxs) { + dev::bytes vdf_bytes; + util::rlp_tuple(util::RLPDecoderRef(rlp, true), pivot_, level_, timestamp_, vdf_bytes, tips_, sig_, gas_estimation_); + vdf_ = vdf_sortition::VdfSortition(vdf_bytes); + trxs_ = std::move(trxs); +} + level_t DagBlock::extract_dag_level_from_rlp(const dev::RLP &rlp) { return rlp[kLevelPosInRlp].toInt(); } sig_t DagBlock::extract_signature_from_rlp(const dev::RLP &rlp) { return rlp[kSigPosInRlp].toHash(); } @@ -158,17 +165,26 @@ addr_t const &DagBlock::getSender() const { return cached_sender_; } -dev::RLPStream DagBlock::streamRLP(bool include_sig) const { +dev::RLPStream DagBlock::streamRLP(bool include_sig, bool include_trxs) const { dev::RLPStream s; - constexpr auto base_field_count = 7; - s.appendList(include_sig ? base_field_count + 1 : base_field_count); + auto base_field_count = 6; + if (include_sig) { + base_field_count += 1; + } + if (include_trxs) { + base_field_count += 1; + } + + s.appendList(base_field_count); s << pivot_; s << level_; s << timestamp_; s << vdf_.rlp(); s.appendVector(tips_); - s.appendVector(trxs_); + if (include_trxs) { + s.appendVector(trxs_); + } if (include_sig) { s << sig_; } @@ -177,7 +193,9 @@ dev::RLPStream DagBlock::streamRLP(bool include_sig) const { return s; } -bytes DagBlock::rlp(bool include_sig) const { return streamRLP(include_sig).invalidate(); } +bytes DagBlock::rlp(bool include_sig, bool include_trxs) const { + return streamRLP(include_sig, include_trxs).invalidate(); +} blk_hash_t DagBlock::sha3(bool include_sig) const { return dev::sha3(rlp(include_sig)); } diff --git a/libraries/types/dag_block/src/dag_block_bundle_rlp.cpp b/libraries/types/dag_block/src/dag_block_bundle_rlp.cpp new file mode 100644 index 0000000000..af34357631 --- /dev/null +++ b/libraries/types/dag_block/src/dag_block_bundle_rlp.cpp @@ -0,0 +1,107 @@ +#include "dag/dag_block_bundle_rlp.hpp" + +#include + +#include "common/types.hpp" +#include "dag/dag_block.hpp" + +namespace taraxa { + +dev::bytes encodeDAGBlocksBundleRlp(const std::vector& blocks) { + if (blocks.empty()) { + return {}; + } + + std::unordered_map trx_hash_map; // Map to store transaction hash and its index + std::vector ordered_trx_hashes; + std::vector> indexes; + + for (const auto& block : blocks) { + std::vector idx; + idx.reserve(block.getTrxs().size()); + + for (const auto& trx : block.getTrxs()) { + if (const auto [_, ok] = trx_hash_map.try_emplace(trx, static_cast(trx_hash_map.size())); ok) { + ordered_trx_hashes.push_back(trx); // Track the insertion order + } + idx.push_back(trx_hash_map[trx]); + } + indexes.push_back(idx); + } + + dev::RLPStream blocks_bundle_rlp(kDAGBlocksBundleRlpSize); + blocks_bundle_rlp.appendList(ordered_trx_hashes.size()); + for (const auto& trx_hash : ordered_trx_hashes) { + blocks_bundle_rlp.append(trx_hash); + } + blocks_bundle_rlp.appendList(indexes.size()); + for (const auto& idx : indexes) { + blocks_bundle_rlp.appendList(idx.size()); + for (const auto& i : idx) { + blocks_bundle_rlp.append(i); + } + } + blocks_bundle_rlp.appendList(blocks.size()); + for (const auto& block : blocks) { + blocks_bundle_rlp.appendRaw(block.rlp(true, false)); + } + return blocks_bundle_rlp.invalidate(); +} + +std::vector decodeDAGBlocksBundleRlp(const dev::RLP& blocks_bundle_rlp) { + if (blocks_bundle_rlp.itemCount() != kDAGBlocksBundleRlpSize) { 
+ return {}; + } + + std::vector ordered_trx_hashes; + std::vector> dags_trx_hashes; + + // Decode transaction hashes and + ordered_trx_hashes.reserve(blocks_bundle_rlp[0].itemCount()); + std::transform(blocks_bundle_rlp[0].begin(), blocks_bundle_rlp[0].end(), std::back_inserter(ordered_trx_hashes), + [](const auto& trx_hash_rlp) { return trx_hash_rlp.template toHash(); }); + + for (const auto& idx_rlp : blocks_bundle_rlp[1]) { + std::vector hashes; + hashes.reserve(idx_rlp.itemCount()); + std::transform(idx_rlp.begin(), idx_rlp.end(), std::back_inserter(hashes), + [&ordered_trx_hashes](const auto& i) { return ordered_trx_hashes[i.template toInt()]; }); + + dags_trx_hashes.push_back(std::move(hashes)); + } + + std::vector blocks; + blocks.reserve(blocks_bundle_rlp[2].itemCount()); + + for (size_t i = 0; i < blocks_bundle_rlp[2].itemCount(); i++) { + auto block = DagBlock(blocks_bundle_rlp[2][i], std::move(dags_trx_hashes[i])); + blocks.push_back(std::move(block)); + } + + return blocks; +} + +std::shared_ptr decodeDAGBlockBundleRlp(uint64_t index, const dev::RLP& blocks_bundle_rlp) { + if (blocks_bundle_rlp.itemCount() != kDAGBlocksBundleRlpSize) { + return {}; + } + if (index >= blocks_bundle_rlp[2].itemCount()) { + return {}; + } + + std::vector ordered_trx_hashes; + ordered_trx_hashes.reserve(blocks_bundle_rlp[0].itemCount()); + std::transform(blocks_bundle_rlp[0].begin(), blocks_bundle_rlp[0].end(), std::back_inserter(ordered_trx_hashes), + [](const auto& trx_hash_rlp) { return trx_hash_rlp.template toHash(); }); + + const auto idx_rlp = blocks_bundle_rlp[1][index]; + std::vector hashes; + hashes.reserve(idx_rlp.itemCount()); + std::transform(idx_rlp.begin(), idx_rlp.end(), std::back_inserter(hashes), + [&ordered_trx_hashes](const auto& i) { return ordered_trx_hashes[i.template toInt()]; }); + return std::make_shared(blocks_bundle_rlp[2][index], std::move(hashes)); +} + +/** @}*/ + +} // namespace taraxa diff --git a/libraries/types/pbft_block/include/pbft/period_data.hpp b/libraries/types/pbft_block/include/pbft/period_data.hpp index 5ea23e863d..432fbe5900 100644 --- a/libraries/types/pbft_block/include/pbft/period_data.hpp +++ b/libraries/types/pbft_block/include/pbft/period_data.hpp @@ -29,7 +29,10 @@ class PeriodData { const std::vector>& previous_block_cert_votes, std::optional>>&& pillar_votes = {}); explicit PeriodData(const dev::RLP& all_rlp); - explicit PeriodData(bytes const& all_rlp); + explicit PeriodData(const bytes& all_rlp); + + static PeriodData FromOldPeriodData(const dev::RLP& rlp); + static bytes ToOldPeriodData(const bytes& rlp); std::shared_ptr pbft_blk; std::vector> previous_block_cert_votes; // These votes are the cert votes of previous block diff --git a/libraries/types/pbft_block/src/period_data.cpp b/libraries/types/pbft_block/src/period_data.cpp index 47e81dc413..0ad9dcfabd 100644 --- a/libraries/types/pbft_block/src/period_data.cpp +++ b/libraries/types/pbft_block/src/period_data.cpp @@ -1,5 +1,6 @@ #include "pbft/period_data.hpp" +#include "dag/dag_block_bundle_rlp.hpp" #include "pbft/pbft_block.hpp" #include "vote/votes_bundle_rlp.hpp" @@ -23,9 +24,8 @@ PeriodData::PeriodData(const dev::RLP& rlp) { previous_block_cert_votes = decodePbftVotesBundleRlp(votes_bundle_rlp); } - for (auto const dag_block_rlp : *it++) { - dag_blocks.emplace_back(dag_block_rlp); - } + const auto block_bundle_rlp = *it++; + dag_blocks = decodeDAGBlocksBundleRlp(block_bundle_rlp); for (auto const trx_rlp : *it++) { transactions.emplace_back(std::make_shared(trx_rlp)); @@ -50,9 
+50,10 @@ bytes PeriodData::rlp() const { s.append(""); } - s.appendList(dag_blocks.size()); - for (auto const& b : dag_blocks) { - s.appendRaw(b.rlp(true)); + if (dag_blocks.empty()) { + s.append(""); + } else { + s.appendRaw(encodeDAGBlocksBundleRlp(dag_blocks)); } s.appendList(transactions.size()); @@ -76,6 +77,61 @@ void PeriodData::clear() { pillar_votes_.reset(); } +PeriodData PeriodData::FromOldPeriodData(const dev::RLP& rlp) { + PeriodData period_data; + auto it = rlp.begin(); + period_data.pbft_blk = std::make_shared(*it++); + + const auto votes_bundle_rlp = *it++; + if (period_data.pbft_blk->getPeriod() > 1) [[likely]] { + period_data.previous_block_cert_votes = decodePbftVotesBundleRlp(votes_bundle_rlp); + } + + for (auto const dag_block_rlp : *it++) { + period_data.dag_blocks.emplace_back(dag_block_rlp); + } + + for (auto const trx_rlp : *it++) { + period_data.transactions.emplace_back(std::make_shared(trx_rlp)); + } + + // Pillar votes are optional data of period data since ficus hardfork + if (rlp.itemCount() == 5) { + period_data.pillar_votes_ = decodePillarVotesBundleRlp(*it); + } + return period_data; +} + +bytes PeriodData::ToOldPeriodData(const bytes& rlp) { + PeriodData period_data(rlp); + const auto kRlpSize = period_data.pillar_votes_.has_value() ? kBaseRlpItemCount + 1 : kBaseRlpItemCount; + dev::RLPStream s(kRlpSize); + s.appendRaw(period_data.pbft_blk->rlp(true)); + + if (period_data.pbft_blk->getPeriod() > 1) [[likely]] { + s.appendRaw(encodePbftVotesBundleRlp(period_data.previous_block_cert_votes)); + } else { + s.append(""); + } + + s.appendList(period_data.dag_blocks.size()); + for (auto const& b : period_data.dag_blocks) { + s.appendRaw(b.rlp(true)); + } + + s.appendList(period_data.transactions.size()); + for (auto const& t : period_data.transactions) { + s.appendRaw(t->rlp()); + } + + // Pillar votes are optional data of period data since ficus hardfork + if (period_data.pillar_votes_.has_value()) { + s.appendRaw(encodePillarVotesBundleRlp(*period_data.pillar_votes_)); + } + + return s.invalidate(); +} + std::ostream& operator<<(std::ostream& strm, PeriodData const& b) { strm << "[PeriodData] : " << b.pbft_blk << " , num of votes " << b.previous_block_cert_votes.size() << std::endl; return strm; diff --git a/submodules/taraxa-evm b/submodules/taraxa-evm index 2edc2f91df..476ac57228 160000 --- a/submodules/taraxa-evm +++ b/submodules/taraxa-evm @@ -1 +1 @@ -Subproject commit 2edc2f91df972511e1ccba6440a38efd32812ee2 +Subproject commit 476ac5722850410018f7ff6038fb9e3ef505fd2a diff --git a/tests/network_test.cpp b/tests/network_test.cpp index a60f16a6b2..fbc16bd7d6 100644 --- a/tests/network_test.cpp +++ b/tests/network_test.cpp @@ -1165,7 +1165,7 @@ TEST_F(NetworkTest, transaction_gossip_selection) { class TestTransactionPacketHandler : public network::tarcap::TransactionPacketHandler { public: TestTransactionPacketHandler(std::shared_ptr peers_state) - : TransactionPacketHandler({}, peers_state, {}, {}, {}, true) {} + : TransactionPacketHandler({}, peers_state, {}, {}, {}) {} std::vector< std::pair, std::pair>>> public_transactionsToSendToPeers(std::vector transactions) {