Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/dpos-pbft-bos-upgrade bugfix and snapshot support #94

Merged
merged 11 commits into from
May 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 94 additions & 49 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,21 +361,23 @@ struct controller_impl {

read_from_snapshot( snapshot );

//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test

auto end = blog.read_head();
if( !end ) {
blog.reset( conf.genesis, signed_block_ptr(), head->block_num + 1 );
auto reset_block_num = head->block_num + 1;
if (pbft_enabled) reset_block_num = head->pbft_stable_checkpoint_blocknum;
blog.reset( conf.genesis, signed_block_ptr(), reset_block_num );
} else if( end->block_num() > head->block_num ) {
replay( shutdown );
} else {
EOS_ASSERT( end->block_num() == head->block_num, fork_database_exception,
"Block log is provided with snapshot but does not contain the head block from the snapshot" );
}
} else {
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
if( !head ) {
initialize_fork_db(); // set head to genesis state
}
Expand All @@ -388,7 +390,7 @@ struct controller_impl {
report_integrity_hash = true;
}
}

if( shutdown() ) return;

const auto& ubi = reversible_blocks.get_index<reversible_block_index,by_num>();
Expand Down Expand Up @@ -430,11 +432,39 @@ struct controller_impl {
//generate upo.
try {
db.get<upgrade_property_object>();
if (pbft_enabled) wlog("pbft enabled");
} catch( const boost::exception& e) {
wlog("no upo found, generating...");
db.create<upgrade_property_object>([](auto&){});
}
update_pbft_status();
}

void update_pbft_status() {
auto utb = optional<block_num_type>{};
auto& upo = db.get<upgrade_property_object>();
if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num;

auto ucb = optional<block_num_type>{};
if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num;


if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) {
db.modify( upo, [&]( auto& up ) {
up.upgrade_complete_block_num = head->block_num;
});
if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num));
}

if ( !pbft_enabled && utb && head->block_num >= *utb) {
if (!pbft_upgrading) pbft_upgrading = true;

// new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb && head->block_num > *ucb) {
if (pbft_upgrading) pbft_upgrading = false;
pbft_enabled = true;
}
}
}

~controller_impl() {
Expand Down Expand Up @@ -527,9 +557,21 @@ struct controller_impl {
section.add_row(batch_pbft_snapshot_migration{}, db);
});

snapshot->write_section<block_state>([this]( auto &section ){
section.template add_row<block_header_state>(*fork_db.head(), db);
});
if (pbft_enabled) {
snapshot->write_section<batch_pbft_enabled>([this]( auto &section ) {
section.add_row(batch_pbft_enabled{}, db);
});
snapshot->write_section<branch_type>([this](auto &section) {
auto bid = fork_db.get_block_in_current_chain_by_num(fork_db.head()->pbft_stable_checkpoint_blocknum)->id;
EOS_ASSERT(bid != block_id_type{}, snapshot_exception, "cannot find lscb block");
auto bss = fork_db.fetch_branch_from(fork_db.head()->id, bid).first;
section.template add_row<branch_type>(bss, db);
});
} else {
snapshot->write_section<block_state>([this]( auto &section ){
section.template add_row<block_header_state>(*fork_db.head(), db);
});
}

controller_index_set::walk_indices([this, &snapshot]( auto utils ){
using value_t = typename decltype(utils)::index_t::value_type;
Expand Down Expand Up @@ -560,19 +602,42 @@ struct controller_impl {
});

bool migrated = snapshot->has_section<batch_pbft_snapshot_migration>();
if(migrated) {
snapshot->read_section<block_state>([this](auto &section) {
block_header_state head_header_state;
section.read_row(head_header_state, db);

auto head_state = std::make_shared<block_state>(head_header_state);
fork_db.set(head_state);
fork_db.set_validity(head_state, true);
fork_db.mark_in_current_chain(head_state, true);
head = head_state;
snapshot_head_block = head->block_num;
});
}else{
if (migrated) {
auto upgraded = snapshot->has_section<batch_pbft_enabled>();
if (upgraded) {
snapshot->read_section<branch_type>([this](auto &section) {
branch_type bss;
section.template read_row<branch_type>(bss, db);
EOS_ASSERT(!bss.empty(), snapshot_exception, "no last stable checkpoint block in the snapshot");

wlog("${n} reversible blocks found in the snapshot", ("n", bss.size()));

for (auto i = bss.rbegin(); i != bss.rend(); ++i ) {
if (i == bss.rbegin()) {
fork_db.set(*i);
snapshot_head_block = (*i)->block_num;
} else {
fork_db.add((*i), true, true);
}
fork_db.set_validity((*i), true);
fork_db.mark_in_current_chain((*i), true);
}
head = fork_db.head();
});
} else {
snapshot->read_section<block_state>([this](auto &section) {
block_header_state head_header_state;
section.read_row(head_header_state, db);

auto head_state = std::make_shared<block_state>(head_header_state);
fork_db.set(head_state);
fork_db.set_validity(head_state, true);
fork_db.mark_in_current_chain(head_state, true);
head = head_state;
snapshot_head_block = head->block_num;
});
}
} else {
snapshot->read_section<block_state>([this](snapshot_reader::section_reader &section) {
block_header_state head_header_state;
section.read_pbft_migrate_row(head_header_state, db);
Expand Down Expand Up @@ -1338,30 +1403,7 @@ struct controller_impl {
pending.emplace(maybe_session());
}

auto utb = optional<block_num_type>{};
auto& upo = db.get<upgrade_property_object>();
if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num;

auto ucb = optional<block_num_type>{};
if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num;


if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) {
db.modify( upo, [&]( auto& up ) {
up.upgrade_complete_block_num = head->block_num;
});
if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num));
}

if ( !pbft_enabled && utb && head->block_num >= *utb) {
if (!pbft_upgrading) pbft_upgrading = true;

// new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb && head->block_num > *ucb) {
if (pbft_upgrading) pbft_upgrading = false;
pbft_enabled = true;
}
}
update_pbft_status();

pending->_block_status = s;
pending->_producer_block_id = producer_block_id;
Expand Down Expand Up @@ -1616,6 +1658,9 @@ struct controller_impl {

EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

set_pbft_lib();
set_pbft_lscb();

try {
EOS_ASSERT( b, block_validate_exception, "trying to push empty block" );
EOS_ASSERT( (s == controller::block_status::irreversible || s == controller::block_status::validated),
Expand All @@ -1636,9 +1681,7 @@ struct controller_impl {
for (const auto &extn: b->block_extensions) {
if (extn.first == static_cast<uint16_t>(block_extension_type::pbft_stable_checkpoint)) {
pbft_commit_local(b->id());
set_pbft_lib();
set_pbft_latest_checkpoint(b->id());
set_pbft_lscb();
break;
}
}
Expand Down Expand Up @@ -2330,7 +2373,9 @@ block_id_type controller::last_stable_checkpoint_block_id() const {
if( block_header::num_from_id(tapos_block_summary.block_id) == lscb_num )
return tapos_block_summary.block_id;

return fetch_block_by_number(lscb_num)->id();
auto b = fetch_block_by_number(lscb_num);
if (b) return b->id();
return block_id_type{};
}


Expand Down
36 changes: 25 additions & 11 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,43 @@ namespace eosio { namespace chain {
bool is_version_1 = version_label != "version";
if(is_version_1){
/*start upgrade migration and this is a hack and ineffecient, but lucky we only need to do it once */

wlog("doing LIB upgrade migration");
auto start = ds.pos();
unsigned_int size; fc::raw::unpack( ds, size );
auto skipped_size_pos = ds.pos();

vector<char> data(content.begin()+(skipped_size_pos - start), content.end());

data.insert(data.end(),{0,0,0,0});//append 4 bytes for the very last block state, avoid underflow in case
fc::datastream<const char*> tmp_ds(data.data(), data.size());

for( uint32_t i = 0, n = size.value; i < n; ++i ) {
vector<char> tmp = data;
tmp.insert(tmp.begin(), {0,0,0,0});
fc::datastream<const char*> tmp_ds(tmp.data(), tmp.size());
block_state s;
fc::raw::unpack( tmp_ds, s );
//prepend 4bytes for pbft_stable_checkpoint_blocknum and append 2 bytes for pbft_prepared and pbft_my_prepare
auto tmp_data_length = tmp_ds.tellp() - 6;
data.erase(data.begin(),data.begin()+tmp_data_length);
wlog("processing block state in fork database ${i} of ${size}", ("i",i+1)("size",n));
block_header_state h;
fc::raw::unpack( tmp_ds, h );
h.pbft_stable_checkpoint_blocknum = 0;

//move pos backward 4 bytes for pbft_stable_checkpoint_blocknum
auto tmp_accumulated_data_length = tmp_ds.tellp() - 4;
tmp_ds.seekp(tmp_accumulated_data_length);

signed_block_ptr b;
fc::raw::unpack( tmp_ds, b );
bool validated;
fc::raw::unpack( tmp_ds, validated );
bool in_current_chain;
fc::raw::unpack( tmp_ds, in_current_chain );
block_state s{h};
s.block = b;
s.validated = validated;
s.in_current_chain = in_current_chain;

s.pbft_prepared = false;
s.pbft_my_prepare = false;
set( std::make_shared<block_state>( move( s ) ) );
}
fc::datastream<const char*> head_id_stream(data.data(), data.size());
block_id_type head_id;
fc::raw::unpack( head_id_stream, head_id );
fc::raw::unpack( tmp_ds, head_id );

my->head = get_block( head_id );
/*end upgrade migration*/
Expand Down
6 changes: 3 additions & 3 deletions libraries/chain/include/eosio/chain/block_header_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace eosio { namespace chain {
* @brief defines the minimum state necessary to validate transaction headers
*/
struct block_header_state {
uint32_t pbft_stable_checkpoint_blocknum = 0;
block_id_type id;
uint32_t block_num = 0;
signed_block_header header;
Expand All @@ -28,6 +27,7 @@ struct block_header_state {
public_key_type block_signing_key;
vector<uint8_t> confirm_count;
vector<header_confirmation> confirmations;
uint32_t pbft_stable_checkpoint_blocknum = 0;

block_header_state next( const signed_block_header& h, bool trust = false, bool new_version = false)const;
block_header_state generate_next( block_timestamp_type when, bool new_version = false )const;
Expand Down Expand Up @@ -62,10 +62,10 @@ struct block_header_state {
} } /// namespace eosio::chain

FC_REFLECT( eosio::chain::block_header_state,
(pbft_stable_checkpoint_blocknum)
(id)(block_num)(header)(dpos_proposed_irreversible_blocknum)(dpos_irreversible_blocknum)(bft_irreversible_blocknum)

(pending_schedule_lib_num)(pending_schedule_hash)
(pending_schedule)(active_schedule)(blockroot_merkle)
(producer_to_last_produced)(producer_to_last_implied_irb)(block_signing_key)
(confirm_count)(confirmations) )
(confirm_count)(confirmations)
(pbft_stable_checkpoint_blocknum))
5 changes: 5 additions & 0 deletions libraries/chain/include/eosio/chain/chain_snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ struct batch_pbft_snapshot_migration{
bool migrated = true;
};

struct batch_pbft_enabled {
bool enabled = true;
};

} }

FC_REFLECT(eosio::chain::chain_snapshot_header,(version))
FC_REFLECT(eosio::chain::batch_pbft_snapshot_migration,(migrated))
FC_REFLECT(eosio::chain::batch_pbft_enabled,(enabled))
4 changes: 2 additions & 2 deletions libraries/chain/include/eosio/chain/snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ namespace eosio { namespace chain {
std::ostringstream sstream;
sstream << in.rdbuf();
std::string str(sstream.str());
//prepend uint32_t 0
//append uint32_t 0
std::vector<char> tmp(str.begin(), str.end());
tmp.insert(tmp.begin(), {0,0,0,0});
tmp.insert(tmp.end(), {0,0,0,0});
fc::datastream<const char*> tmp_ds(tmp.data(), tmp.size());
fc::raw::unpack(tmp_ds, data);
auto original_data_length = tmp_ds.tellp() - 4;
Expand Down
19 changes: 12 additions & 7 deletions libraries/chain/pbft_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ namespace eosio {
const auto& ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num;
if (!ctrl.is_pbft_enabled()) return false;
return in >= ucb
&& (in % 100 == 1 || std::find(prepare_watermarks.begin(), prepare_watermarks.end(), in) != prepare_watermarks.end());
&& (in == ucb + 1 || in % 100 == 1 || std::find(prepare_watermarks.begin(), prepare_watermarks.end(), in) != prepare_watermarks.end());
};

for (auto i = psp->block_num;
Expand Down Expand Up @@ -1413,14 +1413,19 @@ namespace eosio {
}

producer_schedule_type pbft_database::lscb_active_producers() const {
auto lscb_num = ctrl.last_stable_checkpoint_block_num();
if (lscb_num == 0) return ctrl.initial_schedule();
auto num = ctrl.last_stable_checkpoint_block_num();

if (num == 0) {
const auto &ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num;
if (ucb == 0) return ctrl.initial_schedule();
num = ucb;
}

auto lscb_state = ctrl.fetch_block_state_by_number(lscb_num);
if (!lscb_state) return ctrl.initial_schedule();
auto bs = ctrl.fetch_block_state_by_number(num);
if (!bs) return ctrl.initial_schedule();

if (lscb_state->pending_schedule.producers.empty()) return lscb_state->active_schedule;
return lscb_state->pending_schedule;
if (bs->pending_schedule.producers.empty()) return bs->active_schedule;
return bs->pending_schedule;
}

chain_id_type pbft_database::chain_id() {
Expand Down
16 changes: 12 additions & 4 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1607,10 +1607,18 @@ void producer_plugin_impl::produce_block() {
block_state_ptr new_bs = chain.head_block_state();
_producer_watermarks[new_bs->header.producer] = chain.head_block_num();

ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, lscb: ${lscb}]",
("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
("n",new_bs->block_num)("t",new_bs->header.timestamp)
("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("lscb", chain.last_stable_checkpoint_block_num()));
if (chain.is_pbft_enabled()) {
ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, lscb: ${lscb}]",
("p", new_bs->header.producer)("id", fc::variant(new_bs->id).as_string().substr(0, 16))
("n", new_bs->block_num)("t", new_bs->header.timestamp)
("count", new_bs->block->transactions.size())
("lib", chain.last_irreversible_block_num())("lscb", chain.last_stable_checkpoint_block_num()));
} else {
ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
("n",new_bs->block_num)("t",new_bs->header.timestamp)
("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", new_bs->header.confirmed));
}
}

} // namespace eosio