Skip to content

Commit

Permalink
Merge pull request EOSIO#91 from eosiosg/feature/dpos-pbft-bos-upgrade
Browse files Browse the repository at this point in the history
bugfix/dpos pbft bos upgrade
  • Loading branch information
Thaipanda authored May 5, 2019
2 parents 6c65226 + 84be1d1 commit 76fc46f
Show file tree
Hide file tree
Showing 13 changed files with 718 additions and 182 deletions.
139 changes: 77 additions & 62 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ struct controller_impl {
chainbase::database reversible_blocks; ///< a special database to persist blocks that have successfully been applied but are still reversible
block_log blog;
optional<pending_state> pending;
bool pbft_enabled = false;
bool pbft_upgrading = false;
optional<block_id_type> pending_pbft_lib;
optional<block_id_type> pending_pbft_checkpoint;
vector<block_num_type> proposed_schedule_blocks;
Expand Down Expand Up @@ -351,13 +353,17 @@ struct controller_impl {

void init(std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot) {


bool report_integrity_hash = !!snapshot;
if (snapshot) {
EOS_ASSERT( !head, fork_database_exception, "" );
snapshot->validate();

read_from_snapshot( snapshot );

//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test

auto end = blog.read_head();
if( !end ) {
blog.reset( conf.genesis, signed_block_ptr(), head->block_num + 1 );
Expand All @@ -368,6 +374,8 @@ struct controller_impl {
"Block log is provided with snapshot but does not contain the head block from the snapshot" );
}
} else {
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
if( !head ) {
initialize_fork_db(); // set head to genesis state
}
Expand All @@ -381,9 +389,6 @@ struct controller_impl {
}
}

//do upgrade migration if necessary;
migrate_upgrade();

if( shutdown() ) return;

const auto& ubi = reversible_blocks.get_index<reversible_block_index,by_num>();
Expand Down Expand Up @@ -881,41 +886,21 @@ struct controller_impl {
}
}

bool is_new_version() {
auto ucb = upgrade_complete_block();
//new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb) return head->block_num > *ucb;
return false;
}

bool is_upgrading() {
auto utb = upgrade_target_block();
auto ucb = upgrade_complete_block();
auto is_upgrading = false;
if (utb) is_upgrading = head->block_num >= *utb;
if (ucb) is_upgrading = is_upgrading && head->block_num <= *ucb;
return is_upgrading;
}

/**
* @post regardless of the success of commit block there is no active pending block
*/
void commit_block( bool add_to_fork_db ) {
auto reset_pending_on_exit = fc::make_scoped_exit([this]{
pending.reset();
set_pbft_lib();
set_pbft_lscb();

});

try {
set_pbft_lib();
set_pbft_lscb();

if (add_to_fork_db) {
pending->_pending_block_state->validated = true;

auto new_version = is_new_version();

auto new_bsp = fork_db.add(pending->_pending_block_state, true, new_version);
auto new_bsp = fork_db.add(pending->_pending_block_state, true, pbft_enabled);
emit(self.accepted_block_header, pending->_pending_block_state);

head = fork_db.head();
Expand Down Expand Up @@ -1337,6 +1322,9 @@ struct controller_impl {
{
EOS_ASSERT( !pending, block_validate_exception, "pending block already exists" );

set_pbft_lib();
set_pbft_lscb();

auto guard_pending = fc::make_scoped_exit([this](){
pending.reset();
});
Expand All @@ -1350,31 +1338,41 @@ struct controller_impl {
pending.emplace(maybe_session());
}

auto utb = upgrade_target_block();
auto ucb = upgrade_complete_block();
if (utb && !ucb) {
if (head->dpos_irreversible_blocknum >= *utb) {
const auto& upo = db.get<upgrade_property_object>();
auto utb = optional<block_num_type>{};
auto& upo = db.get<upgrade_property_object>();
if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num;

auto ucb = optional<block_num_type>{};
if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num;


if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) {
db.modify( upo, [&]( auto& up ) {
up.upgrade_complete_block_num = head->block_num;
});
wlog("system is going to be new version after the block ${b}", ("b", head->block_num));
}
if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num));
}

auto new_version = is_new_version();
auto upgrading = is_upgrading();
if ( !pbft_enabled && utb && head->block_num >= *utb) {
if (!pbft_upgrading) pbft_upgrading = true;

// new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb && head->block_num > *ucb) {
if (pbft_upgrading) pbft_upgrading = false;
pbft_enabled = true;
}
}

pending->_block_status = s;
pending->_producer_block_id = producer_block_id;
pending->_signer = signer;
pending->_pending_block_state = std::make_shared<block_state>( *head, when, new_version); // promotes pending schedule (if any) to active
pending->_pending_block_state = std::make_shared<block_state>( *head, when, pbft_enabled); // promotes pending schedule (if any) to active
pending->_pending_block_state->in_current_chain = true;

pending->_pending_block_state->set_confirmed(confirm_block_count, new_version);
pending->_pending_block_state->set_confirmed(confirm_block_count, pbft_enabled);


auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending(new_version);
auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending(pbft_enabled);

//modify state in speculative block only if we are speculative reads mode (other wise we need clean state for head or irreversible reads)
if ( read_mode == db_read_mode::SPECULATIVE || pending->_block_status != controller::block_status::incomplete ) {
Expand All @@ -1384,7 +1382,7 @@ struct controller_impl {
auto lib_num = std::max(pending->_pending_block_state->dpos_irreversible_blocknum, pending->_pending_block_state->bft_irreversible_blocknum);
auto lscb_num = pending->_pending_block_state->pbft_stable_checkpoint_blocknum;

if (new_version && gpo.proposed_schedule_block_num) {
if (pbft_enabled && gpo.proposed_schedule_block_num) {
proposed_schedule_blocks.emplace_back(*gpo.proposed_schedule_block_num);
for ( auto itr = proposed_schedule_blocks.begin(); itr != proposed_schedule_blocks.end();) {
if ((*itr) < lscb_num) {
Expand All @@ -1401,19 +1399,19 @@ struct controller_impl {
&& pending->_pending_block_state->pending_schedule.producers.size() == 0 // ... and there is room for a new pending schedule ...
&& !was_pending_promoted; // ... and not just because it was promoted to active at the start of this block, then:

if (new_version) {
if (pbft_enabled) {
should_promote_pending_schedule = should_promote_pending_schedule
&& pending->_pending_block_state->block_num > *gpo.proposed_schedule_block_num;
} else {
should_promote_pending_schedule = should_promote_pending_schedule
&& ( *gpo.proposed_schedule_block_num <= pending->_pending_block_state->dpos_irreversible_blocknum );
}

if ( upgrading && !replaying) wlog("system is upgrading, no producer schedule promotion will happen until fully upgraded.");
if ( pbft_upgrading && !replaying) wlog("system is upgrading, no producer schedule promotion will happen until fully upgraded.");

if ( should_promote_pending_schedule )
{
if (!upgrading) {
if (!pbft_upgrading) {
// Promote proposed schedule to pending schedule.
if (!replaying) {
ilog("promoting proposed schedule (set in block ${proposed_num}) to pending; current block: ${n} lib: ${lib} schedule: ${schedule} ",
Expand All @@ -1423,7 +1421,7 @@ struct controller_impl {
}
pending->_pending_block_state->set_new_producers(gpo.proposed_schedule);

if (new_version) {
if (pbft_enabled) {
promoted_schedule_blocks.emplace_back(pending->_pending_block_state->block_num);
for ( auto itr = promoted_schedule_blocks.begin(); itr != promoted_schedule_blocks.end();) {
if ((*itr) < lscb_num) {
Expand Down Expand Up @@ -1576,11 +1574,11 @@ struct controller_impl {
auto prev = fork_db.get_block( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception, "unlinkable block ${id}", ("id", id)("previous", b->previous) );

auto new_version = is_new_version();
auto pbft = pbft_enabled;

return async_thread_pool( thread_pool, [b, prev, new_version]() {
return async_thread_pool( thread_pool, [b, prev, pbft]() {
const bool skip_validate_signee = false;
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee, new_version);
return std::make_shared<block_state>( *prev, move( b ), skip_validate_signee, pbft);
} );
}

Expand All @@ -1595,8 +1593,7 @@ struct controller_impl {
auto& b = new_header_state->block;
emit( self.pre_accepted_block, b );

auto new_version = is_new_version();
fork_db.add( new_header_state, false, new_version);
fork_db.add( new_header_state, false, pbft_enabled);

if (conf.trusted_producers.count(b->producer)) {
trusted_producer_light_validation = true;
Expand Down Expand Up @@ -1626,9 +1623,7 @@ struct controller_impl {
emit( self.pre_accepted_block, b );
const bool skip_validate_signee = !conf.force_all_checks;

auto new_version = is_new_version();

auto new_header_state = fork_db.add( b, skip_validate_signee, new_version);
auto new_header_state = fork_db.add( b, skip_validate_signee, pbft_enabled);

emit( self.accepted_block_header, new_header_state );

Expand Down Expand Up @@ -1661,11 +1656,13 @@ struct controller_impl {

void set_pbft_lib() {

if ((!pending || pending->_block_status != controller::block_status::incomplete) && pending_pbft_lib ) {
if (!pbft_enabled) return;

if ( pending_pbft_lib ) {
fork_db.set_bft_irreversible(*pending_pbft_lib);
pending_pbft_lib.reset();

if (read_mode != db_read_mode::IRREVERSIBLE) {
if (!pending && read_mode != db_read_mode::IRREVERSIBLE) {
maybe_switch_forks(controller::block_status::complete);
}
}
Expand All @@ -1677,7 +1674,10 @@ struct controller_impl {
}

void set_pbft_lscb() {
if ((!pending || pending->_block_status != controller::block_status::incomplete) && pending_pbft_checkpoint ) {

if (!pbft_enabled) return;

if ( pending_pbft_checkpoint ) {

auto checkpoint_block_state = fork_db.get_block(*pending_pbft_checkpoint);
if (checkpoint_block_state) {
Expand Down Expand Up @@ -2131,7 +2131,7 @@ chainbase::database& controller::mutable_db()const { return my->db; }

const fork_database& controller::fork_db()const { return my->fork_db; }

std::map<chain::public_key_type, signature_provider_type> controller::my_signature_providers()const{
std::map<chain::public_key_type, signature_provider_type> controller:: my_signature_providers()const{
return my->conf.my_signature_providers;
}

Expand Down Expand Up @@ -2520,21 +2520,23 @@ chain_id_type controller::get_chain_id()const {
return my->chain_id;
}

void controller::set_pbft_prepared(const block_id_type& id) const {
void controller::set_pbft_prepared(const block_id_type& id) {
my->pbft_prepared.reset();
auto bs = fetch_block_state_by_id(id);
if (bs) {
my->pbft_prepared = bs;
my->fork_db.mark_pbft_prepared_fork(bs);
maybe_switch_forks();
}
}

void controller::set_pbft_my_prepare(const block_id_type& id) const {
void controller::set_pbft_my_prepare(const block_id_type& id) {
my->my_prepare.reset();
auto bs = fetch_block_state_by_id(id);
if (bs) {
my->my_prepare = bs;
my->fork_db.mark_pbft_my_prepare_fork(bs);
maybe_switch_forks();
}
}

Expand All @@ -2543,11 +2545,18 @@ block_id_type controller::get_pbft_my_prepare() const {
return block_id_type{};
}

void controller::reset_pbft_my_prepare() const {
void controller::reset_pbft_my_prepare() {
my->fork_db.remove_pbft_my_prepare_fork();
maybe_switch_forks();
if (my->my_prepare) my->my_prepare.reset();
}

void controller::reset_pbft_prepared() {
my->fork_db.remove_pbft_prepared_fork();
maybe_switch_forks();
if (my->pbft_prepared) my->pbft_prepared.reset();
}

db_read_mode controller::get_read_mode()const {
return my->read_mode;
}
Expand Down Expand Up @@ -2703,12 +2712,18 @@ const upgrade_property_object& controller::get_upgrade_properties()const {
return my->db.get<upgrade_property_object>();
}

bool controller::is_upgraded() const {
return my->is_new_version();
bool controller::is_pbft_enabled() const {
return my->pbft_enabled;
}

bool controller::under_upgrade() const {
return my->is_upgrading();
bool controller::under_maintenance() const {
return my->pbft_upgrading;
}

void controller::maybe_switch_forks() {
if (!pending_block_state() && my->read_mode != db_read_mode::IRREVERSIBLE) {
my->maybe_switch_forks(controller::block_status::complete);
}
}

// this will be used in unit_test only, should not be called anywhere else.
Expand Down
33 changes: 33 additions & 0 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,38 @@ namespace eosio { namespace chain {
my->head = *my->index.get<by_lib_block_num>().begin();
}

void fork_database::remove_pbft_prepared_fork() {
auto oldest = *my->index.get<by_block_num>().begin();

auto& by_id_idx = my->index.get<by_block_id>();
auto itr = by_id_idx.find( oldest->id );
by_id_idx.modify( itr, [&]( auto& bsp ) { bsp->pbft_prepared = false; });

auto update = [&]( const vector<block_id_type>& in ) {
vector<block_id_type> updated;

for( const auto& i : in ) {
auto& pidx = my->index.get<by_prev>();
auto pitr = pidx.lower_bound( i );
auto epitr = pidx.upper_bound( i );
while( pitr != epitr ) {
pidx.modify( pitr, [&]( auto& bsp ) {
bsp->pbft_prepared = false;
updated.push_back( bsp->id );
});
++pitr;
}
}
return updated;
};

vector<block_id_type> queue{ oldest->id };
while(!queue.empty()) {
queue = update( queue );
}
my->head = *my->index.get<by_lib_block_num>().begin();
}

block_state_ptr fork_database::get_block_in_current_chain_by_num( uint32_t n )const {
const auto& numidx = my->index.get<by_block_num>();
auto nitr = numidx.lower_bound( n );
Expand Down Expand Up @@ -515,6 +547,7 @@ namespace eosio { namespace chain {
while( queue.size() ) {
queue = update( queue );
}
my->head = *my->index.get<by_lib_block_num>().begin();
}

void fork_database::set_latest_checkpoint( block_id_type id) {
Expand Down
Loading

0 comments on commit 76fc46f

Please sign in to comment.