Skip to content

Commit

Permalink
Merge pull request #2839 from Taraxa-project/db-opt
Browse files Browse the repository at this point in the history
Db opt
  • Loading branch information
MatusKysel authored Sep 5, 2024
2 parents 3799637 + 7303d2e commit f6ea9a0
Show file tree
Hide file tree
Showing 28 changed files with 931 additions and 165 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set(TARAXA_PATCH_VERSION 3)
set(TARAXA_VERSION ${TARAXA_MAJOR_VERSION}.${TARAXA_MINOR_VERSION}.${TARAXA_PATCH_VERSION})

# Any time a change in the network protocol is introduced this version should be increased
set(TARAXA_NET_VERSION 3)
set(TARAXA_NET_VERSION 4)
# Major version is modified when DAG blocks, pbft blocks and any basic building blocks of our blockchain is modified
# in the db
set(TARAXA_DB_MAJOR_VERSION 1)
Expand Down
2 changes: 1 addition & 1 deletion libraries/common/include/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ constexpr uint32_t kMinTransactionPoolSize{30000};
constexpr uint32_t kDefaultTransactionPoolSize{200000};
constexpr uint32_t kMaxNonFinalizedTransactions{1000000};

const size_t kV2NetworkVersion = 2;
const size_t kV3NetworkVersion = 3;

const uint32_t kRecentlyFinalizedTransactionsFactor = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler {
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<DbStorage> db,
bool trxs_in_dag_packet, const addr_t &node_addr, const std::string &logs_prefix = "");
const addr_t &node_addr, const std::string &logs_prefix = "");

void sendBlock(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs);
void sendBlockWithTransactions(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs);
void onNewBlockReceived(DagBlock &&block, const std::shared_ptr<TaraxaPeer> &peer = nullptr,
const std::unordered_map<trx_hash_t, std::shared_ptr<Transaction>> &trxs = {});
Expand All @@ -34,7 +33,6 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler {

protected:
std::shared_ptr<TransactionManager> trx_mgr_{nullptr};
const bool kTrxsInDagPacket;
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class TransactionPacketHandler : public PacketHandler {
public:
TransactionPacketHandler(const FullNodeConfig& conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<TransactionManager> trx_mgr, const addr_t& node_addr, bool hash_gossip,
std::shared_ptr<TransactionManager> trx_mgr, const addr_t& node_addr,
const std::string& logs_prefix = "TRANSACTION_PH");

/**
Expand Down Expand Up @@ -48,15 +48,6 @@ class TransactionPacketHandler : public PacketHandler {
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

protected:
/**
* @brief Sends batch of transactions to all connected peers
* @note Support of the old V2 version, remove once most of the network is updated or after a hardfork. This method is
* used as periodic event to broadcast transactions to the other peers in network
*
* @param transactions to be sent
*/
void periodicSendTransactionsWithoutHashGossip(std::vector<SharedTransactions>&& transactions);

/**
* @brief select which transactions and hashes to send to which connected peer
*
Expand All @@ -82,7 +73,6 @@ class TransactionPacketHandler : public PacketHandler {

std::atomic<uint64_t> received_trx_count_{0};
std::atomic<uint64_t> unique_received_trx_count_{0};
const bool kHashGossip = true;
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include "../latest/common/packet_handler.hpp"

namespace taraxa {
class PbftChain;
class DbStorage;
class VoteManager;
} // namespace taraxa

namespace taraxa::network::tarcap {
class PbftSyncingState;
}

namespace taraxa::network::tarcap::v3 {
class GetPbftSyncPacketHandler : public PacketHandler {
public:
GetPbftSyncPacketHandler(const FullNodeConfig& conf, std::shared_ptr<taraxa::network::tarcap::PeersState> peers_state,
std::shared_ptr<taraxa::network::tarcap::TimePeriodPacketsStats> packets_stats,
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<VoteManager> vote_mgr, std::shared_ptr<DbStorage> db,
const addr_t& node_addr, const std::string& logs_prefix = "GET_PBFT_SYNC_PH");

// Packet type that is processed by this handler
static constexpr SubprotocolPacketType kPacketType_ = SubprotocolPacketType::GetPbftSyncPacket;

private:
virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override;
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

virtual void sendPbftBlocks(const std::shared_ptr<TaraxaPeer>& peer, PbftPeriod from_period,
size_t blocks_to_transfer, bool pbft_chain_synced);

protected:
std::shared_ptr<PbftSyncingState> pbft_syncing_state_;
std::shared_ptr<PbftChain> pbft_chain_;
std::shared_ptr<VoteManager> vote_mgr_;
std::shared_ptr<DbStorage> db_;
};

} // namespace taraxa::network::tarcap::v3
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include "../latest/common/ext_syncing_packet_handler.hpp"
#include "common/thread_pool.hpp"
#include "vote_manager/vote_manager.hpp"

namespace taraxa::network::tarcap::v3 {

class PbftSyncPacketHandler : public ExtSyncingPacketHandler {
public:
PbftSyncPacketHandler(const FullNodeConfig& conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<VoteManager> vote_mgr, std::shared_ptr<DbStorage> db, const addr_t& node_addr,
const std::string& logs_prefix = "PBFT_SYNC_PH");

void handleMaliciousSyncPeer(const dev::p2p::NodeID& id);

// Packet type that is processed by this handler
static constexpr SubprotocolPacketType kPacketType_ = SubprotocolPacketType::PbftSyncPacket;

private:
virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override;
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

protected:
virtual PeriodData decodePeriodData(const dev::RLP& period_data_rlp) const;
virtual std::vector<std::shared_ptr<PbftVote>> decodeVotesBundle(const dev::RLP& votes_bundle_rlp) const;

void pbftSyncComplete();
void delayedPbftSync(int counter);

static constexpr uint32_t kDelayedPbftSyncDelayMs = 10;

std::shared_ptr<VoteManager> vote_mgr_;
util::ThreadPool periodic_events_tp_;

static constexpr size_t kStandardPacketSize = 2;
static constexpr size_t kChainSyncedPacketSize = 3;
};

} // namespace taraxa::network::tarcap::v3
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class TaraxaCapability final : public dev::p2p::CapabilityFace {
*/
static const InitPacketsHandlers kInitLatestVersionHandlers;

// TODO: remove this once we pass HF
static const InitPacketsHandlers kInitV3Handlers;

public:
TaraxaCapability(TarcapVersion version, const FullNodeConfig &conf, const h256 &genesis_hash,
std::weak_ptr<dev::p2p::Host> host, const dev::KeyPair &key,
Expand Down
11 changes: 6 additions & 5 deletions libraries/core_libs/network/src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,16 @@ Network::Network(const FullNodeConfig &config, const h256 &genesis_hash, std::fi
dev::p2p::Host::CapabilitiesFactory constructCapabilities = [&](std::weak_ptr<dev::p2p::Host> host) {
assert(!host.expired());

assert(kV2NetworkVersion < TARAXA_NET_VERSION);
assert(kV3NetworkVersion < TARAXA_NET_VERSION);

dev::p2p::Host::CapabilityList capabilities;

// Register old version (V2) of taraxa capability
auto v2_tarcap = std::make_shared<network::tarcap::TaraxaCapability>(
kV2NetworkVersion, config, genesis_hash, host, key, packets_tp_, all_packets_stats_, pbft_syncing_state_, db,
pbft_mgr, pbft_chain, vote_mgr, dag_mgr, trx_mgr, slashing_manager, pillar_chain_mgr);
capabilities.emplace_back(v2_tarcap);
auto v3_tarcap = std::make_shared<network::tarcap::TaraxaCapability>(
kV3NetworkVersion, config, genesis_hash, host, key, packets_tp_, all_packets_stats_, pbft_syncing_state_, db,
pbft_mgr, pbft_chain, vote_mgr, dag_mgr, trx_mgr, slashing_manager, pillar_chain_mgr,
network::tarcap::TaraxaCapability::kInitV3Handlers);
capabilities.emplace_back(v3_tarcap);

// Register latest version of taraxa capability
auto latest_tarcap = std::make_shared<network::tarcap::TaraxaCapability>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,17 @@ DagBlockPacketHandler::DagBlockPacketHandler(const FullNodeConfig &conf, std::sh
std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<DbStorage> db,
bool trxs_in_dag_packet, const addr_t &node_addr,
const std::string &logs_prefix)
const addr_t &node_addr, const std::string &logs_prefix)
: ExtSyncingPacketHandler(conf, std::move(peers_state), std::move(packets_stats), std::move(pbft_syncing_state),
std::move(pbft_chain), std::move(pbft_mgr), std::move(dag_mgr), std::move(db), node_addr,
logs_prefix + "DAG_BLOCK_PH"),
trx_mgr_(std::move(trx_mgr)),
kTrxsInDagPacket(trxs_in_dag_packet) {}
trx_mgr_(std::move(trx_mgr)) {}

void DagBlockPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const {
constexpr size_t required_size_v2 = 8;
constexpr size_t required_size = 2;
// Only one dag block can be received
if (kTrxsInDagPacket && packet_data.rlp_.itemCount() != required_size) {
if (packet_data.rlp_.itemCount() != required_size) {
throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size);
} else if (!kTrxsInDagPacket && packet_data.rlp_.itemCount() != required_size_v2) {
throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size_v2);
}
}

Expand Down Expand Up @@ -69,51 +64,6 @@ void DagBlockPacketHandler::process(const threadpool::PacketData &packet_data,
onNewBlockReceived(std::move(block), peer, transactions);
}

void DagBlockPacketHandler::sendBlock(dev::p2p::NodeID const &peer_id, taraxa::DagBlock block,
const SharedTransactions &trxs) {
std::shared_ptr<TaraxaPeer> peer = peers_state_->getPeer(peer_id);
if (!peer) {
LOG(log_wr_) << "Send dag block " << block.getHash() << ". Failed to obtain peer " << peer_id;
return;
}

// This lock prevents race condition between syncing and gossiping dag blocks
std::unique_lock lock(peer->mutex_for_sending_dag_blocks_);

// Transactions are first sent in transactions packet before sending the block
uint32_t index = 0;
while (index < trxs.size()) {
const uint32_t trx_count_to_send = std::min(static_cast<size_t>(kMaxTransactionsInPacket), trxs.size() - index);

dev::RLPStream s(TransactionPacketHandler::kTransactionPacketItemCount);
s.appendList(trx_count_to_send);

taraxa::bytes trx_bytes;
for (uint32_t i = index; i < index + trx_count_to_send; i++) {
auto trx_data = trxs[i]->rlp();
s << trxs[i]->getHash();
trx_bytes.insert(trx_bytes.end(), std::begin(trx_data), std::end(trx_data));
}

s.appendList(trx_count_to_send);
s.appendRaw(trx_bytes, trx_count_to_send);
sealAndSend(peer_id, TransactionPacket, std::move(s));

index += trx_count_to_send;
}

if (!sealAndSend(peer_id, DagBlockPacket, block.streamRLP(true))) {
LOG(log_wr_) << "Sending DagBlock " << block.getHash() << " failed to " << peer_id;
return;
}

// Mark data as known if sending was successful
peer->markDagBlockAsKnown(block.getHash());
for (const auto &trx : trxs) {
peer->markTransactionAsKnown(trx->getHash());
}
}

void DagBlockPacketHandler::sendBlockWithTransactions(dev::p2p::NodeID const &peer_id, taraxa::DagBlock block,
const SharedTransactions &trxs) {
std::shared_ptr<TaraxaPeer> peer = peers_state_->getPeer(peer_id);
Expand Down Expand Up @@ -290,11 +240,8 @@ void DagBlockPacketHandler::onNewBlockVerified(const DagBlock &block, bool propo
transactions_to_send.push_back(trx);
peer_and_transactions_to_log += trx_hash.abridged();
}
if (kTrxsInDagPacket) {
sendBlockWithTransactions(peer_id, block, transactions_to_send);
} else {
sendBlock(peer_id, block, transactions_to_send);
}

sendBlockWithTransactions(peer_id, block, transactions_to_send);
peer->markDagBlockAsKnown(block_hash);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ namespace taraxa::network::tarcap {
TransactionPacketHandler::TransactionPacketHandler(const FullNodeConfig &conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<TransactionManager> trx_mgr, const addr_t &node_addr,
bool hash_gossip, const std::string &logs_prefix)
const std::string &logs_prefix)
: PacketHandler(conf, std::move(peers_state), std::move(packets_stats), node_addr, logs_prefix + "TRANSACTION_PH"),
trx_mgr_(std::move(trx_mgr)),
kHashGossip(hash_gossip) {}
trx_mgr_(std::move(trx_mgr)) {}

void TransactionPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const {
auto items = packet_data.rlp_.itemCount();
Expand Down Expand Up @@ -100,46 +99,6 @@ inline void TransactionPacketHandler::process(const threadpool::PacketData &pack
}
}

void TransactionPacketHandler::periodicSendTransactionsWithoutHashGossip(
std::vector<SharedTransactions> &&transactions) {
std::vector<std::pair<dev::p2p::NodeID, std::pair<SharedTransactions, std::vector<trx_hash_t>>>>
peers_with_transactions_to_send;

auto peers = peers_state_->getAllPeers();
for (const auto &peer : peers) {
// Confirm that status messages were exchanged otherwise message might be ignored and node would
// incorrectly markTransactionAsKnown
if (!peer.second->syncing_) {
SharedTransactions peer_trxs;
for (auto const &account_trx : transactions) {
for (auto const &trx : account_trx) {
auto trx_hash = trx->getHash();
if (peer.second->isTransactionKnown(trx_hash)) {
continue;
}
peer_trxs.push_back(trx);
if (peer_trxs.size() == kMaxTransactionsInPacket) {
peers_with_transactions_to_send.push_back({peer.first, {peer_trxs, {}}});
peer_trxs.clear();
};
}
}
if (peer_trxs.size() > 0) {
peers_with_transactions_to_send.push_back({peer.first, {peer_trxs, {}}});
}
}
}
const auto peers_to_send_count = peers_with_transactions_to_send.size();
if (peers_to_send_count > 0) {
// Sending it in same order favours some peers over others, always start with a different position
uint32_t start_with = rand() % peers_to_send_count;
for (uint32_t i = 0; i < peers_to_send_count; i++) {
auto peer_to_send = peers_with_transactions_to_send[(start_with + i) % peers_to_send_count];
sendTransactions(peers[peer_to_send.first], std::move(peer_to_send.second));
}
}
}

std::pair<uint32_t, std::pair<SharedTransactions, std::vector<trx_hash_t>>>
TransactionPacketHandler::transactionsToSendToPeer(std::shared_ptr<TaraxaPeer> peer,
const std::vector<SharedTransactions> &transactions,
Expand Down Expand Up @@ -216,12 +175,6 @@ TransactionPacketHandler::transactionsToSendToPeers(std::vector<SharedTransactio
}

void TransactionPacketHandler::periodicSendTransactions(std::vector<SharedTransactions> &&transactions) {
// Support of old v2 net version. Remove once network is fully updated
if (!kHashGossip) {
periodicSendTransactionsWithoutHashGossip(std::move(transactions));
return;
}

auto peers_with_transactions_to_send = transactionsToSendToPeers(std::move(transactions));
const auto peers_to_send_count = peers_with_transactions_to_send.size();
if (peers_to_send_count > 0) {
Expand Down
Loading

0 comments on commit f6ea9a0

Please sign in to comment.