Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor periodData getter + fix votes_bundle processing #2880

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ SortitionParamsManager::SortitionParamsManager(const addr_t& node_addr, Sortitio
auto period = params_changes_.back().period + 1;
ignored_efficiency_counter_ = 0;
while (true) {
auto data = db_->getPeriodDataRaw(period);
if (data.size() == 0) break;
auto period_data = db_->getPeriodData(period);
if (!period_data.has_value()) {
break;
}

period++;
PeriodData period_data(data);
if (period_data.pbft_blk->getPivotDagBlockHash() != kNullBlockHash) {
if (period_data->pbft_blk->getPivotDagBlockHash() != kNullBlockHash) {
if (static_cast<int32_t>(ignored_efficiency_counter_) >=
config_.changing_interval - config_.computation_interval) {
dag_efficiencies_.push_back(calculateDagEfficiency(period_data));
dag_efficiencies_.push_back(calculateDagEfficiency(*period_data));
} else {
ignored_efficiency_counter_++;
}
Expand Down
11 changes: 5 additions & 6 deletions libraries/core_libs/consensus/src/final_chain/final_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ FinalChain::FinalChain(const std::shared_ptr<DbStorage>& db, const taraxa::FullN
if (*last_blk_num != state_db_descriptor.blk_num) [[unlikely]] {
auto batch = db_->createWriteBatch();
for (auto block_n = *last_blk_num; block_n != state_db_descriptor.blk_num; --block_n) {
auto raw_period_data = db_->getPeriodDataRaw(block_n);
assert(raw_period_data.size() > 0);
auto period_data = db_->getPeriodData(block_n);
assert(period_data.has_value());

const PeriodData period_data(std::move(raw_period_data));
if (period_data.transactions.size()) {
num_executed_dag_blk_ -= period_data.dag_blocks.size();
num_executed_trx_ -= period_data.transactions.size();
if (period_data->transactions.size()) {
num_executed_dag_blk_ -= period_data->dag_blocks.size();
num_executed_trx_ -= period_data->transactions.size();
}
auto period_system_transactions = db_->getPeriodSystemTransactionsHashes(block_n);
num_executed_trx_ -= period_system_transactions.size();
Expand Down
22 changes: 10 additions & 12 deletions libraries/core_libs/consensus/src/pbft/pbft_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,25 @@ PbftManager::PbftManager(const FullNodeConfig &conf, std::shared_ptr<DbStorage>

for (auto period = final_chain_->lastBlockNumber() + 1, curr_period = pbft_chain_->getPbftChainSize();
period <= curr_period; ++period) {
auto period_raw = db_->getPeriodDataRaw(period);
if (period_raw.size() == 0) {
auto period_data = db_->getPeriodData(period);
if (!period_data.has_value()) {
LOG(log_er_) << "DB corrupted - Cannot find PBFT block in period " << period << " in PBFT chain DB pbft_blocks.";
assert(false);
}

PeriodData period_data(period_raw);
if (period_data.pbft_blk->getPeriod() != period) {
LOG(log_er_) << "DB corrupted - PBFT block hash " << period_data.pbft_blk->getBlockHash()
<< " has different period " << period_data.pbft_blk->getPeriod()
if (period_data->pbft_blk->getPeriod() != period) {
LOG(log_er_) << "DB corrupted - PBFT block hash " << period_data->pbft_blk->getBlockHash()
<< " has different period " << period_data->pbft_blk->getPeriod()
<< " in block data than in block order db: " << period;
assert(false);
}

// We need this section because votes need to be verified for reward distribution
for (const auto &v : period_data.previous_block_cert_votes) {
for (const auto &v : period_data->previous_block_cert_votes) {
vote_mgr_->validateVote(v);
}

finalize_(std::move(period_data), db_->getFinalizedDagBlockHashesByPeriod(period), period == curr_period);
finalize_(std::move(*period_data), db_->getFinalizedDagBlockHashesByPeriod(period), period == curr_period);
}

PbftPeriod start_period = 1;
Expand All @@ -72,13 +71,12 @@ PbftManager::PbftManager(const FullNodeConfig &conf, std::shared_ptr<DbStorage>
start_period = pbft_chain_->getPbftChainSize() - recently_finalized_transactions_periods;
}
for (PbftPeriod period = start_period; period <= pbft_chain_->getPbftChainSize(); period++) {
auto period_raw = db_->getPeriodDataRaw(period);
if (period_raw.size() == 0) {
auto period_data = db_->getPeriodData(period);
if (!period_data.has_value()) {
LOG(log_er_) << "DB corrupted - Cannot find PBFT block in period " << period << " in PBFT chain DB pbft_blocks.";
assert(false);
}
PeriodData period_data(period_raw);
trx_mgr_->initializeRecentlyFinalizedTransactions(period_data);
trx_mgr_->initializeRecentlyFinalizedTransactions(*period_data);
}

// Initialize PBFT status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ namespace taraxa::network::tarcap {
struct PbftSyncPacket {
bool last_block;
PeriodData period_data;
// TODO: should it be optional ???
// TODO[2870]: optimize rlp size (use custom class), see encodePbftVotesBundleRlp
std::vector<std::shared_ptr<PbftVote>> current_block_cert_votes;
std::optional<OptimizedPbftVotesBundle> current_block_cert_votes_bundle;

RLP_FIELDS_DEFINE_INPLACE(last_block, period_data, current_block_cert_votes)
RLP_FIELDS_DEFINE_INPLACE(last_block, period_data, current_block_cert_votes_bundle)
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
namespace taraxa::network::tarcap {

struct PillarVotesBundlePacket {
std::vector<std::shared_ptr<PillarVote>> pillar_votes;
OptimizedPillarVotesBundle pillar_votes_bundle;

void rlp(::taraxa::util::RLPDecoderRef encoding) { pillar_votes = decodePillarVotesBundleRlp(encoding.value); }
void rlp(::taraxa::util::RLPEncoderRef encoding) const {
encoding.appendRaw(encodePillarVotesBundleRlp(pillar_votes));
}
RLP_FIELDS_DEFINE_INPLACE(pillar_votes_bundle)
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
namespace taraxa::network::tarcap {

struct VotesBundlePacket {
std::vector<std::shared_ptr<PbftVote>> votes;
OptimizedPbftVotesBundle votes_bundle;

void rlp(::taraxa::util::RLPDecoderRef encoding) { votes = decodePbftVotesBundleRlp(encoding.value); }
void rlp(::taraxa::util::RLPEncoderRef encoding) const { encoding.appendRaw(encodePbftVotesBundleRlp(votes)); }
RLP_FIELDS_DEFINE_INPLACE(votes_bundle)
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ class ExtVotesPacketHandler : public PacketHandler<PacketType> {
auto sendVotes = [this, &peer](std::vector<std::shared_ptr<PbftVote>>&& votes) {
// TODO[2868]: optimize this
auto votes_copy = votes;
if (this->sealAndSend(peer->getId(), SubprotocolPacketType::kVotesBundlePacket,
encodePacketRlp(VotesBundlePacket{std::move(votes_copy)}))) {
if (this->sealAndSend(
peer->getId(), SubprotocolPacketType::kVotesBundlePacket,
encodePacketRlp(VotesBundlePacket{OptimizedPbftVotesBundle{.votes = std::move(votes_copy)}}))) {
LOG(this->log_dg_) << " Votes bundle with " << votes.size() << " votes sent to " << peer->getId();
for (const auto& vote : votes) {
peer->markPbftVoteAsKnown(vote->getHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,13 @@ void GetPbftSyncPacketHandler::sendPbftBlocks(const std::shared_ptr<TaraxaPeer>

for (auto block_period = from_period; block_period < from_period + blocks_to_transfer; block_period++) {
bool last_block = (block_period == from_period + blocks_to_transfer - 1);
auto data = db_->getPeriodDataRaw(block_period);

if (data.size() == 0) {
auto period_data = db_->getPeriodData(block_period);
if (!period_data.has_value()) {
// This can happen when switching from light node to full node setting
LOG(log_er_) << "DB corrupted. Cannot find period " << block_period << " PBFT block in db";
return;
}

PeriodData period_data(std::move(data));
std::shared_ptr<PbftSyncPacket> pbft_sync_packet;

if (pbft_chain_synced && last_block) {
Expand All @@ -84,13 +82,13 @@ void GetPbftSyncPacketHandler::sendPbftBlocks(const std::shared_ptr<TaraxaPeer>
// It is possible that the node pushed another block to the chain in the meantime
if (reward_votes[0]->getPeriod() == block_period) {
// TODO[2870]: use custom votes bundle class instead of vector
pbft_sync_packet =
std::make_shared<PbftSyncPacket>(last_block, std::move(period_data), std::move(reward_votes));
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(*period_data),
OptimizedPbftVotesBundle{std::move(reward_votes)});
} else {
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(period_data));
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(*period_data));
}
} else {
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(period_data));
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(*period_data));
}

LOG(log_dg_) << "Sending PbftSyncPacket period " << block_period << " to " << peer_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void GetPillarVotesBundlePacketHandler::process(GetPillarVotesBundlePacket &&pac
for (size_t i = 0; i < chunk_size; ++i) {
pillar_votes.emplace_back(votes[votes_sent + i]);
}
PillarVotesBundlePacket pillar_votes_bundle_packet(std::move(pillar_votes));
PillarVotesBundlePacket pillar_votes_bundle_packet(OptimizedPillarVotesBundle{std::move(pillar_votes)});

// Seal and send the chunk to the peer
if (sealAndSend(peer->getId(), SubprotocolPacketType::kPillarVotesBundlePacket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void PbftSyncPacketHandler::process(PbftSyncPacket &&packet, const std::shared_p

// Process received pbft blocks
// pbft_chain_synced is the flag to indicate own PBFT chain has synced with the peer's PBFT chain
const bool pbft_chain_synced = packet.current_block_cert_votes.size() > 0;
const bool pbft_chain_synced = packet.current_block_cert_votes_bundle.has_value();
const auto pbft_blk_hash = packet.period_data.pbft_blk->getBlockHash();

std::string received_dag_blocks_str; // This is just log related stuff
Expand Down Expand Up @@ -81,7 +81,7 @@ void PbftSyncPacketHandler::process(PbftSyncPacket &&packet, const std::shared_p

// Check cert vote matches if final synced block
if (pbft_chain_synced) {
for (auto const &vote : packet.current_block_cert_votes) {
for (auto const &vote : packet.current_block_cert_votes_bundle->votes) {
if (vote->getBlockHash() != pbft_blk_hash) {
LOG(log_er_) << "Invalid cert votes block hash " << vote->getBlockHash() << " instead of " << pbft_blk_hash
<< " from peer " << peer->getId().abridged() << " received, stop syncing.";
Expand Down Expand Up @@ -165,8 +165,11 @@ void PbftSyncPacketHandler::process(PbftSyncPacket &&packet, const std::shared_p
LOG(log_tr_) << "Synced PBFT block hash " << pbft_blk_hash << " with "
<< packet.period_data.previous_block_cert_votes.size() << " cert votes";
LOG(log_tr_) << "Synced PBFT block " << packet.period_data;
pbft_mgr_->periodDataQueuePush(std::move(packet.period_data), peer->getId(),
std::move(packet.current_block_cert_votes));
std::vector<std::shared_ptr<PbftVote>> current_block_cert_votes;
if (pbft_chain_synced) {
current_block_cert_votes = std::move(packet.current_block_cert_votes_bundle->votes);
}
pbft_mgr_->periodDataQueuePush(std::move(packet.period_data), peer->getId(), std::move(current_block_cert_votes));
}

auto pbft_sync_period = pbft_mgr_->pbftSyncingPeriod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ PillarVotesBundlePacketHandler::PillarVotesBundlePacketHandler(

void PillarVotesBundlePacketHandler::process(PillarVotesBundlePacket &&packet,
const std::shared_ptr<TaraxaPeer> &peer) {
if (packet.pillar_votes.size() == 0 || packet.pillar_votes.size() > kMaxPillarVotesInBundleRlp) {
throw InvalidRlpItemsCountException("PillarVotesBundlePacket", packet.pillar_votes.size(),
if (packet.pillar_votes_bundle.pillar_votes.size() == 0 ||
packet.pillar_votes_bundle.pillar_votes.size() > kMaxPillarVotesInBundleRlp) {
throw InvalidRlpItemsCountException("PillarVotesBundlePacket", packet.pillar_votes_bundle.pillar_votes.size(),
kMaxPillarVotesInBundleRlp);
}

// TODO[2744]: there could be the same protection as in pbft syncing that only requested bundle packet is accepted
LOG(log_dg_) << "PillarVotesBundlePacket received from peer " << peer->getId();

for (const auto &pillar_vote : packet.pillar_votes) {
for (const auto &pillar_vote : packet.pillar_votes_bundle.pillar_votes) {
if (!kConf.genesis.state.hardforks.ficus_hf.isFicusHardfork(pillar_vote->getPeriod())) {
std::ostringstream err_msg;
err_msg << "Synced pillar vote " << pillar_vote->getHash() << ", period " << pillar_vote->getPeriod()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ VotesBundlePacketHandler::VotesBundlePacketHandler(const FullNodeConfig &conf, s
logs_prefix + "VOTES_BUNDLE_PH") {}

void VotesBundlePacketHandler::process(VotesBundlePacket &&packet, const std::shared_ptr<TaraxaPeer> &peer) {
if (packet.votes.size() == 0 || packet.votes.size() > kMaxVotesInBundleRlp) {
throw InvalidRlpItemsCountException("VotesBundlePacket", packet.votes.size(), kMaxVotesInBundleRlp);
if (packet.votes_bundle.votes.size() == 0 || packet.votes_bundle.votes.size() > kMaxVotesInBundleRlp) {
throw InvalidRlpItemsCountException("VotesBundlePacket", packet.votes_bundle.votes.size(), kMaxVotesInBundleRlp);
}

const auto [current_pbft_round, current_pbft_period] = pbft_mgr_->getPbftRoundAndPeriod();

const auto &reference_vote = packet.votes.front();
const auto &reference_vote = packet.votes_bundle.votes.front();
const auto votes_bundle_votes_type = reference_vote->getType();

// Votes sync bundles are allowed to cotain only votes bundles of the same type, period, round and step so if first
// vote is irrelevant, all of them are
if (!isPbftRelevantVote(packet.votes[0])) {
if (!isPbftRelevantVote(packet.votes_bundle.votes[0])) {
LOG(log_wr_) << "Drop votes sync bundle as it is irrelevant for current pbft state. Votes (period, round, step) = ("
<< reference_vote->getPeriod() << ", " << reference_vote->getRound() << ", "
<< reference_vote->getStep() << "). Current PBFT (period, round, step) = (" << current_pbft_period
Expand All @@ -53,7 +53,7 @@ void VotesBundlePacketHandler::process(VotesBundlePacket &&packet, const std::sh
}

size_t processed_votes_count = 0;
for (const auto &vote : packet.votes) {
for (const auto &vote : packet.votes_bundle.votes) {
peer->markPbftVoteAsKnown(vote->getHash());

// Do not process vote that has already been validated
Expand All @@ -71,11 +71,11 @@ void VotesBundlePacketHandler::process(VotesBundlePacket &&packet, const std::sh
processed_votes_count++;
}

LOG(log_nf_) << "Received " << packet.votes.size() << " (processed " << processed_votes_count
LOG(log_nf_) << "Received " << packet.votes_bundle.votes.size() << " (processed " << processed_votes_count
<< " ) sync votes from peer " << peer->getId() << " node current round " << current_pbft_round
<< ", peer pbft round " << reference_vote->getRound();

onNewPbftVotesBundle(packet.votes, false, peer->getId());
onNewPbftVotesBundle(packet.votes_bundle.votes, false, peer->getId());
}

void VotesBundlePacketHandler::onNewPbftVotesBundle(const std::vector<std::shared_ptr<PbftVote>> &votes,
Expand Down
2 changes: 1 addition & 1 deletion libraries/core_libs/storage/include/storage/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ class DbStorage : public std::enable_shared_from_this<DbStorage> {
// Period data
void savePeriodData(const PeriodData& period_data, Batch& write_batch);
void clearPeriodDataHistory(PbftPeriod period, uint64_t dag_level_to_keep);
// TODO[2868]: return PeriodData instead of bytes
dev::bytes getPeriodDataRaw(PbftPeriod period) const;
std::optional<PeriodData> getPeriodData(PbftPeriod period) const;
std::optional<PbftBlock> getPbftBlock(PbftPeriod period) const;
std::vector<std::shared_ptr<PbftVote>> getPeriodCertVotes(PbftPeriod period) const;
blk_hash_t getPeriodBlockHash(PbftPeriod period) const;
Expand Down
9 changes: 9 additions & 0 deletions libraries/core_libs/storage/src/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,15 @@ dev::bytes DbStorage::getPeriodDataRaw(PbftPeriod period) const {
return asBytes(lookup(toSlice(period), Columns::period_data));
}

std::optional<PeriodData> DbStorage::getPeriodData(PbftPeriod period) const {
auto period_data_bytes = getPeriodDataRaw(period);
if (period_data_bytes.empty()) {
return {};
}

return PeriodData{std::move(period_data_bytes)};
}

void DbStorage::savePillarBlock(const std::shared_ptr<pillar_chain::PillarBlock>& pillar_block) {
insert(Columns::pillar_block, pillar_block->getPeriod(), pillar_block->getRlp());
}
Expand Down
15 changes: 15 additions & 0 deletions libraries/types/vote/include/vote/votes_bundle_rlp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include <vector>

#include "common/encoding_rlp.hpp"

namespace taraxa {

class PbftVote;
Expand All @@ -14,6 +16,7 @@ class PillarVote;
* @{
*/

// TOOD[2865]: move to cpp file
constexpr static size_t kPbftVotesBundleRlpSize{5};

/**
Expand All @@ -32,6 +35,12 @@ dev::bytes encodePbftVotesBundleRlp(const std::vector<std::shared_ptr<PbftVote>>
*/
std::vector<std::shared_ptr<PbftVote>> decodePbftVotesBundleRlp(const dev::RLP& votes_bundle_rlp);

struct OptimizedPbftVotesBundle {
std::vector<std::shared_ptr<PbftVote>> votes;

HAS_RLP_FIELDS
};

constexpr static size_t kPillarVotesBundleRlpSize{3};

/**
Expand All @@ -50,6 +59,12 @@ dev::bytes encodePillarVotesBundleRlp(const std::vector<std::shared_ptr<PillarVo
*/
std::vector<std::shared_ptr<PillarVote>> decodePillarVotesBundleRlp(const dev::RLP& votes_bundle_rlp);

struct OptimizedPillarVotesBundle {
std::vector<std::shared_ptr<PillarVote>> pillar_votes;

HAS_RLP_FIELDS
};

/** @}*/

} // namespace taraxa
14 changes: 14 additions & 0 deletions libraries/types/vote/src/votes_bundle_rlp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ std::vector<std::shared_ptr<PbftVote>> decodePbftVotesBundleRlp(const dev::RLP&
return votes;
}

void OptimizedPbftVotesBundle::rlp(::taraxa::util::RLPDecoderRef encoding) {
votes = decodePbftVotesBundleRlp(encoding.value);
}
void OptimizedPbftVotesBundle::rlp(::taraxa::util::RLPEncoderRef encoding) const {
encoding.appendRaw(encodePbftVotesBundleRlp(votes));
}

dev::bytes encodePillarVotesBundleRlp(const std::vector<std::shared_ptr<PillarVote>>& votes) {
if (votes.empty()) {
assert(false);
Expand Down Expand Up @@ -89,4 +96,11 @@ std::vector<std::shared_ptr<PillarVote>> decodePillarVotesBundleRlp(const dev::R
return votes;
}

void OptimizedPillarVotesBundle::rlp(::taraxa::util::RLPDecoderRef encoding) {
pillar_votes = decodePillarVotesBundleRlp(encoding.value);
}
void OptimizedPillarVotesBundle::rlp(::taraxa::util::RLPEncoderRef encoding) const {
encoding.appendRaw(encodePillarVotesBundleRlp(pillar_votes));
}

} // namespace taraxa
7 changes: 3 additions & 4 deletions tests/pbft_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,9 @@ TEST_F(PbftManagerWithDagCreation, produce_overweighted_block) {

// verify that last block is overweighted, but it is in chain
const auto period = node->getFinalChain()->lastBlockNumber();
auto period_raw = node->getDB()->getPeriodDataRaw(period);
ASSERT_FALSE(period_raw.empty());
PeriodData period_data(period_raw);
EXPECT_FALSE(node->getPbftManager()->checkBlockWeight(period_data.dag_blocks));
auto period_data = node->getDB()->getPeriodData(period);
ASSERT_TRUE(period_data.has_value());
EXPECT_FALSE(node->getPbftManager()->checkBlockWeight(period_data->dag_blocks));
}

TEST_F(PbftManagerWithDagCreation, proposed_blocks) {
Expand Down
Loading