This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit bf58c7b

Merge pull request #9760 from EOSIO/opt-kh
Optimizations
heifner authored Dec 7, 2020
2 parents e7eb812 + 1a158ea commit bf58c7b
Showing 4 changed files with 118 additions and 26 deletions.
89 changes: 75 additions & 14 deletions libraries/chain/block_log.cpp
@@ -4,8 +4,10 @@
#include <eosio/chain/log_index.hpp>
#include <eosio/chain/log_catalog.hpp>
#include <eosio/chain/block_log_config.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <fc/bitutil.hpp>
#include <fc/io/raw.hpp>
#include <future>
#include <regex>

namespace eosio { namespace chain {
@@ -521,10 +523,11 @@ namespace eosio { namespace chain {
fc::datastream<fc::cfile> index_file;
bool genesis_written_to_block_log = false;
block_log_preamble preamble;
size_t stride = std::numeric_limits<size_t>::max();
uint32_t future_version;
const size_t stride;
static uint32_t default_version;

block_log_impl(const block_log::config_type& config);
explicit block_log_impl(const block_log::config_type& config);

static void ensure_file_exists(fc::cfile& f) {
if (fc::exists(f.get_file_path()))
@@ -541,7 +544,13 @@ namespace eosio { namespace chain {

uint64_t append(const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression);

uint64_t write_log_entry(const signed_block& b, packed_transaction::cf_compression_type segment_compression);
// create futures for append, must call in order of blocks
std::future<std::tuple<signed_block_ptr, std::vector<char>>>
create_append_future(boost::asio::io_context& thread_pool,
const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression);
uint64_t append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f);

uint64_t write_log_entry(const std::vector<char>& block_buffer);

void split_log();
bool recover_from_incomplete_block_head(block_log_data& log_data, block_log_index& index);
@@ -562,15 +571,16 @@ namespace eosio { namespace chain {
void block_log::set_version(uint32_t ver) { detail::block_log_impl::default_version = ver; }
uint32_t block_log::version() const { return my->preamble.version; }

detail::block_log_impl::block_log_impl(const block_log::config_type& config) {
detail::block_log_impl::block_log_impl(const block_log::config_type& config)
: stride( config.stride )
{

if (!fc::is_directory(config.log_dir))
fc::create_directories(config.log_dir);

catalog.open(config.log_dir, config.retained_dir, config.archive_dir, "blocks");

catalog.max_retained_files = config.max_retained_files;
this->stride = config.stride;

block_file.set_file_path(config.log_dir / "blocks.log");
index_file.set_file_path(config.log_dir / "blocks.index");
@@ -601,6 +611,7 @@ namespace eosio { namespace chain {
ilog("Log is nonempty");
block_log_data log_data(block_file.get_file_path());
preamble = log_data.get_preamble();
future_version = preamble.version;

EOS_ASSERT(catalog.verifier.chain_id.empty() || catalog.verifier.chain_id == preamble.chain_id(), block_log_exception,
"block log file ${path} has a different chain id", ("path", block_file.get_file_path()));
@@ -649,20 +660,26 @@ namespace eosio { namespace chain {
read_head();
}

uint64_t detail::block_log_impl::write_log_entry(const signed_block& b, packed_transaction::cf_compression_type segment_compression) {
uint64_t pos = block_file.tellp();
std::vector<char> create_block_buffer( const signed_block& b, uint32_t version, packed_transaction::cf_compression_type segment_compression ) {
std::vector<char> buffer;
if (preamble.version >= pruned_transaction_version) {

if (version >= pruned_transaction_version) {
buffer = pack(b, segment_compression);
} else {
auto block_ptr = b.to_signed_block_v0();
EOS_ASSERT(block_ptr, block_log_append_fail, "Unable to convert block to legacy format");
EOS_ASSERT(segment_compression == packed_transaction::cf_compression_type::none, block_log_append_fail,
"the compression must be \"none\" for legacy format");
"the compression must be \"none\" for legacy format");
buffer = fc::raw::pack(*block_ptr);
}
block_file.write(buffer.data(), buffer.size());

return buffer;
}

uint64_t detail::block_log_impl::write_log_entry(const std::vector<char>& block_buffer) {
uint64_t pos = block_file.tellp();

block_file.write(block_buffer.data(), block_buffer.size());
block_file.write((char*)&pos, sizeof(pos));
index_file.write((char*)&pos, sizeof(pos));
flush();
@@ -673,19 +690,43 @@ namespace eosio { namespace chain {
return my->append(b, segment_compression);
}

uint64_t detail::block_log_impl::append(const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
uint64_t detail::block_log_impl::append(const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
try {
EOS_ASSERT( genesis_written_to_block_log, block_log_append_fail, "Cannot append to block log until the genesis is first written" );

block_file.seek_end(0);
index_file.seek_end(0);
EOS_ASSERT(index_file.tellp() == sizeof(uint64_t) * (b->block_num() - preamble.first_block_num),
block_log_append_fail,
"Append to index file occuring at wrong position.",
("position", (uint64_t) index_file.tellp())
("expected", (b->block_num() - preamble.first_block_num) * sizeof(uint64_t)));

std::vector<char> buffer = create_block_buffer( *b, preamble.version, segment_compression );
auto pos = write_log_entry(buffer);
head = b;
if (b->block_num() % stride == 0) {
split_log();
}
return pos;
}
FC_LOG_AND_RETHROW()
}

uint64_t detail::block_log_impl::append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f) {
try {
EOS_ASSERT( genesis_written_to_block_log, block_log_append_fail, "Cannot append to block log until the genesis is first written" );

block_file.seek_end(0);
index_file.seek_end(0);
auto[b, buffer] = f.get();
EOS_ASSERT(index_file.tellp() == sizeof(uint64_t) * (b->block_num() - preamble.first_block_num),
block_log_append_fail,
"Append to index file occuring at wrong position.",
("position", (uint64_t) index_file.tellp())
("expected", (b->block_num() - preamble.first_block_num) * sizeof(uint64_t)));

auto pos = write_log_entry(*b, segment_compression);
auto pos = write_log_entry(buffer);
head = b;
if (b->block_num() % stride == 0) {
split_log();
@@ -695,7 +736,26 @@ namespace eosio { namespace chain {
FC_LOG_AND_RETHROW()
}

void detail::block_log_impl::split_log() {
std::future<std::tuple<signed_block_ptr, std::vector<char>>>
block_log::create_append_future(boost::asio::io_context& thread_pool, const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
return my->create_append_future(thread_pool, b, segment_compression);
}

std::future<std::tuple<signed_block_ptr, std::vector<char>>>
detail::block_log_impl::create_append_future(boost::asio::io_context& thread_pool, const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
future_version = (b->block_num() % stride == 0) ? block_log::max_supported_version : future_version;
std::promise<std::tuple<signed_block_ptr, std::vector<char>>> p;
std::future<std::tuple<signed_block_ptr, std::vector<char>>> f = p.get_future();
return async_thread_pool( thread_pool, [b, version=future_version, segment_compression]() {
return std::make_tuple(b, create_block_buffer(*b, version, segment_compression));
} );
}

uint64_t block_log::append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f) {
return my->append( std::move( f ) );
}

void detail::block_log_impl::split_log() {
block_file.close();
index_file.close();

@@ -720,6 +780,7 @@ namespace eosio { namespace chain {
block_file.open(fc::cfile::truncate_rw_mode);
index_file.open(fc::cfile::truncate_rw_mode);

future_version = block_log_impl::default_version;
preamble.version = block_log_impl::default_version;
preamble.first_block_num = first_bnum;
preamble.chain_context = std::move(chain_context);
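
The block_log.cpp changes above split append into two phases: create_append_future packs the block into a buffer on a thread pool, and append(future) later writes that buffer, so serialization no longer runs on the caller's thread. Per the comment in the declaration, futures must be created and consumed in block order. A minimal usage sketch, not code from the commit, assuming a block_log named blog, an io_context driven by worker threads named thread_pool, and a vector of signed_block_ptr named blocks (all hypothetical names):

// Sketch only: illustrates the intended call order of the new API.
std::vector<std::future<std::tuple<signed_block_ptr, std::vector<char>>>> futures;
futures.reserve(blocks.size());

// Phase 1: queue serialization work, in block order.
for (const auto& b : blocks) {
   futures.emplace_back(blog.create_append_future(thread_pool, b,
                        packed_transaction::cf_compression_type::none));
}

// Phase 2: append in the same order; each call waits for its buffer if it is not ready yet.
for (auto& f : futures) {
   blog.append(std::move(f));
}
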
40 changes: 32 additions & 8 deletions libraries/chain/controller.cpp
@@ -339,10 +339,18 @@ struct controller_impl {
if( fork_head->dpos_irreversible_blocknum <= lib_num )
return;

const auto branch = fork_db.fetch_branch( fork_head->id, fork_head->dpos_irreversible_blocknum );
auto branch = fork_db.fetch_branch( fork_head->id, fork_head->dpos_irreversible_blocknum );
try {
const auto& rbi = reversible_blocks.get_index<reversible_block_index,by_num>();

std::vector<std::future<std::tuple<signed_block_ptr, std::vector<char>>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
v.emplace_back( blog.create_append_future( thread_pool.get_executor(), (*bitr)->block,
packed_transaction::cf_compression_type::none ) );
}
auto it = v.begin();

for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
if( read_mode == db_read_mode::IRREVERSIBLE ) {
apply_block( *bitr, controller::block_status::complete, trx_meta_cache_lookup{} );
@@ -354,7 +362,8 @@ struct controller_impl {

// blog.append could fail due to failures like running out of space.
// Do it before commit so that in case it throws, DB can be rolled back.
blog.append( (*bitr)->block, packed_transaction::cf_compression_type::none );
blog.append( std::move( *it ) );
++it;

kv_db.commit( (*bitr)->block_num );
root_id = (*bitr)->id;
@@ -373,8 +382,12 @@ struct controller_impl {
}

if( root_id != fork_db.root()->id ) {
branch.emplace_back(fork_db.root());
fork_db.advance_root( root_id );
}

// delete branch in thread pool
boost::asio::post( thread_pool.get_executor(), [branch{std::move(branch)}]() {} );
}

/**
Expand Down Expand Up @@ -1536,6 +1549,21 @@ struct controller_impl {

auto& pbhs = pending->get_pending_block_header_state();

auto& bb = std::get<building_block>(pending->_block_stage);

auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
const bool calc_trx_merkle = !std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests);
std::future<checksum256_type> trx_merkle_fut;
if( calc_trx_merkle ) {
trx_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
}

// Update resource limits:
resource_limits.process_account_limit_updates();
const auto& chain_config = self.get_global_properties().configuration;
@@ -1546,14 +1574,10 @@
);
resource_limits.process_block_usage(pbhs.block_num);

auto& bb = std::get<building_block>(pending->_block_stage);

// Create (unsigned) block:
auto block_ptr = std::make_shared<signed_block>( pbhs.make_block_header(
std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests) ?
std::get<checksum256_type>(bb._trx_mroot_or_receipt_digests) :
merkle( std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) ) ),
merkle( std::move( std::get<building_block>(pending->_block_stage)._action_receipt_digests ) ),
calc_trx_merkle ? trx_merkle_fut.get() : std::get<checksum256_type>(bb._trx_mroot_or_receipt_digests),
action_merkle_fut.get(),
bb._new_pending_producer_schedule,
std::move( bb._new_protocol_feature_activations ),
protocol_features.get_protocol_feature_set()
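
The controller.cpp changes apply the same idea in two places: the action and transaction merkle roots are now computed on the chain's thread pool via async_thread_pool while resource-limit processing proceeds, and the fork-db branch is released on a worker thread by posting a lambda that captures it by move. A rough, self-contained sketch of the merkle-offload pattern, with std::async standing in for async_thread_pool and a placeholder digest type and merkle() that are not the eosio::chain versions:

#include <cstdio>
#include <future>
#include <vector>

using digest_t = unsigned long long;

// Placeholder reduction; the real merkle() builds a merkle tree over checksums.
digest_t merkle(std::vector<digest_t> ids) {
   digest_t r = 0;
   for (digest_t d : ids) r ^= d * 1315423911ull;
   return r;
}

int main() {
   std::vector<digest_t> action_digests{1, 2, 3};
   std::vector<digest_t> trx_digests{4, 5, 6};

   // Start both computations before the rest of block finalization.
   auto action_fut = std::async(std::launch::async,
      [ids = std::move(action_digests)]() mutable { return merkle(std::move(ids)); });
   auto trx_fut = std::async(std::launch::async,
      [ids = std::move(trx_digests)]() mutable { return merkle(std::move(ids)); });

   // ... resource-limit updates and other finalization work would overlap here ...

   // Collect the results only when the block header is actually built.
   std::printf("action mroot %llu, trx mroot %llu\n", action_fut.get(), trx_fut.get());
   return 0;
}
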
7 changes: 7 additions & 0 deletions libraries/chain/include/eosio/chain/block_log.hpp
@@ -3,6 +3,7 @@
#include <eosio/chain/block.hpp>
#include <eosio/chain/genesis_state.hpp>
#include <eosio/chain/block_log_config.hpp>
#include <future>

namespace eosio { namespace chain {

@@ -46,6 +47,12 @@ namespace eosio { namespace chain {

uint64_t append(const signed_block_ptr& block, packed_transaction::cf_compression_type segment_compression);

// create futures for append, must call in order of blocks
std::future<std::tuple<signed_block_ptr, std::vector<char>>>
create_append_future(boost::asio::io_context& thread_pool,
const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression);
uint64_t append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f);

void reset( const genesis_state& gs, const signed_block_ptr& genesis_block, packed_transaction::cf_compression_type segment_compression);
void reset( const chain_id_type& chain_id, uint32_t first_block_num );

8 changes: 4 additions & 4 deletions plugins/producer_plugin/producer_plugin.cpp
@@ -563,13 +563,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if( exception_is_exhausted( *trace->except, deadline_is_subjective )) {
_unapplied_transactions.add_incoming( trx, persist_until_expired, next );
if( _pending_block_mode == pending_block_mode::producing ) {
fc_dlog(_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
fc_dlog(_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING, ec: ${c} ",
("block_num", chain.head_block_num() + 1)
("prod", get_pending_block_producer())
("txid", trx->id()));
("txid", trx->id())("c", trace->except->code()));
} else {
fc_dlog(_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING",
("txid", trx->id()));
fc_dlog(_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING, ec: ${c}",
("txid", trx->id())("c", trace->except->code()));
}
if( !exhausted )
exhausted = block_is_exhausted();
