Skip to content

Commit

Permalink
Merge pull request #2880 from Taraxa-project/fix_votes_bundle
Browse files Browse the repository at this point in the history
refactor periodData getter + fix votes_bundle processing
  • Loading branch information
JakubFornadel authored Nov 5, 2024
2 parents c742907 + 61ccc0d commit d0b1dd8
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ SortitionParamsManager::SortitionParamsManager(const addr_t& node_addr, Sortitio
auto period = params_changes_.back().period + 1;
ignored_efficiency_counter_ = 0;
while (true) {
auto data = db_->getPeriodDataRaw(period);
if (data.size() == 0) break;
auto period_data = db_->getPeriodData(period);
if (!period_data.has_value()) {
break;
}

period++;
PeriodData period_data(data);
if (period_data.pbft_blk->getPivotDagBlockHash() != kNullBlockHash) {
if (period_data->pbft_blk->getPivotDagBlockHash() != kNullBlockHash) {
if (static_cast<int32_t>(ignored_efficiency_counter_) >=
config_.changing_interval - config_.computation_interval) {
dag_efficiencies_.push_back(calculateDagEfficiency(period_data));
dag_efficiencies_.push_back(calculateDagEfficiency(*period_data));
} else {
ignored_efficiency_counter_++;
}
Expand Down
11 changes: 5 additions & 6 deletions libraries/core_libs/consensus/src/final_chain/final_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ FinalChain::FinalChain(const std::shared_ptr<DbStorage>& db, const taraxa::FullN
if (*last_blk_num != state_db_descriptor.blk_num) [[unlikely]] {
auto batch = db_->createWriteBatch();
for (auto block_n = *last_blk_num; block_n != state_db_descriptor.blk_num; --block_n) {
auto raw_period_data = db_->getPeriodDataRaw(block_n);
assert(raw_period_data.size() > 0);
auto period_data = db_->getPeriodData(block_n);
assert(period_data.has_value());

const PeriodData period_data(std::move(raw_period_data));
if (period_data.transactions.size()) {
num_executed_dag_blk_ -= period_data.dag_blocks.size();
num_executed_trx_ -= period_data.transactions.size();
if (period_data->transactions.size()) {
num_executed_dag_blk_ -= period_data->dag_blocks.size();
num_executed_trx_ -= period_data->transactions.size();
}
auto period_system_transactions = db_->getPeriodSystemTransactionsHashes(block_n);
num_executed_trx_ -= period_system_transactions.size();
Expand Down
22 changes: 10 additions & 12 deletions libraries/core_libs/consensus/src/pbft/pbft_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,25 @@ PbftManager::PbftManager(const FullNodeConfig &conf, std::shared_ptr<DbStorage>

for (auto period = final_chain_->lastBlockNumber() + 1, curr_period = pbft_chain_->getPbftChainSize();
period <= curr_period; ++period) {
auto period_raw = db_->getPeriodDataRaw(period);
if (period_raw.size() == 0) {
auto period_data = db_->getPeriodData(period);
if (!period_data.has_value()) {
LOG(log_er_) << "DB corrupted - Cannot find PBFT block in period " << period << " in PBFT chain DB pbft_blocks.";
assert(false);
}

PeriodData period_data(period_raw);
if (period_data.pbft_blk->getPeriod() != period) {
LOG(log_er_) << "DB corrupted - PBFT block hash " << period_data.pbft_blk->getBlockHash()
<< " has different period " << period_data.pbft_blk->getPeriod()
if (period_data->pbft_blk->getPeriod() != period) {
LOG(log_er_) << "DB corrupted - PBFT block hash " << period_data->pbft_blk->getBlockHash()
<< " has different period " << period_data->pbft_blk->getPeriod()
<< " in block data than in block order db: " << period;
assert(false);
}

// We need this section because votes need to be verified for reward distribution
for (const auto &v : period_data.previous_block_cert_votes) {
for (const auto &v : period_data->previous_block_cert_votes) {
vote_mgr_->validateVote(v);
}

finalize_(std::move(period_data), db_->getFinalizedDagBlockHashesByPeriod(period), period == curr_period);
finalize_(std::move(*period_data), db_->getFinalizedDagBlockHashesByPeriod(period), period == curr_period);
}

PbftPeriod start_period = 1;
Expand All @@ -72,13 +71,12 @@ PbftManager::PbftManager(const FullNodeConfig &conf, std::shared_ptr<DbStorage>
start_period = pbft_chain_->getPbftChainSize() - recently_finalized_transactions_periods;
}
for (PbftPeriod period = start_period; period <= pbft_chain_->getPbftChainSize(); period++) {
auto period_raw = db_->getPeriodDataRaw(period);
if (period_raw.size() == 0) {
auto period_data = db_->getPeriodData(period);
if (!period_data.has_value()) {
LOG(log_er_) << "DB corrupted - Cannot find PBFT block in period " << period << " in PBFT chain DB pbft_blocks.";
assert(false);
}
PeriodData period_data(period_raw);
trx_mgr_->initializeRecentlyFinalizedTransactions(period_data);
trx_mgr_->initializeRecentlyFinalizedTransactions(*period_data);
}

// Initialize PBFT status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ namespace taraxa::network::tarcap {
struct PbftSyncPacket {
bool last_block;
PeriodData period_data;
// TODO: should it be optional ???
// TODO[2870]: optimize rlp size (use custom class), see encodePbftVotesBundleRlp
std::vector<std::shared_ptr<PbftVote>> current_block_cert_votes;
std::optional<OptimizedPbftVotesBundle> current_block_cert_votes_bundle;

RLP_FIELDS_DEFINE_INPLACE(last_block, period_data, current_block_cert_votes)
RLP_FIELDS_DEFINE_INPLACE(last_block, period_data, current_block_cert_votes_bundle)
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@
namespace taraxa::network::tarcap {

struct PillarVotesBundlePacket {
std::vector<std::shared_ptr<PillarVote>> pillar_votes;
OptimizedPillarVotesBundle pillar_votes_bundle;

void rlp(::taraxa::util::RLPDecoderRef encoding) { pillar_votes = decodePillarVotesBundleRlp(encoding.value); }
void rlp(::taraxa::util::RLPEncoderRef encoding) const {
encoding.appendRaw(encodePillarVotesBundleRlp(pillar_votes));
}
RLP_FIELDS_DEFINE_INPLACE(pillar_votes_bundle)
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
namespace taraxa::network::tarcap {

struct VotesBundlePacket {
std::vector<std::shared_ptr<PbftVote>> votes;
OptimizedPbftVotesBundle votes_bundle;

void rlp(::taraxa::util::RLPDecoderRef encoding) { votes = decodePbftVotesBundleRlp(encoding.value); }
void rlp(::taraxa::util::RLPEncoderRef encoding) const { encoding.appendRaw(encodePbftVotesBundleRlp(votes)); }
RLP_FIELDS_DEFINE_INPLACE(votes_bundle)
};

} // namespace taraxa::network::tarcap
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ class ExtVotesPacketHandler : public PacketHandler<PacketType> {
auto sendVotes = [this, &peer](std::vector<std::shared_ptr<PbftVote>>&& votes) {
// TODO[2868]: optimize this
auto votes_copy = votes;
if (this->sealAndSend(peer->getId(), SubprotocolPacketType::kVotesBundlePacket,
encodePacketRlp(VotesBundlePacket{std::move(votes_copy)}))) {
if (this->sealAndSend(
peer->getId(), SubprotocolPacketType::kVotesBundlePacket,
encodePacketRlp(VotesBundlePacket{OptimizedPbftVotesBundle{.votes = std::move(votes_copy)}}))) {
LOG(this->log_dg_) << " Votes bundle with " << votes.size() << " votes sent to " << peer->getId();
for (const auto& vote : votes) {
peer->markPbftVoteAsKnown(vote->getHash());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,13 @@ void GetPbftSyncPacketHandler::sendPbftBlocks(const std::shared_ptr<TaraxaPeer>

for (auto block_period = from_period; block_period < from_period + blocks_to_transfer; block_period++) {
bool last_block = (block_period == from_period + blocks_to_transfer - 1);
auto data = db_->getPeriodDataRaw(block_period);

if (data.size() == 0) {
auto period_data = db_->getPeriodData(block_period);
if (!period_data.has_value()) {
// This can happen when switching from light node to full node setting
LOG(log_er_) << "DB corrupted. Cannot find period " << block_period << " PBFT block in db";
return;
}

PeriodData period_data(std::move(data));
std::shared_ptr<PbftSyncPacket> pbft_sync_packet;

if (pbft_chain_synced && last_block) {
Expand All @@ -84,13 +82,13 @@ void GetPbftSyncPacketHandler::sendPbftBlocks(const std::shared_ptr<TaraxaPeer>
// It is possible that the node pushed another block to the chain in the meantime
if (reward_votes[0]->getPeriod() == block_period) {
// TODO[2870]: use custom votes bundle class instead of vector
pbft_sync_packet =
std::make_shared<PbftSyncPacket>(last_block, std::move(period_data), std::move(reward_votes));
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(*period_data),
OptimizedPbftVotesBundle{std::move(reward_votes)});
} else {
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(period_data));
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(*period_data));
}
} else {
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(period_data));
pbft_sync_packet = std::make_shared<PbftSyncPacket>(last_block, std::move(*period_data));
}

LOG(log_dg_) << "Sending PbftSyncPacket period " << block_period << " to " << peer_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void GetPillarVotesBundlePacketHandler::process(GetPillarVotesBundlePacket &&pac
for (size_t i = 0; i < chunk_size; ++i) {
pillar_votes.emplace_back(votes[votes_sent + i]);
}
PillarVotesBundlePacket pillar_votes_bundle_packet(std::move(pillar_votes));
PillarVotesBundlePacket pillar_votes_bundle_packet(OptimizedPillarVotesBundle{std::move(pillar_votes)});

// Seal and send the chunk to the peer
if (sealAndSend(peer->getId(), SubprotocolPacketType::kPillarVotesBundlePacket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void PbftSyncPacketHandler::process(PbftSyncPacket &&packet, const std::shared_p

// Process received pbft blocks
// pbft_chain_synced is the flag to indicate own PBFT chain has synced with the peer's PBFT chain
const bool pbft_chain_synced = packet.current_block_cert_votes.size() > 0;
const bool pbft_chain_synced = packet.current_block_cert_votes_bundle.has_value();
const auto pbft_blk_hash = packet.period_data.pbft_blk->getBlockHash();

std::string received_dag_blocks_str; // This is just log related stuff
Expand Down Expand Up @@ -81,7 +81,7 @@ void PbftSyncPacketHandler::process(PbftSyncPacket &&packet, const std::shared_p

// Check cert vote matches if final synced block
if (pbft_chain_synced) {
for (auto const &vote : packet.current_block_cert_votes) {
for (auto const &vote : packet.current_block_cert_votes_bundle->votes) {
if (vote->getBlockHash() != pbft_blk_hash) {
LOG(log_er_) << "Invalid cert votes block hash " << vote->getBlockHash() << " instead of " << pbft_blk_hash
<< " from peer " << peer->getId().abridged() << " received, stop syncing.";
Expand Down Expand Up @@ -165,8 +165,11 @@ void PbftSyncPacketHandler::process(PbftSyncPacket &&packet, const std::shared_p
LOG(log_tr_) << "Synced PBFT block hash " << pbft_blk_hash << " with "
<< packet.period_data.previous_block_cert_votes.size() << " cert votes";
LOG(log_tr_) << "Synced PBFT block " << packet.period_data;
pbft_mgr_->periodDataQueuePush(std::move(packet.period_data), peer->getId(),
std::move(packet.current_block_cert_votes));
std::vector<std::shared_ptr<PbftVote>> current_block_cert_votes;
if (pbft_chain_synced) {
current_block_cert_votes = std::move(packet.current_block_cert_votes_bundle->votes);
}
pbft_mgr_->periodDataQueuePush(std::move(packet.period_data), peer->getId(), std::move(current_block_cert_votes));
}

auto pbft_sync_period = pbft_mgr_->pbftSyncingPeriod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ PillarVotesBundlePacketHandler::PillarVotesBundlePacketHandler(

void PillarVotesBundlePacketHandler::process(PillarVotesBundlePacket &&packet,
const std::shared_ptr<TaraxaPeer> &peer) {
if (packet.pillar_votes.size() == 0 || packet.pillar_votes.size() > kMaxPillarVotesInBundleRlp) {
throw InvalidRlpItemsCountException("PillarVotesBundlePacket", packet.pillar_votes.size(),
if (packet.pillar_votes_bundle.pillar_votes.size() == 0 ||
packet.pillar_votes_bundle.pillar_votes.size() > kMaxPillarVotesInBundleRlp) {
throw InvalidRlpItemsCountException("PillarVotesBundlePacket", packet.pillar_votes_bundle.pillar_votes.size(),
kMaxPillarVotesInBundleRlp);
}

// TODO[2744]: there could be the same protection as in pbft syncing that only requested bundle packet is accepted
LOG(log_dg_) << "PillarVotesBundlePacket received from peer " << peer->getId();

for (const auto &pillar_vote : packet.pillar_votes) {
for (const auto &pillar_vote : packet.pillar_votes_bundle.pillar_votes) {
if (!kConf.genesis.state.hardforks.ficus_hf.isFicusHardfork(pillar_vote->getPeriod())) {
std::ostringstream err_msg;
err_msg << "Synced pillar vote " << pillar_vote->getHash() << ", period " << pillar_vote->getPeriod()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ VotesBundlePacketHandler::VotesBundlePacketHandler(const FullNodeConfig &conf, s
logs_prefix + "VOTES_BUNDLE_PH") {}

void VotesBundlePacketHandler::process(VotesBundlePacket &&packet, const std::shared_ptr<TaraxaPeer> &peer) {
if (packet.votes.size() == 0 || packet.votes.size() > kMaxVotesInBundleRlp) {
throw InvalidRlpItemsCountException("VotesBundlePacket", packet.votes.size(), kMaxVotesInBundleRlp);
if (packet.votes_bundle.votes.size() == 0 || packet.votes_bundle.votes.size() > kMaxVotesInBundleRlp) {
throw InvalidRlpItemsCountException("VotesBundlePacket", packet.votes_bundle.votes.size(), kMaxVotesInBundleRlp);
}

const auto [current_pbft_round, current_pbft_period] = pbft_mgr_->getPbftRoundAndPeriod();

const auto &reference_vote = packet.votes.front();
const auto &reference_vote = packet.votes_bundle.votes.front();
const auto votes_bundle_votes_type = reference_vote->getType();

// Votes sync bundles are allowed to cotain only votes bundles of the same type, period, round and step so if first
// vote is irrelevant, all of them are
if (!isPbftRelevantVote(packet.votes[0])) {
if (!isPbftRelevantVote(packet.votes_bundle.votes[0])) {
LOG(log_wr_) << "Drop votes sync bundle as it is irrelevant for current pbft state. Votes (period, round, step) = ("
<< reference_vote->getPeriod() << ", " << reference_vote->getRound() << ", "
<< reference_vote->getStep() << "). Current PBFT (period, round, step) = (" << current_pbft_period
Expand All @@ -53,7 +53,7 @@ void VotesBundlePacketHandler::process(VotesBundlePacket &&packet, const std::sh
}

size_t processed_votes_count = 0;
for (const auto &vote : packet.votes) {
for (const auto &vote : packet.votes_bundle.votes) {
peer->markPbftVoteAsKnown(vote->getHash());

// Do not process vote that has already been validated
Expand All @@ -71,11 +71,11 @@ void VotesBundlePacketHandler::process(VotesBundlePacket &&packet, const std::sh
processed_votes_count++;
}

LOG(log_nf_) << "Received " << packet.votes.size() << " (processed " << processed_votes_count
LOG(log_nf_) << "Received " << packet.votes_bundle.votes.size() << " (processed " << processed_votes_count
<< " ) sync votes from peer " << peer->getId() << " node current round " << current_pbft_round
<< ", peer pbft round " << reference_vote->getRound();

onNewPbftVotesBundle(packet.votes, false, peer->getId());
onNewPbftVotesBundle(packet.votes_bundle.votes, false, peer->getId());
}

void VotesBundlePacketHandler::onNewPbftVotesBundle(const std::vector<std::shared_ptr<PbftVote>> &votes,
Expand Down
2 changes: 1 addition & 1 deletion libraries/core_libs/storage/include/storage/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ class DbStorage : public std::enable_shared_from_this<DbStorage> {
// Period data
void savePeriodData(const PeriodData& period_data, Batch& write_batch);
void clearPeriodDataHistory(PbftPeriod period, uint64_t dag_level_to_keep);
// TODO[2868]: return PeriodData instead of bytes
dev::bytes getPeriodDataRaw(PbftPeriod period) const;
std::optional<PeriodData> getPeriodData(PbftPeriod period) const;
std::optional<PbftBlock> getPbftBlock(PbftPeriod period) const;
std::vector<std::shared_ptr<PbftVote>> getPeriodCertVotes(PbftPeriod period) const;
blk_hash_t getPeriodBlockHash(PbftPeriod period) const;
Expand Down
9 changes: 9 additions & 0 deletions libraries/core_libs/storage/src/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,15 @@ dev::bytes DbStorage::getPeriodDataRaw(PbftPeriod period) const {
return asBytes(lookup(toSlice(period), Columns::period_data));
}

std::optional<PeriodData> DbStorage::getPeriodData(PbftPeriod period) const {
auto period_data_bytes = getPeriodDataRaw(period);
if (period_data_bytes.empty()) {
return {};
}

return PeriodData{std::move(period_data_bytes)};
}

void DbStorage::savePillarBlock(const std::shared_ptr<pillar_chain::PillarBlock>& pillar_block) {
insert(Columns::pillar_block, pillar_block->getPeriod(), pillar_block->getRlp());
}
Expand Down
15 changes: 15 additions & 0 deletions libraries/types/vote/include/vote/votes_bundle_rlp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include <vector>

#include "common/encoding_rlp.hpp"

namespace taraxa {

class PbftVote;
Expand All @@ -14,6 +16,7 @@ class PillarVote;
* @{
*/

// TOOD[2865]: move to cpp file
constexpr static size_t kPbftVotesBundleRlpSize{5};

/**
Expand All @@ -32,6 +35,12 @@ dev::bytes encodePbftVotesBundleRlp(const std::vector<std::shared_ptr<PbftVote>>
*/
std::vector<std::shared_ptr<PbftVote>> decodePbftVotesBundleRlp(const dev::RLP& votes_bundle_rlp);

struct OptimizedPbftVotesBundle {
std::vector<std::shared_ptr<PbftVote>> votes;

HAS_RLP_FIELDS
};

constexpr static size_t kPillarVotesBundleRlpSize{3};

/**
Expand All @@ -50,6 +59,12 @@ dev::bytes encodePillarVotesBundleRlp(const std::vector<std::shared_ptr<PillarVo
*/
std::vector<std::shared_ptr<PillarVote>> decodePillarVotesBundleRlp(const dev::RLP& votes_bundle_rlp);

struct OptimizedPillarVotesBundle {
std::vector<std::shared_ptr<PillarVote>> pillar_votes;

HAS_RLP_FIELDS
};

/** @}*/

} // namespace taraxa
14 changes: 14 additions & 0 deletions libraries/types/vote/src/votes_bundle_rlp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ std::vector<std::shared_ptr<PbftVote>> decodePbftVotesBundleRlp(const dev::RLP&
return votes;
}

void OptimizedPbftVotesBundle::rlp(::taraxa::util::RLPDecoderRef encoding) {
votes = decodePbftVotesBundleRlp(encoding.value);
}
void OptimizedPbftVotesBundle::rlp(::taraxa::util::RLPEncoderRef encoding) const {
encoding.appendRaw(encodePbftVotesBundleRlp(votes));
}

dev::bytes encodePillarVotesBundleRlp(const std::vector<std::shared_ptr<PillarVote>>& votes) {
if (votes.empty()) {
assert(false);
Expand Down Expand Up @@ -89,4 +96,11 @@ std::vector<std::shared_ptr<PillarVote>> decodePillarVotesBundleRlp(const dev::R
return votes;
}

void OptimizedPillarVotesBundle::rlp(::taraxa::util::RLPDecoderRef encoding) {
pillar_votes = decodePillarVotesBundleRlp(encoding.value);
}
void OptimizedPillarVotesBundle::rlp(::taraxa::util::RLPEncoderRef encoding) const {
encoding.appendRaw(encodePillarVotesBundleRlp(pillar_votes));
}

} // namespace taraxa
7 changes: 3 additions & 4 deletions tests/pbft_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -663,10 +663,9 @@ TEST_F(PbftManagerWithDagCreation, produce_overweighted_block) {

// verify that last block is overweighted, but it is in chain
const auto period = node->getFinalChain()->lastBlockNumber();
auto period_raw = node->getDB()->getPeriodDataRaw(period);
ASSERT_FALSE(period_raw.empty());
PeriodData period_data(period_raw);
EXPECT_FALSE(node->getPbftManager()->checkBlockWeight(period_data.dag_blocks));
auto period_data = node->getDB()->getPeriodData(period);
ASSERT_TRUE(period_data.has_value());
EXPECT_FALSE(node->getPbftManager()->checkBlockWeight(period_data->dag_blocks));
}

TEST_F(PbftManagerWithDagCreation, proposed_blocks) {
Expand Down

0 comments on commit d0b1dd8

Please sign in to comment.