Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor improvements + fixes #2436

Merged
merged 3 commits into from
Apr 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions libraries/core_libs/consensus/include/pbft/pbft_manager.hpp
Original file line number Diff line number Diff line change
@@ -268,18 +268,16 @@ class PbftManager : public std::enable_shared_from_this<PbftManager> {
size_t getPbftCommitteeSize() const { return config_.committee_size; }

/**
* @brief Broadcast or rebroadcast current round soft votes and previous round next votes
* @param rebroadcast
* @brief Test/enforce broadcastVotes() to actually send votes
*/
void broadcastSoftAndNextVotes(bool rebroadcast);
void testBroadcatVotesFunctionality();

private:
/**
* @brief Broadcast or rebroadcast reward votes
* @param rebroadcast
* @brief Broadcast or rebroadcast 2t+1 soft/reward/previous round next votes + all own votes if needed
*/
void broadcastRewardVotes(bool rebroadcast);
void broadcastVotes();

private:
/**
* @brief Check PBFT blocks syncing queue. If there are synced PBFT blocks in queue, push it to PBFT chain
*/
@@ -543,15 +541,14 @@ class PbftManager : public std::enable_shared_from_this<PbftManager> {

const uint32_t kBroadcastVotesLambdaTime = 20;
const uint32_t kRebroadcastVotesLambdaTime = 60;
uint32_t broadcast_soft_next_votes_counter_ = 1;
uint32_t rebroadcast_soft_next_votes_counter_ = 1;
uint32_t broadcast_votes_counter_ = 1;
uint32_t rebroadcast_votes_counter_ = 1;
uint32_t broadcast_reward_votes_counter_ = 1;
uint32_t rebroadcast_reward_votes_counter_ = 1;

PbftStates state_ = value_proposal_state;
std::atomic<PbftRound> round_ = 1;
PbftStep step_ = 1;
PbftStep startingStepInRound_ = 1;

// Block that node cert voted
std::optional<std::shared_ptr<PbftBlock>> cert_voted_block_for_round_{};
@@ -570,6 +567,10 @@ class PbftManager : public std::enable_shared_from_this<PbftManager> {
bool go_finish_state_ = false;
bool loop_back_finish_state_ = false;

// Used to avoid cyclic logging in voting steps that are called repeatedly
bool printSecondFinishStepInfo_ = true;
bool printCertStepInfo_ = true;

const blk_hash_t dag_genesis_block_hash_;

const PbftConfig &config_;
Original file line number Diff line number Diff line change
@@ -131,6 +131,25 @@ class VoteManager {
*/
PbftPeriod getRewardVotesPbftBlockPeriod();

/**
* @brief Saves own verified vote into memory and db
*
* @param vote
*/
void saveOwnVerifiedVote(const std::shared_ptr<Vote>& vote);

/**
* @return all own verified votes
*/
std::vector<std::shared_ptr<Vote>> getOwnVerifiedVotes();

/**
* @brief Clear own verified votes
*
* @param write_batch
*/
void clearOwnVerifiedVotes(DbStorage::Batch& write_batch);

/**
* @brief Place a vote, save it in the verified votes queue, and gossip to peers
* @param blockhash vote on PBFT block hash
@@ -286,6 +305,9 @@ class VoteManager {
std::vector<vote_hash_t> extra_reward_votes_;
mutable std::shared_mutex reward_votes_info_mutex_;

// Own votes generated during current period & round
std::vector<std::shared_ptr<Vote>> own_verified_votes_;

// Cache for current 2T+1 - <Vote type, <period, two_t_plus_one for period>>
// !!! Important: do not access it directly as it is not updated automatically, always call getPbftTwoTPlusOne instead
// !!!
189 changes: 111 additions & 78 deletions libraries/core_libs/consensus/src/pbft/pbft_manager.cpp
Original file line number Diff line number Diff line change
@@ -252,23 +252,36 @@ void PbftManager::setPbftStep(PbftStep pbft_step) {

// Node is still >= kMaxSteps steps behind the rest (at least 1/3) of the network - keep lambda at the standard
// value so node can catch up with the rest of the nodes
if (network_next_voting_step > step_ && network_next_voting_step - step_ >= kMaxSteps) {
lambda_ = kMinLambda;

// To get withing 1 round with the rest of the network - node cannot start exponentially backing off its lambda
// exactly when it is kMaxSteps behind the network as it would reach kMaxLambda lambda time before catching up. If
// we delay triggering exponential backoff by 4 steps, node should get within 1 round with the network.
// !!! Important: This is true only for values kMinLambda = 15000ms and kMaxLambda = 60000 ms
if (network_next_voting_step > step_ && network_next_voting_step - step_ >= kMaxSteps - 4 /* hardcoded delay */) {
// Reset it only if it was already increased compared to default value
if (lambda_ != kMinLambda) {
lambda_ = kMinLambda;
LOG(log_nf_) << "Node is " << network_next_voting_step - step_
<< " steps behind the rest of the network. Reset lambda to the default value " << lambda_.count()
<< " [ms]";
}
} else if (lambda_ < kMaxLambda) {
// Node is < kMaxSteps steps behind the rest (at least 1/3) of the network - start exponentially backing off
// lambda until it reaches kMaxLambda
// lambda until it reaches kMaxLambdagetNetworkTplusOneNextVotingStep
// Note: We calculate the lambda for a step independently of prior steps in case missed earlier steps.
lambda_ *= 2;
if (lambda_ > kMaxLambda) {
lambda_ = kMaxLambda;
}

LOG(log_nf_) << "No round progress - exponentially backing off lambda to " << lambda_.count() << " [ms] in step "
<< step_;
}
}
}

void PbftManager::resetStep() {
step_ = 1;
startingStepInRound_ = 1;
lambda_ = kMinLambda;
}

@@ -348,8 +361,8 @@ void PbftManager::resetPbftConsensus(PbftRound round) {
LOG(log_dg_) << "Reset PBFT consensus to: period " << getPbftPeriod() << ", round " << round << ", step 1";

// Reset broadcast counters
broadcast_soft_next_votes_counter_ = 1;
rebroadcast_soft_next_votes_counter_ = 1;
broadcast_votes_counter_ = 1;
rebroadcast_votes_counter_ = 1;

// Update current round and reset step to 1
round_ = round;
@@ -369,8 +382,8 @@ void PbftManager::resetPbftConsensus(PbftRound round) {
cert_voted_block_for_round_.reset();
}

// Remove all own votes generated in previous round
db_->clearOwnVerifiedVotes(batch);
// Clear all own votes generated in previous round
vote_mgr_->clearOwnVerifiedVotes(batch);

db_->commitWriteBatch(batch);

@@ -443,8 +456,6 @@ void PbftManager::initialState() {
assert(false);
}

// This is used to offset endtime for second finishing step...
startingStepInRound_ = current_pbft_step;
setPbftStep(current_pbft_step);
round_ = current_pbft_round;

@@ -506,6 +517,7 @@ void PbftManager::setCertifyState_() {
state_ = certify_state;
setPbftStep(step_ + 1);
next_step_time_ms_ = 2 * lambda_;
printCertStepInfo_ = true;
}

void PbftManager::setFinishState_() {
@@ -524,6 +536,7 @@ void PbftManager::setFinishPollingState_() {
db_->commitWriteBatch(batch);
already_next_voted_value_ = false;
already_next_voted_null_block_hash_ = false;
printSecondFinishStepInfo_ = true;
second_finish_step_start_datetime_ = std::chrono::system_clock::now();
next_step_time_ms_ += kPollingIntervalMs;
}
@@ -537,61 +550,92 @@ void PbftManager::loopBackFinishState_() {
db_->commitWriteBatch(batch);
already_next_voted_value_ = false;
already_next_voted_null_block_hash_ = false;
assert(step_ >= startingStepInRound_);
next_step_time_ms_ += kPollingIntervalMs;
}

void PbftManager::broadcastSoftAndNextVotes(bool rebroadcast) {
void PbftManager::broadcastVotes() {
auto net = network_.lock();
if (!net) {
LOG(log_er_) << "Unable to broadcast votes -> cant obtain net ptr";
return;
}

auto [round, period] = getPbftRoundAndPeriod();
const auto votes_sync_packet_handler = net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>();

// Broadcast 2t+1 soft votes
auto soft_votes = vote_mgr_->getTwoTPlusOneVotedBlockVotes(period, round, TwoTPlusOneVotedBlockType::SoftVotedBlock);
if (!soft_votes.empty()) {
LOG(log_dg_) << "Broadcast soft votes for period " << period << ", round " << round;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(soft_votes),
rebroadcast);
}
// Send votes to the other peers
auto gossipVotes = [this, &votes_sync_packet_handler](std::vector<std::shared_ptr<Vote>> &&votes,
const std::string &votes_type_str, bool rebroadcast) {
if (!votes.empty()) {
LOG(log_dg_) << "Broadcast " << votes_type_str << " for period " << votes.back()->getPeriod() << ", round "
<< votes.back()->getRound();
votes_sync_packet_handler->onNewPbftVotesBundle(std::move(votes), rebroadcast);
}
};

// Broadcast reward votes - previous round 2t+1 cert votes
auto reward_votes = vote_mgr_->getRewardVotes();
if (!reward_votes.empty()) {
LOG(log_dg_) << "Broadcast propose reward votes for period " << period << ", round " << round;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(reward_votes),
rebroadcast);
}
// (Re)broadcast 2t+1 soft/reward/previous round next votes + all own votes
auto broadcastVotes = [this, &net, &gossipVotes](bool rebroadcast) {
auto [round, period] = getPbftRoundAndPeriod();

// Broadcast 2t+1 soft votes
gossipVotes(vote_mgr_->getTwoTPlusOneVotedBlockVotes(period, round, TwoTPlusOneVotedBlockType::SoftVotedBlock),
"2t+1 soft votes", rebroadcast);

// Broadcast previous round 2t+1 next votes
if (round > 1) {
if (auto next_votes = vote_mgr_->getAllTwoTPlusOneNextVotes(period, round - 1); !next_votes.empty()) {
LOG(log_dg_) << "Broadcast next votes for period " << period << ", round " << round - 1;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(next_votes),
rebroadcast);
// Broadcast reward votes - previous round 2t+1 cert votes
gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", rebroadcast);

// Broadcast previous round 2t+1 next votes
if (round > 1) {
gossipVotes(vote_mgr_->getAllTwoTPlusOneNextVotes(period, round - 1), "2t+1 next votes", rebroadcast);
}
}
}

void PbftManager::broadcastRewardVotes(bool rebroadcast) {
auto net = network_.lock();
if (!net) {
return;
}
// Broadcast own votes
auto vote_packet_handler = net->getSpecificHandler<network::tarcap::VotePacketHandler>();
// TODO: this could be optimized to use VotesSyncPacketHandler if we drop some of the checks in process function
// Send votes by one as votes sync packet must contain votes with the same type, period and round
const auto &own_votes = vote_mgr_->getOwnVerifiedVotes();
for (const auto &vote : own_votes) {
vote_packet_handler->onNewPbftVote(vote, getPbftProposedBlock(vote->getPeriod(), vote->getBlockHash()),
rebroadcast);
}
if (!own_votes.empty()) {
LOG(log_dg_) << "Broadcast own votes for period " << period << ", round " << round;
}
};

auto [round, period] = getPbftRoundAndPeriod();
const auto round_elapsed_time = elapsedTimeInMs(current_round_start_datetime_);
const auto period_elapsed_time = elapsedTimeInMs(current_period_start_datetime_);

// Broadcast reward votes - previous round 2t+1 cert votes
auto reward_votes = vote_mgr_->getRewardVotes();
if (!reward_votes.empty()) {
LOG(log_dg_) << "Broadcast propose reward votes for period " << period << ", round " << round;
net->getSpecificHandler<network::tarcap::VotesSyncPacketHandler>()->onNewPbftVotesBundle(std::move(reward_votes),
rebroadcast);
if (round_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_votes_counter_) {
// Stalled in the same round for kRebroadcastVotesLambdaTime * kMinLambda time -> rebroadcast votes
broadcastVotes(true);
rebroadcast_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_votes_counter_++;
} else if (round_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_votes_counter_) {
// Stalled in the same round for kBroadcastVotesLambdaTime * kMinLambda time -> broadcast votes
broadcastVotes(false);
broadcast_votes_counter_++;
} else if (period_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_reward_votes_counter_) {
// Stalled in the same period for kRebroadcastVotesLambdaTime * kMinLambda time -> rebroadcast reward votes
gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", true);
rebroadcast_reward_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_reward_votes_counter_++;
} else if (period_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_reward_votes_counter_) {
// Stalled in the same period for kBroadcastVotesLambdaTime * kMinLambda time -> broadcast reward votes
gossipVotes(vote_mgr_->getRewardVotes(), "2t+1 propose reward votes", false);
broadcast_reward_votes_counter_++;
}
}

void PbftManager::testBroadcatVotesFunctionality() {
// Set these variables to force broadcastVotes() send votes
current_round_start_datetime_ = time_point{};
current_period_start_datetime_ = time_point{};

broadcastVotes();
}

void PbftManager::printVotingSummary() const {
const auto [round, period] = getPbftRoundAndPeriod();
Json::Value json_obj;
@@ -614,41 +658,21 @@ void PbftManager::printVotingSummary() const {
}

bool PbftManager::stateOperations_() {
pushSyncedPbftBlocksIntoChain();

const auto round_elapsed_time = elapsedTimeInMs(current_round_start_datetime_);
const auto period_elapsed_time = elapsedTimeInMs(current_period_start_datetime_);

if (round_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_soft_next_votes_counter_) {
broadcastSoftAndNextVotes(true);
rebroadcast_soft_next_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_soft_next_votes_counter_++;
} else if (round_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_soft_next_votes_counter_) {
broadcastSoftAndNextVotes(false);
broadcast_soft_next_votes_counter_++;
}

// Reward votes need to be broadcast even if we are advancing rounds but unable to advance a period
if (period_elapsed_time / kMinLambda > kRebroadcastVotesLambdaTime * rebroadcast_reward_votes_counter_) {
broadcastRewardVotes(true);
rebroadcast_reward_votes_counter_++;
// If there was a rebroadcast no need to do next broadcast either
broadcast_reward_votes_counter_++;
} else if (period_elapsed_time / kMinLambda > kBroadcastVotesLambdaTime * broadcast_reward_votes_counter_) {
broadcastRewardVotes(false);
broadcast_reward_votes_counter_++;
}

auto [round, period] = getPbftRoundAndPeriod();
LOG(log_tr_) << "PBFT current round: " << round << ", period: " << period << ", step " << step_;

// Check if these is already 2t+1 cert votes for some valid block, if so - push it into the chain
// Process synced blocks
pushSyncedPbftBlocksIntoChain();

// (Re)broadcast votes if needed
broadcastVotes();

// Check if there is 2t+1 cert votes for some valid block, if so - push it into the chain
if (tryPushCertVotesBlock()) {
return true;
}

// 2t+1 next votes were seen
// Check if there is 2t+1 next votes for some valid block, if so - advance round
if (advanceRound()) {
return true;
}
@@ -698,6 +722,9 @@ bool PbftManager::placeVote(const std::shared_ptr<Vote> &vote, std::string_view

gossipNewVote(vote, voted_block);

// Save own verified vote
vote_mgr_->saveOwnVerifiedVote(vote);

LOG(log_nf_) << "Placed " << log_vote_id << " " << vote->getHash() << " for block " << vote->getBlockHash()
<< ", vote weight " << *vote->getWeight() << ", period " << vote->getPeriod() << ", round "
<< vote->getRound() << ", step " << vote->getStep();
@@ -857,7 +884,11 @@ void PbftManager::identifyBlock_() {
void PbftManager::certifyBlock_() {
// The Certifying Step
auto [round, period] = getPbftRoundAndPeriod();
LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round;

if (printCertStepInfo_) {
LOG(log_dg_) << "PBFT certifying state in period " << period << ", round " << round;
printCertStepInfo_ = false;
}

const auto elapsed_time_in_round = elapsedTimeInMs(current_round_start_datetime_);
go_finish_state_ = elapsed_time_in_round > 4 * lambda_ - kPollingIntervalMs;
@@ -974,9 +1005,11 @@ void PbftManager::firstFinish_() {
void PbftManager::secondFinish_() {
// Odd number steps from 5 are in second finish
auto [round, period] = getPbftRoundAndPeriod();
LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_;

assert(step_ >= startingStepInRound_);
if (printSecondFinishStepInfo_) {
LOG(log_dg_) << "PBFT second finishing state in period " << period << ", round " << round << ", step " << step_;
printSecondFinishStepInfo_ = false;
}

// Lambda function for next voting 2t+1 soft voted block from current round
auto next_vote_soft_voted_block = [this, period = period, round = round]() {
Loading