From bb3017e4e9061d6ace46a8775251e85f42b35905 Mon Sep 17 00:00:00 2001 From: JakubFornadel Date: Wed, 5 Apr 2023 16:06:53 -0700 Subject: [PATCH 1/3] chore: minor improvements + fixes --- .../consensus/include/pbft/pbft_manager.hpp | 5 ++- .../consensus/src/pbft/pbft_manager.cpp | 37 ++++++++++++++----- .../src/vote_manager/vote_manager.cpp | 1 + 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp b/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp index 0661f3a0cb..def7868a5c 100644 --- a/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp +++ b/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp @@ -551,7 +551,6 @@ class PbftManager : public std::enable_shared_from_this { PbftStates state_ = value_proposal_state; std::atomic round_ = 1; PbftStep step_ = 1; - PbftStep startingStepInRound_ = 1; // Block that node cert voted std::optional> cert_voted_block_for_round_{}; @@ -570,6 +569,10 @@ class PbftManager : public std::enable_shared_from_this { bool go_finish_state_ = false; bool loop_back_finish_state_ = false; + // Used to avoid cyclic logging in voting steps that are called repeatedly + bool printSecondFinishStepInfo = true; + bool printCertStepInfo = true; + const blk_hash_t dag_genesis_block_hash_; const PbftConfig &config_; diff --git a/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp b/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp index 127fcf91b8..fb9702b2d4 100644 --- a/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp +++ b/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp @@ -252,23 +252,35 @@ void PbftManager::setPbftStep(PbftStep pbft_step) { // Node is still >= kMaxSteps steps behind the rest (at least 1/3) of the network - keep lambda at the standard // value so node can catch up with the rest of the nodes - if (network_next_voting_step > step_ && network_next_voting_step - step_ >= kMaxSteps) { - lambda_ = kMinLambda; + + // To get withing 1 round with the rest of the network - node cannot start exponentially backing off its lambda + // exactly when it is kMaxSteps behind the network as it would reach kMaxLambda lambda time before catching up. If + // we delay triggering exponential backoff by 4 steps, node should get within 1 round with the network. + // !!! Important: This is true only for values kMinLambda = 15000ms and kMaxLambda = 60000 ms + if (network_next_voting_step > step_ && network_next_voting_step - step_ >= kMaxSteps - 4 /* hardcoded delay */) { + // Reset it only if it was already increased compared to default value + if (lambda_ != kMinLambda) { + lambda_ = kMinLambda; + LOG(log_nf_) << "Node is " << network_next_voting_step - step_ + << " steps behind the rest of the network. Reset lambda to the default value " + << lambda_.count() << " [ms]"; + } } else if (lambda_ < kMaxLambda) { // Node is < kMaxSteps steps behind the rest (at least 1/3) of the network - start exponentially backing off - // lambda until it reaches kMaxLambda + // lambda until it reaches kMaxLambdagetNetworkTplusOneNextVotingStep // Note: We calculate the lambda for a step independently of prior steps in case missed earlier steps. lambda_ *= 2; if (lambda_ > kMaxLambda) { lambda_ = kMaxLambda; } + + LOG(log_nf_) << "Exponentially backing off lambda to " << lambda_.count() << " [ms] "; } } } void PbftManager::resetStep() { step_ = 1; - startingStepInRound_ = 1; lambda_ = kMinLambda; } @@ -443,8 +455,6 @@ void PbftManager::initialState() { assert(false); } - // This is used to offset endtime for second finishing step... - startingStepInRound_ = current_pbft_step; setPbftStep(current_pbft_step); round_ = current_pbft_round; @@ -506,6 +516,7 @@ void PbftManager::setCertifyState_() { state_ = certify_state; setPbftStep(step_ + 1); next_step_time_ms_ = 2 * lambda_; + printCertStepInfo = true; } void PbftManager::setFinishState_() { @@ -524,6 +535,7 @@ void PbftManager::setFinishPollingState_() { db_->commitWriteBatch(batch); already_next_voted_value_ = false; already_next_voted_null_block_hash_ = false; + printSecondFinishStepInfo = true; second_finish_step_start_datetime_ = std::chrono::system_clock::now(); next_step_time_ms_ += kPollingIntervalMs; } @@ -537,7 +549,6 @@ void PbftManager::loopBackFinishState_() { db_->commitWriteBatch(batch); already_next_voted_value_ = false; already_next_voted_null_block_hash_ = false; - assert(step_ >= startingStepInRound_); next_step_time_ms_ += kPollingIntervalMs; } @@ -857,7 +868,11 @@ void PbftManager::identifyBlock_() { void PbftManager::certifyBlock_() { // The Certifying Step auto [round, period] = getPbftRoundAndPeriod(); - LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round; + + if (printCertStepInfo) { + LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round; + printCertStepInfo = false; + } const auto elapsed_time_in_round = elapsedTimeInMs(current_round_start_datetime_); go_finish_state_ = elapsed_time_in_round > 4 * lambda_ - kPollingIntervalMs; @@ -974,9 +989,11 @@ void PbftManager::firstFinish_() { void PbftManager::secondFinish_() { // Odd number steps from 5 are in second finish auto [round, period] = getPbftRoundAndPeriod(); - LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_; - assert(step_ >= startingStepInRound_); + if (printSecondFinishStepInfo) { + LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_; + printSecondFinishStepInfo = false; + } // Lambda function for next voting 2t+1 soft voted block from current round auto next_vote_soft_voted_block = [this, period = period, round = round]() { diff --git a/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp b/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp index 2cc98476a3..30765e3528 100644 --- a/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp +++ b/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp @@ -253,6 +253,7 @@ bool VoteManager::addVerifiedVote(const std::shared_ptr& vote) { if (vote->getType() == PbftVoteTypes::next_vote && total_weight >= t_plus_one && vote->getStep() > found_round_it->second.network_t_plus_one_step) { found_round_it->second.network_t_plus_one_step = vote->getStep(); + LOG(log_nf_) << "Set t+1 next voted block " << vote->getHash() << " in step " << vote->getStep(); } // Not enough votes - do not set 2t+1 voted block for period,round and step From 89cebc490e7d45f3ff508fcb1d3c47e49c9a3e4d Mon Sep 17 00:00:00 2001 From: JakubFornadel Date: Thu, 6 Apr 2023 16:15:02 -0700 Subject: [PATCH 2/3] fix: save own votes and gossip them in case network get stalled --- .../consensus/include/pbft/pbft_manager.hpp | 20 +-- .../include/vote_manager/vote_manager.hpp | 22 +++ .../consensus/src/pbft/pbft_manager.cpp | 169 ++++++++++-------- .../src/vote_manager/vote_manager.cpp | 38 +++- .../network/tarcap/taraxa_capability.hpp | 2 +- .../network/src/tarcap/taraxa_capability.cpp | 27 +-- .../storage/include/storage/storage.hpp | 2 +- libraries/core_libs/storage/src/storage.cpp | 11 +- tests/full_node_test.cpp | 2 +- tests/network_test.cpp | 2 +- 10 files changed, 163 insertions(+), 132 deletions(-) diff --git a/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp b/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp index def7868a5c..0685c1ce6c 100644 --- a/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp +++ b/libraries/core_libs/consensus/include/pbft/pbft_manager.hpp @@ -268,18 +268,16 @@ class PbftManager : public std::enable_shared_from_this { size_t getPbftCommitteeSize() const { return config_.committee_size; } /** - * @brief Broadcast or rebroadcast current round soft votes and previous round next votes - * @param rebroadcast + * @brief Test/enforce broadcastVotes() to actually send votes */ - void broadcastSoftAndNextVotes(bool rebroadcast); + void testBroadcatVotesFunctionality(); + private: /** - * @brief Broadcast or rebroadcast reward votes - * @param rebroadcast + * @brief Broadcast or rebroadcast 2t+1 soft/reward/previous round next votes + all own votes if needed */ - void broadcastRewardVotes(bool rebroadcast); + void broadcastVotes(); - private: /** * @brief Check PBFT blocks syncing queue. If there are synced PBFT blocks in queue, push it to PBFT chain */ @@ -543,8 +541,8 @@ class PbftManager : public std::enable_shared_from_this { const uint32_t kBroadcastVotesLambdaTime = 20; const uint32_t kRebroadcastVotesLambdaTime = 60; - uint32_t broadcast_soft_next_votes_counter_ = 1; - uint32_t rebroadcast_soft_next_votes_counter_ = 1; + uint32_t broadcast_votes_counter_ = 1; + uint32_t rebroadcast_votes_counter_ = 1; uint32_t broadcast_reward_votes_counter_ = 1; uint32_t rebroadcast_reward_votes_counter_ = 1; @@ -570,8 +568,8 @@ class PbftManager : public std::enable_shared_from_this { bool loop_back_finish_state_ = false; // Used to avoid cyclic logging in voting steps that are called repeatedly - bool printSecondFinishStepInfo = true; - bool printCertStepInfo = true; + bool printSecondFinishStepInfo_ = true; + bool printCertStepInfo_ = true; const blk_hash_t dag_genesis_block_hash_; diff --git a/libraries/core_libs/consensus/include/vote_manager/vote_manager.hpp b/libraries/core_libs/consensus/include/vote_manager/vote_manager.hpp index ab7a1a03ca..b387810031 100644 --- a/libraries/core_libs/consensus/include/vote_manager/vote_manager.hpp +++ b/libraries/core_libs/consensus/include/vote_manager/vote_manager.hpp @@ -131,6 +131,25 @@ class VoteManager { */ PbftPeriod getRewardVotesPbftBlockPeriod(); + /** + * @brief Saves own verified vote into memory and db + * + * @param vote + */ + void saveOwnVerifiedVote(const std::shared_ptr& vote); + + /** + * @return all own verified votes + */ + std::vector> getOwnVerifiedVotes(); + + /** + * @brief Clear own verified votes + * + * @param write_batch + */ + void clearOwnVerifiedVotes(DbStorage::Batch& write_batch); + /** * @brief Place a vote, save it in the verified votes queue, and gossip to peers * @param blockhash vote on PBFT block hash @@ -286,6 +305,9 @@ class VoteManager { std::vector extra_reward_votes_; mutable std::shared_mutex reward_votes_info_mutex_; + // Own votes generated during current period & round + std::vector> own_verified_votes_; + // Cache for current 2T+1 - > // !!! Important: do not access it directly as it is not updated automatically, always call getPbftTwoTPlusOne instead // !!! diff --git a/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp b/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp index fb9702b2d4..5faa4a9428 100644 --- a/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp +++ b/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp @@ -262,8 +262,8 @@ void PbftManager::setPbftStep(PbftStep pbft_step) { if (lambda_ != kMinLambda) { lambda_ = kMinLambda; LOG(log_nf_) << "Node is " << network_next_voting_step - step_ - << " steps behind the rest of the network. Reset lambda to the default value " - << lambda_.count() << " [ms]"; + << " steps behind the rest of the network. Reset lambda to the default value " << lambda_.count() + << " [ms]"; } } else if (lambda_ < kMaxLambda) { // Node is < kMaxSteps steps behind the rest (at least 1/3) of the network - start exponentially backing off @@ -274,7 +274,7 @@ void PbftManager::setPbftStep(PbftStep pbft_step) { lambda_ = kMaxLambda; } - LOG(log_nf_) << "Exponentially backing off lambda to " << lambda_.count() << " [ms] "; + LOG(log_nf_) << "No round progress - exponentially backing off lambda to " << lambda_.count() << " [ms] in step " << step_; } } } @@ -360,8 +360,8 @@ void PbftManager::resetPbftConsensus(PbftRound round) { LOG(log_dg_) << "Reset PBFT consensus to: period " << getPbftPeriod() << ", round " << round << ", step 1"; // Reset broadcast counters - broadcast_soft_next_votes_counter_ = 1; - rebroadcast_soft_next_votes_counter_ = 1; + broadcast_votes_counter_ = 1; + rebroadcast_votes_counter_ = 1; // Update current round and reset step to 1 round_ = round; @@ -381,8 +381,8 @@ void PbftManager::resetPbftConsensus(PbftRound round) { cert_voted_block_for_round_.reset(); } - // Remove all own votes generated in previous round - db_->clearOwnVerifiedVotes(batch); + // Clear all own votes generated in previous round + vote_mgr_->clearOwnVerifiedVotes(batch); db_->commitWriteBatch(batch); @@ -516,7 +516,7 @@ void PbftManager::setCertifyState_() { state_ = certify_state; setPbftStep(step_ + 1); next_step_time_ms_ = 2 * lambda_; - printCertStepInfo = true; + printCertStepInfo_ = true; } void PbftManager::setFinishState_() { @@ -535,7 +535,7 @@ void PbftManager::setFinishPollingState_() { db_->commitWriteBatch(batch); already_next_voted_value_ = false; already_next_voted_null_block_hash_ = false; - printSecondFinishStepInfo = true; + printSecondFinishStepInfo_ = true; second_finish_step_start_datetime_ = std::chrono::system_clock::now(); next_step_time_ms_ += kPollingIntervalMs; } @@ -552,57 +552,89 @@ void PbftManager::loopBackFinishState_() { next_step_time_ms_ += kPollingIntervalMs; } -void PbftManager::broadcastSoftAndNextVotes(bool rebroadcast) { +void PbftManager::broadcastVotes() { auto net = network_.lock(); if (!net) { + LOG(log_er_) << "Unable to broadcast votes -> cant obtain net ptr"; return; } - auto [round, period] = getPbftRoundAndPeriod(); + const auto votes_sync_packet_handler = net->getSpecificHandler(); - // Broadcast 2t+1 soft votes - auto soft_votes = vote_mgr_->getTwoTPlusOneVotedBlockVotes(period, round, TwoTPlusOneVotedBlockType::SoftVotedBlock); - if (!soft_votes.empty()) { - LOG(log_dg_) << "Broadcast soft votes for period " << period << ", round " << round; - net->getSpecificHandler()->onNewPbftVotesBundle(std::move(soft_votes), - rebroadcast); - } + // Send votes to the other peers + auto gossipVotes = [this, &votes_sync_packet_handler](std::vector> &&votes, + const std::string &votes_type_str, bool rebroadcast) { + if (!votes.empty()) { + LOG(log_dg_) << "Broadcast " << votes_type_str << " for period " << votes.back()->getPeriod() << ", round " + << votes.back()->getRound(); + votes_sync_packet_handler->onNewPbftVotesBundle(std::move(votes), rebroadcast); + } + }; - // Broadcast reward votes - previous round 2t+1 cert votes - auto reward_votes = vote_mgr_->getRewardVotes(); - if (!reward_votes.empty()) { - LOG(log_dg_) << "Broadcast propose reward votes for period " << period << ", round " << round; - net->getSpecificHandler()->onNewPbftVotesBundle(std::move(reward_votes), - rebroadcast); - } + // (Re)broadcast 2t+1 soft/reward/previous round next votes + all own votes + auto broadcastVotes = [this, &net, &gossipVotes](bool rebroadcast) { + auto [round, period] = getPbftRoundAndPeriod(); + + // Broadcast 2t+1 soft votes + gossipVotes(vote_mgr_->getTwoTPlusOneVotedBlockVotes(period, round, TwoTPlusOneVotedBlockType::SoftVotedBlock), + "2t+1 soft votes", rebroadcast); - // Broadcast previous round 2t+1 next votes - if (round > 1) { - if (auto next_votes = vote_mgr_->getAllTwoTPlusOneNextVotes(period, round - 1); !next_votes.empty()) { - LOG(log_dg_) << "Broadcast next votes for period " << period << ", round " << round - 1; - net->getSpecificHandler()->onNewPbftVotesBundle(std::move(next_votes), - rebroadcast); + // Broadcast reward votes - previous round 2t+1 cert votes + gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", rebroadcast); + + // Broadcast previous round 2t+1 next votes + if (round > 1) { + gossipVotes(vote_mgr_->getAllTwoTPlusOneNextVotes(period, round - 1), "2t+1 next votes", rebroadcast); } - } -} -void PbftManager::broadcastRewardVotes(bool rebroadcast) { - auto net = network_.lock(); - if (!net) { - return; - } + // Broadcast own votes + auto vote_packet_handler = net->getSpecificHandler(); + // TODO: this could be optimized to use VotesSyncPacketHandler if we drop some of the checks in process function + // TODO: onNewPbftVote does not use rebroadcast flag to force sending the votes + // Send votes by one as votes sync packet must contain votes with the same type, period and round + const auto& own_votes = vote_mgr_->getOwnVerifiedVotes(); + for (const auto &vote : own_votes) { + vote_packet_handler->onNewPbftVote(vote, getPbftProposedBlock(vote->getPeriod(), vote->getBlockHash())); + } + if (!own_votes.empty()) { + LOG(log_dg_) << "Broadcast own votes for period " << period << ", round " << round; + } + }; - auto [round, period] = getPbftRoundAndPeriod(); + const auto round_elapsed_time = elapsedTimeInMs(current_round_start_datetime_); + const auto period_elapsed_time = elapsedTimeInMs(current_period_start_datetime_); - // Broadcast reward votes - previous round 2t+1 cert votes - auto reward_votes = vote_mgr_->getRewardVotes(); - if (!reward_votes.empty()) { - LOG(log_dg_) << "Broadcast propose reward votes for period " << period << ", round " << round; - net->getSpecificHandler()->onNewPbftVotesBundle(std::move(reward_votes), - rebroadcast); + if (round_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_votes_counter_) { + // Stalled in the same round for kRebroadcastVotesLambdaTime * kMinLambda time -> rebroadcast votes + broadcastVotes(true); + rebroadcast_votes_counter_++; + // If there was a rebroadcast no need to do next broadcast either + broadcast_votes_counter_++; + } else if (round_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_votes_counter_) { + // Stalled in the same round for kBroadcastVotesLambdaTime * kMinLambda time -> broadcast votes + broadcastVotes(false); + broadcast_votes_counter_++; + } else if (period_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_reward_votes_counter_) { + // Stalled in the same period for kRebroadcastVotesLambdaTime * kMinLambda time -> rebroadcast reward votes + gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", true); + rebroadcast_reward_votes_counter_++; + // If there was a rebroadcast no need to do next broadcast either + broadcast_reward_votes_counter_++; + } else if (period_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_reward_votes_counter_) { + // Stalled in the same period for kBroadcastVotesLambdaTime * kMinLambda time -> broadcast reward votes + gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", false); + broadcast_reward_votes_counter_++; } } +void PbftManager::testBroadcatVotesFunctionality() { + // Set these variables to force broadcastVotes() send votes + current_round_start_datetime_ = time_point{}; + current_period_start_datetime_ = time_point{}; + + broadcastVotes(); +} + void PbftManager::printVotingSummary() const { const auto [round, period] = getPbftRoundAndPeriod(); Json::Value json_obj; @@ -625,41 +657,21 @@ void PbftManager::printVotingSummary() const { } bool PbftManager::stateOperations_() { - pushSyncedPbftBlocksIntoChain(); - - const auto round_elapsed_time = elapsedTimeInMs(current_round_start_datetime_); - const auto period_elapsed_time = elapsedTimeInMs(current_period_start_datetime_); - - if (round_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_soft_next_votes_counter_) { - broadcastSoftAndNextVotes(true); - rebroadcast_soft_next_votes_counter_++; - // If there was a rebroadcast no need to do next broadcast either - broadcast_soft_next_votes_counter_++; - } else if (round_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_soft_next_votes_counter_) { - broadcastSoftAndNextVotes(false); - broadcast_soft_next_votes_counter_++; - } - - // Reward votes need to be broadcast even if we are advancing rounds but unable to advance a period - if (period_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_reward_votes_counter_) { - broadcastRewardVotes(true); - rebroadcast_reward_votes_counter_++; - // If there was a rebroadcast no need to do next broadcast either - broadcast_reward_votes_counter_++; - } else if (period_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_reward_votes_counter_) { - broadcastRewardVotes(false); - broadcast_reward_votes_counter_++; - } - auto [round, period] = getPbftRoundAndPeriod(); LOG(log_tr_) << "PBFT current round: " << round << ", period: " << period << ", step " << step_; - // Check if these is already 2t+1 cert votes for some valid block, if so - push it into the chain + // Process synced blocks + pushSyncedPbftBlocksIntoChain(); + + // (Re)broadcast votes if needed + broadcastVotes(); + + // Check if there is 2t+1 cert votes for some valid block, if so - push it into the chain if (tryPushCertVotesBlock()) { return true; } - // 2t+1 next votes were seen + // Check if there is 2t+1 next votes for some valid block, if so - advance round if (advanceRound()) { return true; } @@ -709,6 +721,9 @@ bool PbftManager::placeVote(const std::shared_ptr &vote, std::string_view gossipNewVote(vote, voted_block); + // Save own verified vote + vote_mgr_->saveOwnVerifiedVote(vote); + LOG(log_nf_) << "Placed " << log_vote_id << " " << vote->getHash() << " for block " << vote->getBlockHash() << ", vote weight " << *vote->getWeight() << ", period " << vote->getPeriod() << ", round " << vote->getRound() << ", step " << vote->getStep(); @@ -869,9 +884,9 @@ void PbftManager::certifyBlock_() { // The Certifying Step auto [round, period] = getPbftRoundAndPeriod(); - if (printCertStepInfo) { + if (printCertStepInfo_) { LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round; - printCertStepInfo = false; + printCertStepInfo_ = false; } const auto elapsed_time_in_round = elapsedTimeInMs(current_round_start_datetime_); @@ -990,9 +1005,9 @@ void PbftManager::secondFinish_() { // Odd number steps from 5 are in second finish auto [round, period] = getPbftRoundAndPeriod(); - if (printSecondFinishStepInfo) { + if (printSecondFinishStepInfo_) { LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_; - printSecondFinishStepInfo = false; + printSecondFinishStepInfo_ = false; } // Lambda function for next voting 2t+1 soft voted block from current round diff --git a/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp b/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp index 30765e3528..9b791c4ba7 100644 --- a/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp +++ b/libraries/core_libs/consensus/src/vote_manager/vote_manager.cpp @@ -31,7 +31,7 @@ VoteManager::VoteManager(const addr_t& node_addr, const PbftConfig& pbft_config, auto db_votes = db_->getAllTwoTPlusOneVotes(); - auto loadVotesFromDb = [this](const std::vector>& votes) { + auto addVerifiedVotes = [this](const std::vector>& votes) { bool reward_votes_info_set = false; for (const auto& vote : votes) { // Check if votes are unique per round, step & voter @@ -51,11 +51,22 @@ VoteManager::VoteManager(const addr_t& node_addr, const PbftConfig& pbft_config, } }; - loadVotesFromDb(db_->getAllTwoTPlusOneVotes()); - loadVotesFromDb(db_->getOwnVerifiedVotes()); - auto reward_votes = db_->getRewardVotes(); - for (const auto& vote : reward_votes) extra_reward_votes_.emplace_back(vote->getHash()); - loadVotesFromDb(reward_votes); + // Load 2t+1 vote blocks votes + addVerifiedVotes(db_->getAllTwoTPlusOneVotes()); + + // Load own votes + const auto own_votes = db_->getOwnVerifiedVotes(); + for (const auto& own_vote : own_votes) { + own_verified_votes_.emplace_back(own_vote); + } + addVerifiedVotes(own_votes); + + // Load reward votes + const auto reward_votes = db_->getRewardVotes(); + for (const auto& reward_vote : reward_votes) { + extra_reward_votes_.emplace_back(reward_vote->getHash()); + } + addVerifiedVotes(reward_votes); } void VoteManager::setNetwork(std::weak_ptr network) { network_ = std::move(network); } @@ -253,7 +264,8 @@ bool VoteManager::addVerifiedVote(const std::shared_ptr& vote) { if (vote->getType() == PbftVoteTypes::next_vote && total_weight >= t_plus_one && vote->getStep() > found_round_it->second.network_t_plus_one_step) { found_round_it->second.network_t_plus_one_step = vote->getStep(); - LOG(log_nf_) << "Set t+1 next voted block " << vote->getHash() << " in step " << vote->getStep(); + LOG(log_nf_) << "Set t+1 next voted block " << vote->getHash() << " for period " << vote->getPeriod() + << ", round " << vote->getRound() << ", step " << vote->getStep(); } // Not enough votes - do not set 2t+1 voted block for period,round and step @@ -776,6 +788,18 @@ std::vector> VoteManager::getRewardVotes() { return reward_votes; } +void VoteManager::saveOwnVerifiedVote(const std::shared_ptr& vote) { + own_verified_votes_.push_back(vote); + db_->saveOwnVerifiedVote(vote); +} + +std::vector> VoteManager::getOwnVerifiedVotes() { return own_verified_votes_; } + +void VoteManager::clearOwnVerifiedVotes(DbStorage::Batch& write_batch) { + db_->clearOwnVerifiedVotes(write_batch, own_verified_votes_); + own_verified_votes_.clear(); +} + uint64_t VoteManager::getPbftSortitionThreshold(uint64_t total_dpos_votes_count, PbftVoteTypes vote_type) const { switch (vote_type) { case PbftVoteTypes::propose_vote: diff --git a/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp b/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp index faad6cc7ec..b651d85b5c 100644 --- a/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp +++ b/libraries/core_libs/network/include/network/tarcap/taraxa_capability.hpp @@ -88,7 +88,7 @@ class TaraxaCapability : public dev::p2p::CapabilityFace { // END METHODS USED IN TESTS ONLY protected: - virtual void initPeriodicEvents(const std::shared_ptr &pbft_mgr, const std::shared_ptr &db, + virtual void initPeriodicEvents(const std::shared_ptr &pbft_mgr, std::shared_ptr trx_mgr, std::shared_ptr packets_stats); virtual void registerPacketHandlers(const h256 &genesis_hash, diff --git a/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp b/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp index 4744e12bdd..27344616ab 100644 --- a/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp +++ b/libraries/core_libs/network/src/tarcap/taraxa_capability.cpp @@ -68,7 +68,7 @@ void TaraxaCapability::init(const h256 &genesis_hash, std::shared_ptr node_addr); // Inits periodic events. Must be called after registerHandlers !!! - initPeriodicEvents(pbft_mgr, db, trx_mgr, all_packets_stats_); + initPeriodicEvents(pbft_mgr, trx_mgr, all_packets_stats_); } void TaraxaCapability::addBootNodes(bool initial) { @@ -117,7 +117,6 @@ void TaraxaCapability::addBootNodes(bool initial) { } void TaraxaCapability::initPeriodicEvents(const std::shared_ptr &pbft_mgr, - const std::shared_ptr &db, std::shared_ptr trx_mgr, std::shared_ptr packets_stats) { // TODO: refactor this: @@ -173,30 +172,6 @@ void TaraxaCapability::initPeriodicEvents(const std::shared_ptr &pb addBootNodes(); } }); - - // If period and round did not change after 60 seconds from node start, rebroadcast own pbft votes - if (pbft_mgr && db /* just because of tests */) { - auto vote_packet_handler = packets_handlers_->getSpecificHandler(); - const auto [init_round, init_period] = pbft_mgr->getPbftRoundAndPeriod(); - periodic_events_tp_->post(60000, [init_round = init_round, init_period = init_period, db = db, pbft_mgr = pbft_mgr, - vote_packet_handler = std::move(vote_packet_handler)] { - const auto [curent_round, curent_period] = pbft_mgr->getPbftRoundAndPeriod(); - if (curent_period != init_period || curent_round != init_round) { - return; - } - - const auto own_votes = db->getOwnVerifiedVotes(); - if (own_votes.empty()) { - return; - } - - // Send votes by one as votes sync packet must contain votes with the same type, period and round - for (const auto &vote : own_votes) { - vote_packet_handler->onNewPbftVote(vote, - pbft_mgr->getPbftProposedBlock(vote->getPeriod(), vote->getBlockHash())); - } - }); - } } void TaraxaCapability::registerPacketHandlers( diff --git a/libraries/core_libs/storage/include/storage/storage.hpp b/libraries/core_libs/storage/include/storage/storage.hpp index 992fb61b51..eae9f453e6 100644 --- a/libraries/core_libs/storage/include/storage/storage.hpp +++ b/libraries/core_libs/storage/include/storage/storage.hpp @@ -265,7 +265,7 @@ class DbStorage : public std::enable_shared_from_this { // Own votes for the latest round void saveOwnVerifiedVote(const std::shared_ptr& vote); std::vector> getOwnVerifiedVotes(); - void clearOwnVerifiedVotes(Batch& write_batch); + void clearOwnVerifiedVotes(Batch& write_batch, const std::vector>& own_verified_votes); // 2t+1 votes bundles for the latest round void replaceTwoTPlusOneVotes(TwoTPlusOneVotedBlockType type, const std::vector>& votes); diff --git a/libraries/core_libs/storage/src/storage.cpp b/libraries/core_libs/storage/src/storage.cpp index d3bd850c93..e5181e3184 100644 --- a/libraries/core_libs/storage/src/storage.cpp +++ b/libraries/core_libs/storage/src/storage.cpp @@ -829,13 +829,10 @@ std::vector> DbStorage::getOwnVerifiedVotes() { return votes; } -void DbStorage::clearOwnVerifiedVotes(Batch& write_batch) { - // TODO: deletion could be optimized if we save votes in memory - auto it = - std::unique_ptr(db_->NewIterator(read_options_, handle(Columns::latest_round_own_votes))); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - const auto vote = std::make_shared(asBytes(it->value().ToString())); - remove(write_batch, Columns::latest_round_own_votes, vote->getHash().asBytes()); +void DbStorage::clearOwnVerifiedVotes(Batch& write_batch, + const std::vector>& own_verified_votes) { + for (const auto& own_vote : own_verified_votes) { + remove(write_batch, Columns::latest_round_own_votes, own_vote->getHash().asBytes()); } } diff --git a/tests/full_node_test.cpp b/tests/full_node_test.cpp index 16e00a70f6..0b0ce29a37 100644 --- a/tests/full_node_test.cpp +++ b/tests/full_node_test.cpp @@ -242,7 +242,7 @@ TEST_F(FullNodeTest, db_test) { } batch = db.createWriteBatch(); - db.clearOwnVerifiedVotes(batch); + db.clearOwnVerifiedVotes(batch, verified_votes); db.commitWriteBatch(batch); EXPECT_TRUE(db.getOwnVerifiedVotes().empty()); diff --git a/tests/network_test.cpp b/tests/network_test.cpp index 4441728508..21a901c543 100644 --- a/tests/network_test.cpp +++ b/tests/network_test.cpp @@ -883,7 +883,7 @@ TEST_F(NetworkTest, pbft_next_votes_sync_in_same_round) { node2->getPbftManager()->setPbftRound(2); // Node 1 broadcast his votes - node1_pbft_mgr->broadcastSoftAndNextVotes(false); + node1_pbft_mgr->testBroadcatVotesFunctionality(); // Node 2 should receive votes from node 1, node 1 has its own 2 votes EXPECT_EQ(node1_vote_mgr->getVerifiedVotesSize(), 2); EXPECT_HAPPENS({5s, 100ms}, [&](auto& ctx) { WAIT_EXPECT_EQ(ctx, node2_vote_mgr->getVerifiedVotesSize(), 3) }); From 74944cd65e6468ebbcd50572858d141e48f8ba3b Mon Sep 17 00:00:00 2001 From: JakubFornadel Date: Fri, 7 Apr 2023 12:47:00 -0700 Subject: [PATCH 3/3] use rebroadcast flag in onNewPbftVote method --- libraries/core_libs/consensus/src/pbft/pbft_manager.cpp | 9 +++++---- .../tarcap/packets_handlers/vote_packet_handler.hpp | 4 +++- .../src/tarcap/packets_handlers/vote_packet_handler.cpp | 5 +++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp b/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp index 5faa4a9428..ffb281e153 100644 --- a/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp +++ b/libraries/core_libs/consensus/src/pbft/pbft_manager.cpp @@ -274,7 +274,8 @@ void PbftManager::setPbftStep(PbftStep pbft_step) { lambda_ = kMaxLambda; } - LOG(log_nf_) << "No round progress - exponentially backing off lambda to " << lambda_.count() << " [ms] in step " << step_; + LOG(log_nf_) << "No round progress - exponentially backing off lambda to " << lambda_.count() << " [ms] in step " + << step_; } } } @@ -590,11 +591,11 @@ void PbftManager::broadcastVotes() { // Broadcast own votes auto vote_packet_handler = net->getSpecificHandler(); // TODO: this could be optimized to use VotesSyncPacketHandler if we drop some of the checks in process function - // TODO: onNewPbftVote does not use rebroadcast flag to force sending the votes // Send votes by one as votes sync packet must contain votes with the same type, period and round - const auto& own_votes = vote_mgr_->getOwnVerifiedVotes(); + const auto &own_votes = vote_mgr_->getOwnVerifiedVotes(); for (const auto &vote : own_votes) { - vote_packet_handler->onNewPbftVote(vote, getPbftProposedBlock(vote->getPeriod(), vote->getBlockHash())); + vote_packet_handler->onNewPbftVote(vote, getPbftProposedBlock(vote->getPeriod(), vote->getBlockHash()), + rebroadcast); } if (!own_votes.empty()) { LOG(log_dg_) << "Broadcast own votes for period " << period << ", round " << round; diff --git a/libraries/core_libs/network/include/network/tarcap/packets_handlers/vote_packet_handler.hpp b/libraries/core_libs/network/include/network/tarcap/packets_handlers/vote_packet_handler.hpp index 9035d7945a..13c30d346a 100644 --- a/libraries/core_libs/network/include/network/tarcap/packets_handlers/vote_packet_handler.hpp +++ b/libraries/core_libs/network/include/network/tarcap/packets_handlers/vote_packet_handler.hpp @@ -16,8 +16,10 @@ class VotePacketHandler final : public ExtVotesPacketHandler { * * @param vote Votes to send * @param block block to send - nullptr means no block + * @param rebroadcast - send even of vote i known for the peer */ - void onNewPbftVote(const std::shared_ptr& vote, const std::shared_ptr& block); + void onNewPbftVote(const std::shared_ptr& vote, const std::shared_ptr& block, + bool rebroadcast = false); void sendPbftVote(const std::shared_ptr& peer, const std::shared_ptr& vote, const std::shared_ptr& block); diff --git a/libraries/core_libs/network/src/tarcap/packets_handlers/vote_packet_handler.cpp b/libraries/core_libs/network/src/tarcap/packets_handlers/vote_packet_handler.cpp index 7c4f3aa6cd..aafbc0e2fe 100644 --- a/libraries/core_libs/network/src/tarcap/packets_handlers/vote_packet_handler.cpp +++ b/libraries/core_libs/network/src/tarcap/packets_handlers/vote_packet_handler.cpp @@ -77,14 +77,15 @@ void VotePacketHandler::process(const PacketData &packet_data, const std::shared } } -void VotePacketHandler::onNewPbftVote(const std::shared_ptr &vote, const std::shared_ptr &block) { +void VotePacketHandler::onNewPbftVote(const std::shared_ptr &vote, const std::shared_ptr &block, + bool rebroadcast) { for (const auto &peer : peers_state_->getAllPeers()) { if (peer.second->syncing_) { LOG(log_dg_) << " PBFT vote " << vote->getHash() << " not sent to " << peer.first << " peer syncing"; continue; } - if (peer.second->isVoteKnown(vote->getHash())) { + if (!rebroadcast && peer.second->isVoteKnown(vote->getHash())) { continue; }