Skip to content

Commit

Permalink
Merge pull request #2436 from Taraxa-project/exponential_backoff
Browse files Browse the repository at this point in the history
chore: minor improvements + fixes
  • Loading branch information
JakubFornadel authored Apr 10, 2023
2 parents 6570426 + 74944cd commit 10dae0b
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 134 deletions.
21 changes: 11 additions & 10 deletions libraries/core_libs/consensus/include/pbft/pbft_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,16 @@ class PbftManager : public std::enable_shared_from_this<PbftManager> {
size_t getPbftCommitteeSize() const { return config_.committee_size; }

/**
* @brief Broadcast or rebroadcast current round soft votes and previous round next votes
* @param rebroadcast
* @brief Test/enforce broadcastVotes() to actually send votes
*/
void broadcastSoftAndNextVotes(bool rebroadcast);
void testBroadcatVotesFunctionality();

private:
/**
* @brief Broadcast or rebroadcast reward votes
* @param rebroadcast
* @brief Broadcast or rebroadcast 2t+1 soft/reward/previous round next votes + all own votes if needed
*/
void broadcastRewardVotes(bool rebroadcast);
void broadcastVotes();

private:
/**
* @brief Check PBFT blocks syncing queue. If there are synced PBFT blocks in queue, push it to PBFT chain
*/
Expand Down Expand Up @@ -543,15 +541,14 @@ class PbftManager : public std::enable_shared_from_this<PbftManager> {

const uint32_t kBroadcastVotesLambdaTime = 20;
const uint32_t kRebroadcastVotesLambdaTime = 60;
uint32_t broadcast_soft_next_votes_counter_ = 1;
uint32_t rebroadcast_soft_next_votes_counter_ = 1;
uint32_t broadcast_votes_counter_ = 1;
uint32_t rebroadcast_votes_counter_ = 1;
uint32_t broadcast_reward_votes_counter_ = 1;
uint32_t rebroadcast_reward_votes_counter_ = 1;

PbftStates state_ = value_proposal_state;
std::atomic<PbftRound> round_ = 1;
PbftStep step_ = 1;
PbftStep startingStepInRound_ = 1;

// Block that node cert voted
std::optional<std::shared_ptr<PbftBlock>> cert_voted_block_for_round_{};
Expand All @@ -570,6 +567,10 @@ class PbftManager : public std::enable_shared_from_this<PbftManager> {
bool go_finish_state_ = false;
bool loop_back_finish_state_ = false;

// Used to avoid cyclic logging in voting steps that are called repeatedly
bool printSecondFinishStepInfo_ = true;
bool printCertStepInfo_ = true;

const blk_hash_t dag_genesis_block_hash_;

const PbftConfig &config_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ class VoteManager {
*/
PbftPeriod getRewardVotesPbftBlockPeriod();

/**
* @brief Saves own verified vote into memory and db
*
* @param vote
*/
void saveOwnVerifiedVote(const std::shared_ptr<Vote>& vote);

/**
* @return all own verified votes
*/
std::vector<std::shared_ptr<Vote>> getOwnVerifiedVotes();

/**
* @brief Clear own verified votes
*
* @param write_batch
*/
void clearOwnVerifiedVotes(DbStorage::Batch& write_batch);

/**
* @brief Place a vote, save it in the verified votes queue, and gossip to peers
* @param blockhash vote on PBFT block hash
Expand Down Expand Up @@ -286,6 +305,9 @@ class VoteManager {
std::vector<vote_hash_t> extra_reward_votes_;
mutable std::shared_mutex reward_votes_info_mutex_;

// Own votes generated during current period & round
std::vector<std::shared_ptr<Vote>> own_verified_votes_;

// Cache for current 2T+1 - <Vote type, <period, two_t_plus_one for period>>
// !!! Important: do not access it directly as it is not updated automatically, always call getPbftTwoTPlusOne instead
// !!!
Expand Down
189 changes: 111 additions & 78 deletions libraries/core_libs/consensus/src/pbft/pbft_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,36 @@ void PbftManager::setPbftStep(PbftStep pbft_step) {

// Node is still >= kMaxSteps steps behind the rest (at least 1/3) of the network - keep lambda at the standard
// value so node can catch up with the rest of the nodes
if (network_next_voting_step > step_ && network_next_voting_step - step_ >= kMaxSteps) {
lambda_ = kMinLambda;

// To get within 1 round with the rest of the network - node cannot start exponentially backing off its lambda
// exactly when it is kMaxSteps behind the network as it would reach kMaxLambda lambda time before catching up. If
// we delay triggering exponential backoff by 4 steps, node should get within 1 round with the network.
// !!! Important: This is true only for values kMinLambda = 15000ms and kMaxLambda = 60000 ms
if (network_next_voting_step > step_ && network_next_voting_step - step_ >= kMaxSteps - 4 /* hardcoded delay */) {
// Reset it only if it was already increased compared to default value
if (lambda_ != kMinLambda) {
lambda_ = kMinLambda;
LOG(log_nf_) << "Node is " << network_next_voting_step - step_
<< " steps behind the rest of the network. Reset lambda to the default value " << lambda_.count()
<< " [ms]";
}
} else if (lambda_ < kMaxLambda) {
// Node is < kMaxSteps steps behind the rest (at least 1/3) of the network - start exponentially backing off
// lambda until it reaches kMaxLambda
// lambda until it reaches kMaxLambda
// Note: We calculate the lambda for a step independently of prior steps in case earlier steps were missed.
lambda_ *= 2;
if (lambda_ > kMaxLambda) {
lambda_ = kMaxLambda;
}

LOG(log_nf_) << "No round progress - exponentially backing off lambda to " << lambda_.count() << " [ms] in step "
<< step_;
}
}
}

// Resets the step-related state to its starting values: step_ and startingStepInRound_ go back to 1 and
// lambda_ goes back to kMinLambda (dropping any previously backed-off lambda value).
void PbftManager::resetStep() {
step_ = 1;
startingStepInRound_ = 1;
lambda_ = kMinLambda;
}

Expand Down Expand Up @@ -348,8 +361,8 @@ void PbftManager::resetPbftConsensus(PbftRound round) {
LOG(log_dg_) << "Reset PBFT consensus to: period " << getPbftPeriod() << ", round " << round << ", step 1";

// Reset broadcast counters
broadcast_soft_next_votes_counter_ = 1;
rebroadcast_soft_next_votes_counter_ = 1;
broadcast_votes_counter_ = 1;
rebroadcast_votes_counter_ = 1;

// Update current round and reset step to 1
round_ = round;
Expand All @@ -369,8 +382,8 @@ void PbftManager::resetPbftConsensus(PbftRound round) {
cert_voted_block_for_round_.reset();
}

// Remove all own votes generated in previous round
db_->clearOwnVerifiedVotes(batch);
// Clear all own votes generated in previous round
vote_mgr_->clearOwnVerifiedVotes(batch);

db_->commitWriteBatch(batch);

Expand Down Expand Up @@ -443,8 +456,6 @@ void PbftManager::initialState() {
assert(false);
}

// This is used to offset endtime for second finishing step...
startingStepInRound_ = current_pbft_step;
setPbftStep(current_pbft_step);
round_ = current_pbft_round;

Expand Down Expand Up @@ -506,6 +517,7 @@ void PbftManager::setCertifyState_() {
state_ = certify_state;
setPbftStep(step_ + 1);
next_step_time_ms_ = 2 * lambda_;
printCertStepInfo_ = true;
}

void PbftManager::setFinishState_() {
Expand All @@ -524,6 +536,7 @@ void PbftManager::setFinishPollingState_() {
db_->commitWriteBatch(batch);
already_next_voted_value_ = false;
already_next_voted_null_block_hash_ = false;
printSecondFinishStepInfo_ = true;
second_finish_step_start_datetime_ = std::chrono::system_clock::now();
next_step_time_ms_ += kPollingIntervalMs;
}
Expand All @@ -537,61 +550,92 @@ void PbftManager::loopBackFinishState_() {
db_->commitWriteBatch(batch);
already_next_voted_value_ = false;
already_next_voted_null_block_hash_ = false;
assert(step_ >= startingStepInRound_);
next_step_time_ms_ += kPollingIntervalMs;
}

void PbftManager::broadcastSoftAndNextVotes(bool rebroadcast) {
void PbftManager::broadcastVotes() {
auto net = network_.lock();
if (!net) {
LOG(log_er_) << "Unable to broadcast votes -> cant obtain net ptr";
return;
}

auto [round, period] = getPbftRoundAndPeriod();
const auto votes_sync_packet_handler = net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>();

// Broadcast 2t+1 soft votes
auto soft_votes = vote_mgr_->getTwoTPlusOneVotedBlockVotes(period, round, TwoTPlusOneVotedBlockType::SoftVotedBlock);
if (!soft_votes.empty()) {
LOG(log_dg_) << "Broadcast soft votes for period " << period << ", round " << round;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(soft_votes),
rebroadcast);
}
// Send votes to the other peers
auto gossipVotes = [this, &votes_sync_packet_handler](std::vector<std::shared_ptr<Vote>> &&votes,
const std::string &votes_type_str, bool rebroadcast) {
if (!votes.empty()) {
LOG(log_dg_) << "Broadcast " << votes_type_str << " for period " << votes.back()->getPeriod() << ", round "
<< votes.back()->getRound();
votes_sync_packet_handler->onNewPbftVotesBundle(std::move(votes), rebroadcast);
}
};

// Broadcast reward votes - previous round 2t+1 cert votes
auto reward_votes = vote_mgr_->getRewardVotes();
if (!reward_votes.empty()) {
LOG(log_dg_) << "Broadcast propose reward votes for period " << period << ", round " << round;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(reward_votes),
rebroadcast);
}
// (Re)broadcast 2t+1 soft/reward/previous round next votes + all own votes
auto broadcastVotes = [this, &net, &gossipVotes](bool rebroadcast) {
auto [round, period] = getPbftRoundAndPeriod();

// Broadcast 2t+1 soft votes
gossipVotes(vote_mgr_->getTwoTPlusOneVotedBlockVotes(period, round, TwoTPlusOneVotedBlockType::SoftVotedBlock),
"2t+1 soft votes", rebroadcast);

// Broadcast previous round 2t+1 next votes
if (round > 1) {
if (auto next_votes = vote_mgr_->getAllTwoTPlusOneNextVotes(period, round - 1); !next_votes.empty()) {
LOG(log_dg_) << "Broadcast next votes for period " << period << ", round " << round - 1;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(next_votes),
rebroadcast);
// Broadcast reward votes - previous round 2t+1 cert votes
gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", rebroadcast);

// Broadcast previous round 2t+1 next votes
if (round > 1) {
gossipVotes(vote_mgr_->getAllTwoTPlusOneNextVotes(period, round - 1), "2t+1 next votes", rebroadcast);
}
}
}

void PbftManager::broadcastRewardVotes(bool rebroadcast) {
auto net = network_.lock();
if (!net) {
return;
}
// Broadcast own votes
auto vote_packet_handler = net->getSpecificHandler<network::tarcap::VotePacketHandler>();
// TODO: this could be optimized to use VotesSyncPacketHandler if we drop some of the checks in process function
// Send votes one by one, as a votes sync packet must contain votes with the same type, period and round
const auto &own_votes = vote_mgr_->getOwnVerifiedVotes();
for (const auto &vote : own_votes) {
vote_packet_handler->onNewPbftVote(vote, getPbftProposedBlock(vote->getPeriod(), vote->getBlockHash()),
rebroadcast);
}
if (!own_votes.empty()) {
LOG(log_dg_) << "Broadcast own votes for period " << period << ", round " << round;
}
};

auto [round, period] = getPbftRoundAndPeriod();
const auto round_elapsed_time = elapsedTimeInMs(current_round_start_datetime_);
const auto period_elapsed_time = elapsedTimeInMs(current_period_start_datetime_);

// Broadcast reward votes - previous round 2t+1 cert votes
auto reward_votes = vote_mgr_->getRewardVotes();
if (!reward_votes.empty()) {
LOG(log_dg_) << "Broadcast propose reward votes for period " << period << ", round " << round;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(reward_votes),
rebroadcast);
if (round_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_votes_counter_) {
// Stalled in the same round for kRebroadcastVotesLambdaTime * kMinLambda time -> rebroadcast votes
broadcastVotes(true);
rebroadcast_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_votes_counter_++;
} else if (round_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_votes_counter_) {
// Stalled in the same round for kBroadcastVotesLambdaTime * kMinLambda time -> broadcast votes
broadcastVotes(false);
broadcast_votes_counter_++;
} else if (period_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_reward_votes_counter_) {
// Stalled in the same period for kRebroadcastVotesLambdaTime * kMinLambda time -> rebroadcast reward votes
gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", true);
rebroadcast_reward_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_reward_votes_counter_++;
} else if (period_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_reward_votes_counter_) {
// Stalled in the same period for kBroadcastVotesLambdaTime * kMinLambda time -> broadcast reward votes
gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", false);
broadcast_reward_votes_counter_++;
}
}

void PbftManager::testBroadcatVotesFunctionality() {
// Set these variables to force broadcastVotes() send votes
current_round_start_datetime_ = time_point{};
current_period_start_datetime_ = time_point{};

broadcastVotes();
}

void PbftManager::printVotingSummary() const {
const auto [round, period] = getPbftRoundAndPeriod();
Json::Value json_obj;
Expand All @@ -614,41 +658,21 @@ void PbftManager::printVotingSummary() const {
}

bool PbftManager::stateOperations_() {
pushSyncedPbftBlocksIntoChain();

const auto round_elapsed_time = elapsedTimeInMs(current_round_start_datetime_);
const auto period_elapsed_time = elapsedTimeInMs(current_period_start_datetime_);

if (round_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_soft_next_votes_counter_) {
broadcastSoftAndNextVotes(true);
rebroadcast_soft_next_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_soft_next_votes_counter_++;
} else if (round_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_soft_next_votes_counter_) {
broadcastSoftAndNextVotes(false);
broadcast_soft_next_votes_counter_++;
}

// Reward votes need to be broadcast even if we are advancing rounds but unable to advance a period
if (period_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_reward_votes_counter_) {
broadcastRewardVotes(true);
rebroadcast_reward_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_reward_votes_counter_++;
} else if (period_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_reward_votes_counter_) {
broadcastRewardVotes(false);
broadcast_reward_votes_counter_++;
}

auto [round, period] = getPbftRoundAndPeriod();
LOG(log_tr_) << "PBFT current round: " << round << ", period: " << period << ", step " << step_;

// Check if these is already 2t+1 cert votes for some valid block, if so - push it into the chain
// Process synced blocks
pushSyncedPbftBlocksIntoChain();

// (Re)broadcast votes if needed
broadcastVotes();

// Check if there are 2t+1 cert votes for some valid block, if so - push it into the chain
if (tryPushCertVotesBlock()) {
return true;
}

// 2t+1 next votes were seen
// Check if there are 2t+1 next votes for some valid block, if so - advance round
if (advanceRound()) {
return true;
}
Expand Down Expand Up @@ -698,6 +722,9 @@ bool PbftManager::placeVote(const std::shared_ptr<Vote> &vote, std::string_view

gossipNewVote(vote, voted_block);

// Save own verified vote
vote_mgr_->saveOwnVerifiedVote(vote);

LOG(log_nf_) << "Placed " << log_vote_id << " " << vote->getHash() << " for block " << vote->getBlockHash()
<< ", vote weight " << *vote->getWeight() << ", period " << vote->getPeriod() << ", round "
<< vote->getRound() << ", step " << vote->getStep();
Expand Down Expand Up @@ -857,7 +884,11 @@ void PbftManager::identifyBlock_() {
void PbftManager::certifyBlock_() {
// The Certifying Step
auto [round, period] = getPbftRoundAndPeriod();
LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round;

if (printCertStepInfo_) {
LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round;
printCertStepInfo_ = false;
}

const auto elapsed_time_in_round = elapsedTimeInMs(current_round_start_datetime_);
go_finish_state_ = elapsed_time_in_round > 4 * lambda_ - kPollingIntervalMs;
Expand Down Expand Up @@ -974,9 +1005,11 @@ void PbftManager::firstFinish_() {
void PbftManager::secondFinish_() {
// Odd number steps from 5 are in second finish
auto [round, period] = getPbftRoundAndPeriod();
LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_;

assert(step_ >= startingStepInRound_);
if (printSecondFinishStepInfo_) {
LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_;
printSecondFinishStepInfo_ = false;
}

// Lambda function for next voting 2t+1 soft voted block from current round
auto next_vote_soft_voted_block = [this, period = period, round = round]() {
Expand Down
Loading

0 comments on commit 10dae0b

Please sign in to comment.