Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Db opt #2839

Merged
merged 17 commits into from
Sep 5, 2024
Merged

Db opt #2839

2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set(TARAXA_PATCH_VERSION 3)
set(TARAXA_VERSION ${TARAXA_MAJOR_VERSION}.${TARAXA_MINOR_VERSION}.${TARAXA_PATCH_VERSION})

# Any time a change in the network protocol is introduced this version should be increased
set(TARAXA_NET_VERSION 3)
set(TARAXA_NET_VERSION 4)
# Major version is modified when DAG blocks, pbft blocks and any basic building blocks of our blockchain is modified
# in the db
set(TARAXA_DB_MAJOR_VERSION 1)
Expand Down
2 changes: 1 addition & 1 deletion libraries/common/include/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ constexpr uint32_t kMinTransactionPoolSize{30000};
constexpr uint32_t kDefaultTransactionPoolSize{200000};
constexpr uint32_t kMaxNonFinalizedTransactions{1000000};

const size_t kV2NetworkVersion = 2;
const size_t kV3NetworkVersion = 3;

const uint32_t kRecentlyFinalizedTransactionsFactor = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler {
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<DbStorage> db,
bool trxs_in_dag_packet, const addr_t &node_addr, const std::string &logs_prefix = "");
const addr_t &node_addr, const std::string &logs_prefix = "");

void sendBlock(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs);
void sendBlockWithTransactions(dev::p2p::NodeID const &peer_id, DagBlock block, const SharedTransactions &trxs);
void onNewBlockReceived(DagBlock &&block, const std::shared_ptr<TaraxaPeer> &peer = nullptr,
const std::unordered_map<trx_hash_t, std::shared_ptr<Transaction>> &trxs = {});
Expand All @@ -34,7 +33,6 @@ class DagBlockPacketHandler : public ExtSyncingPacketHandler {

protected:
std::shared_ptr<TransactionManager> trx_mgr_{nullptr};
const bool kTrxsInDagPacket;
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class TransactionPacketHandler : public PacketHandler {
public:
TransactionPacketHandler(const FullNodeConfig& conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<TransactionManager> trx_mgr, const addr_t& node_addr, bool hash_gossip,
std::shared_ptr<TransactionManager> trx_mgr, const addr_t& node_addr,
const std::string& logs_prefix = "TRANSACTION_PH");

/**
Expand Down Expand Up @@ -48,15 +48,6 @@ class TransactionPacketHandler : public PacketHandler {
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

protected:
/**
* @brief Sends batch of transactions to all connected peers
* @note Support of the old V2 version, remove once most of the network is updated or after a hardfork. This method is
* used as periodic event to broadcast transactions to the other peers in network
*
* @param transactions to be sent
*/
void periodicSendTransactionsWithoutHashGossip(std::vector<SharedTransactions>&& transactions);

/**
* @brief select which transactions and hashes to send to which connected peer
*
Expand All @@ -82,7 +73,6 @@ class TransactionPacketHandler : public PacketHandler {

std::atomic<uint64_t> received_trx_count_{0};
std::atomic<uint64_t> unique_received_trx_count_{0};
const bool kHashGossip = true;
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include "../latest/common/packet_handler.hpp"

namespace taraxa {
class PbftChain;
class DbStorage;
class VoteManager;
} // namespace taraxa

namespace taraxa::network::tarcap {
class PbftSyncingState;
}

namespace taraxa::network::tarcap::v3 {
class GetPbftSyncPacketHandler : public PacketHandler {
public:
GetPbftSyncPacketHandler(const FullNodeConfig& conf, std::shared_ptr<taraxa::network::tarcap::PeersState> peers_state,
std::shared_ptr<taraxa::network::tarcap::TimePeriodPacketsStats> packets_stats,
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<VoteManager> vote_mgr, std::shared_ptr<DbStorage> db,
const addr_t& node_addr, const std::string& logs_prefix = "GET_PBFT_SYNC_PH");

// Packet type that is processed by this handler
static constexpr SubprotocolPacketType kPacketType_ = SubprotocolPacketType::GetPbftSyncPacket;

private:
virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override;
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

virtual void sendPbftBlocks(const std::shared_ptr<TaraxaPeer>& peer, PbftPeriod from_period,
size_t blocks_to_transfer, bool pbft_chain_synced);

protected:
std::shared_ptr<PbftSyncingState> pbft_syncing_state_;
std::shared_ptr<PbftChain> pbft_chain_;
std::shared_ptr<VoteManager> vote_mgr_;
std::shared_ptr<DbStorage> db_;
};

} // namespace taraxa::network::tarcap::v3
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include "../latest/common/ext_syncing_packet_handler.hpp"
#include "common/thread_pool.hpp"
#include "vote_manager/vote_manager.hpp"

namespace taraxa::network::tarcap::v3 {

class PbftSyncPacketHandler : public ExtSyncingPacketHandler {
public:
PbftSyncPacketHandler(const FullNodeConfig& conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<PbftSyncingState> pbft_syncing_state, std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<VoteManager> vote_mgr, std::shared_ptr<DbStorage> db, const addr_t& node_addr,
const std::string& logs_prefix = "PBFT_SYNC_PH");

void handleMaliciousSyncPeer(const dev::p2p::NodeID& id);

// Packet type that is processed by this handler
static constexpr SubprotocolPacketType kPacketType_ = SubprotocolPacketType::PbftSyncPacket;

private:
virtual void validatePacketRlpFormat(const threadpool::PacketData& packet_data) const override;
virtual void process(const threadpool::PacketData& packet_data, const std::shared_ptr<TaraxaPeer>& peer) override;

protected:
virtual PeriodData decodePeriodData(const dev::RLP& period_data_rlp) const;
virtual std::vector<std::shared_ptr<PbftVote>> decodeVotesBundle(const dev::RLP& votes_bundle_rlp) const;

void pbftSyncComplete();
void delayedPbftSync(int counter);

static constexpr uint32_t kDelayedPbftSyncDelayMs = 10;

std::shared_ptr<VoteManager> vote_mgr_;
util::ThreadPool periodic_events_tp_;

static constexpr size_t kStandardPacketSize = 2;
static constexpr size_t kChainSyncedPacketSize = 3;
};

} // namespace taraxa::network::tarcap::v3
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class TaraxaCapability final : public dev::p2p::CapabilityFace {
*/
static const InitPacketsHandlers kInitLatestVersionHandlers;

// TODO: remove this once we pass HF
static const InitPacketsHandlers kInitV3Handlers;

public:
TaraxaCapability(TarcapVersion version, const FullNodeConfig &conf, const h256 &genesis_hash,
std::weak_ptr<dev::p2p::Host> host, const dev::KeyPair &key,
Expand Down
11 changes: 6 additions & 5 deletions libraries/core_libs/network/src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,16 @@ Network::Network(const FullNodeConfig &config, const h256 &genesis_hash, std::fi
dev::p2p::Host::CapabilitiesFactory constructCapabilities = [&](std::weak_ptr<dev::p2p::Host> host) {
assert(!host.expired());

assert(kV2NetworkVersion < TARAXA_NET_VERSION);
assert(kV3NetworkVersion < TARAXA_NET_VERSION);

dev::p2p::Host::CapabilityList capabilities;

// Register old version (V2) of taraxa capability
auto v2_tarcap = std::make_shared<network::tarcap::TaraxaCapability>(
kV2NetworkVersion, config, genesis_hash, host, key, packets_tp_, all_packets_stats_, pbft_syncing_state_, db,
pbft_mgr, pbft_chain, vote_mgr, dag_mgr, trx_mgr, slashing_manager, pillar_chain_mgr);
capabilities.emplace_back(v2_tarcap);
auto v3_tarcap = std::make_shared<network::tarcap::TaraxaCapability>(
kV3NetworkVersion, config, genesis_hash, host, key, packets_tp_, all_packets_stats_, pbft_syncing_state_, db,
pbft_mgr, pbft_chain, vote_mgr, dag_mgr, trx_mgr, slashing_manager, pillar_chain_mgr,
network::tarcap::TaraxaCapability::kInitV3Handlers);
capabilities.emplace_back(v3_tarcap);

// Register latest version of taraxa capability
auto latest_tarcap = std::make_shared<network::tarcap::TaraxaCapability>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,17 @@ DagBlockPacketHandler::DagBlockPacketHandler(const FullNodeConfig &conf, std::sh
std::shared_ptr<PbftChain> pbft_chain,
std::shared_ptr<PbftManager> pbft_mgr, std::shared_ptr<DagManager> dag_mgr,
std::shared_ptr<TransactionManager> trx_mgr, std::shared_ptr<DbStorage> db,
bool trxs_in_dag_packet, const addr_t &node_addr,
const std::string &logs_prefix)
const addr_t &node_addr, const std::string &logs_prefix)
: ExtSyncingPacketHandler(conf, std::move(peers_state), std::move(packets_stats), std::move(pbft_syncing_state),
std::move(pbft_chain), std::move(pbft_mgr), std::move(dag_mgr), std::move(db), node_addr,
logs_prefix + "DAG_BLOCK_PH"),
trx_mgr_(std::move(trx_mgr)),
kTrxsInDagPacket(trxs_in_dag_packet) {}
trx_mgr_(std::move(trx_mgr)) {}

void DagBlockPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const {
constexpr size_t required_size_v2 = 8;
constexpr size_t required_size = 2;
// Only one dag block can be received
if (kTrxsInDagPacket && packet_data.rlp_.itemCount() != required_size) {
if (packet_data.rlp_.itemCount() != required_size) {
throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size);
} else if (!kTrxsInDagPacket && packet_data.rlp_.itemCount() != required_size_v2) {
throw InvalidRlpItemsCountException(packet_data.type_str_, packet_data.rlp_.itemCount(), required_size_v2);
}
}

Expand Down Expand Up @@ -69,51 +64,6 @@ void DagBlockPacketHandler::process(const threadpool::PacketData &packet_data,
onNewBlockReceived(std::move(block), peer, transactions);
}

void DagBlockPacketHandler::sendBlock(dev::p2p::NodeID const &peer_id, taraxa::DagBlock block,
const SharedTransactions &trxs) {
std::shared_ptr<TaraxaPeer> peer = peers_state_->getPeer(peer_id);
if (!peer) {
LOG(log_wr_) << "Send dag block " << block.getHash() << ". Failed to obtain peer " << peer_id;
return;
}

// This lock prevents race condition between syncing and gossiping dag blocks
std::unique_lock lock(peer->mutex_for_sending_dag_blocks_);

// Transactions are first sent in transactions packet before sending the block
uint32_t index = 0;
while (index < trxs.size()) {
const uint32_t trx_count_to_send = std::min(static_cast<size_t>(kMaxTransactionsInPacket), trxs.size() - index);

dev::RLPStream s(TransactionPacketHandler::kTransactionPacketItemCount);
s.appendList(trx_count_to_send);

taraxa::bytes trx_bytes;
for (uint32_t i = index; i < index + trx_count_to_send; i++) {
auto trx_data = trxs[i]->rlp();
s << trxs[i]->getHash();
trx_bytes.insert(trx_bytes.end(), std::begin(trx_data), std::end(trx_data));
}

s.appendList(trx_count_to_send);
s.appendRaw(trx_bytes, trx_count_to_send);
sealAndSend(peer_id, TransactionPacket, std::move(s));

index += trx_count_to_send;
}

if (!sealAndSend(peer_id, DagBlockPacket, block.streamRLP(true))) {
LOG(log_wr_) << "Sending DagBlock " << block.getHash() << " failed to " << peer_id;
return;
}

// Mark data as known if sending was successful
peer->markDagBlockAsKnown(block.getHash());
for (const auto &trx : trxs) {
peer->markTransactionAsKnown(trx->getHash());
}
}

void DagBlockPacketHandler::sendBlockWithTransactions(dev::p2p::NodeID const &peer_id, taraxa::DagBlock block,
const SharedTransactions &trxs) {
std::shared_ptr<TaraxaPeer> peer = peers_state_->getPeer(peer_id);
Expand Down Expand Up @@ -290,11 +240,8 @@ void DagBlockPacketHandler::onNewBlockVerified(const DagBlock &block, bool propo
transactions_to_send.push_back(trx);
peer_and_transactions_to_log += trx_hash.abridged();
}
if (kTrxsInDagPacket) {
sendBlockWithTransactions(peer_id, block, transactions_to_send);
} else {
sendBlock(peer_id, block, transactions_to_send);
}

sendBlockWithTransactions(peer_id, block, transactions_to_send);
peer->markDagBlockAsKnown(block_hash);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ namespace taraxa::network::tarcap {
TransactionPacketHandler::TransactionPacketHandler(const FullNodeConfig &conf, std::shared_ptr<PeersState> peers_state,
std::shared_ptr<TimePeriodPacketsStats> packets_stats,
std::shared_ptr<TransactionManager> trx_mgr, const addr_t &node_addr,
bool hash_gossip, const std::string &logs_prefix)
const std::string &logs_prefix)
: PacketHandler(conf, std::move(peers_state), std::move(packets_stats), node_addr, logs_prefix + "TRANSACTION_PH"),
trx_mgr_(std::move(trx_mgr)),
kHashGossip(hash_gossip) {}
trx_mgr_(std::move(trx_mgr)) {}

void TransactionPacketHandler::validatePacketRlpFormat(const threadpool::PacketData &packet_data) const {
auto items = packet_data.rlp_.itemCount();
Expand Down Expand Up @@ -100,46 +99,6 @@ inline void TransactionPacketHandler::process(const threadpool::PacketData &pack
}
}

void TransactionPacketHandler::periodicSendTransactionsWithoutHashGossip(
std::vector<SharedTransactions> &&transactions) {
std::vector<std::pair<dev::p2p::NodeID, std::pair<SharedTransactions, std::vector<trx_hash_t>>>>
peers_with_transactions_to_send;

auto peers = peers_state_->getAllPeers();
for (const auto &peer : peers) {
// Confirm that status messages were exchanged otherwise message might be ignored and node would
// incorrectly markTransactionAsKnown
if (!peer.second->syncing_) {
SharedTransactions peer_trxs;
for (auto const &account_trx : transactions) {
for (auto const &trx : account_trx) {
auto trx_hash = trx->getHash();
if (peer.second->isTransactionKnown(trx_hash)) {
continue;
}
peer_trxs.push_back(trx);
if (peer_trxs.size() == kMaxTransactionsInPacket) {
peers_with_transactions_to_send.push_back({peer.first, {peer_trxs, {}}});
peer_trxs.clear();
};
}
}
if (peer_trxs.size() > 0) {
peers_with_transactions_to_send.push_back({peer.first, {peer_trxs, {}}});
}
}
}
const auto peers_to_send_count = peers_with_transactions_to_send.size();
if (peers_to_send_count > 0) {
// Sending it in same order favours some peers over others, always start with a different position
uint32_t start_with = rand() % peers_to_send_count;
for (uint32_t i = 0; i < peers_to_send_count; i++) {
auto peer_to_send = peers_with_transactions_to_send[(start_with + i) % peers_to_send_count];
sendTransactions(peers[peer_to_send.first], std::move(peer_to_send.second));
}
}
}

std::pair<uint32_t, std::pair<SharedTransactions, std::vector<trx_hash_t>>>
TransactionPacketHandler::transactionsToSendToPeer(std::shared_ptr<TaraxaPeer> peer,
const std::vector<SharedTransactions> &transactions,
Expand Down Expand Up @@ -216,12 +175,6 @@ TransactionPacketHandler::transactionsToSendToPeers(std::vector<SharedTransactio
}

void TransactionPacketHandler::periodicSendTransactions(std::vector<SharedTransactions> &&transactions) {
// Support of old v2 net version. Remove once network is fully updated
if (!kHashGossip) {
periodicSendTransactionsWithoutHashGossip(std::move(transactions));
return;
}

auto peers_with_transactions_to_send = transactionsToSendToPeers(std::move(transactions));
const auto peers_to_send_count = peers_with_transactions_to_send.size();
if (peers_to_send_count > 0) {
Expand Down
Loading