Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

apply_block optimization #7825

Merged
merged 19 commits into from
Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e3a240e
Move unapplied_transaction_queue to controller
heifner Aug 26, 2019
a7f6f49
Check unapplied_transaction_queue in apply_block
heifner Aug 26, 2019
421bee4
Merge branch 'develop' into unapplied-apply-sig
heifner Aug 26, 2019
f73ade9
Merge branch 'develop' into unapplied-apply-sig
heifner Aug 27, 2019
af13a22
Disable sig recovery cache
heifner Aug 27, 2019
2968724
Merge branch 'develop' into unapplied-apply-sig
heifner Aug 27, 2019
7ea2f48
Completely remove uneeded signature recovery cache. The signature rec…
heifner Aug 27, 2019
f0a8e52
Remove test for expected sig recovery time to be the same since a cac…
heifner Aug 27, 2019
93c2c1a
Encapsulate unapplied_transaction_queue addition of aborted transacti…
heifner Aug 28, 2019
8c2f9f5
Verify recovered keys exist if not skipping auth check before reusing…
heifner Aug 28, 2019
8e173fe
Avoid some shared_ptr copies
heifner Aug 28, 2019
44d7257
Remove unused var
heifner Aug 28, 2019
7084f5d
Merge branch 'develop' into unapplied-apply-sig
heifner Aug 28, 2019
d4b7f32
Revert "Encapsulate unapplied_transaction_queue addition of aborted t…
heifner Aug 30, 2019
ba24da0
Revert "Move unapplied_transaction_queue to controller"
heifner Aug 30, 2019
6a65287
Pass lambda to push_block for access to unapplied_transaction_queue c…
heifner Aug 30, 2019
0b00226
Check for optional trx_lookup function
heifner Aug 30, 2019
f8c8e8b
Merge branch 'develop' into unapplied-apply-sig
heifner Sep 3, 2019
736c664
Add const
heifner Sep 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 40 additions & 31 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ struct controller_impl {

for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
if( read_mode == db_read_mode::IRREVERSIBLE ) {
apply_block( *bitr, controller::block_status::complete );
apply_block( *bitr, controller::block_status::complete, trx_meta_cache_lookup{} );
head = (*bitr);
fork_db.mark_valid( head );
}
Expand Down Expand Up @@ -699,7 +699,7 @@ struct controller_impl {
pending_head = fork_db.pending_head()
) {
wlog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id) );
maybe_switch_forks( pending_head, controller::block_status::complete, forked_branch_callback() );
maybe_switch_forks( pending_head, controller::block_status::complete, forked_branch_callback{}, trx_meta_cache_lookup{} );
}
}

Expand Down Expand Up @@ -1752,7 +1752,7 @@ struct controller_impl {
}
}

void apply_block( const block_state_ptr& bsp, controller::block_status s )
void apply_block( const block_state_ptr& bsp, controller::block_status s, const trx_meta_cache_lookup& trx_lookup )
{ try {
try {
const signed_block_ptr& b = bsp->block;
Expand All @@ -1764,27 +1764,28 @@ struct controller_impl {
const bool existing_trxs_metas = !bsp->trxs_metas().empty();
const bool pub_keys_recovered = bsp->is_pub_keys_recovered();
const bool skip_auth_checks = self.skip_auth_check();
std::vector<recover_keys_future> trx_futures;
std::vector<transaction_metadata_ptr> trx_metas;
bool use_bsp_cached = false, use_trx_metas = false;
std::vector<std::tuple<transaction_metadata_ptr, recover_keys_future>> trx_metas;
bool use_bsp_cached = false;
if( pub_keys_recovered || (skip_auth_checks && existing_trxs_metas) ) {
use_bsp_cached = true;
} else if( skip_auth_checks ) {
use_trx_metas = true;
} else {
trx_metas.reserve( b->transactions.size() );
for( const auto& receipt : b->transactions ) {
if( receipt.trx.contains<packed_transaction>()) {
auto& pt = receipt.trx.get<packed_transaction>();
arhag marked this conversation as resolved.
Show resolved Hide resolved
trx_metas.emplace_back( transaction_metadata::create_no_recover_keys( pt, transaction_metadata::trx_type::input ) );
}
}
} else {
trx_futures.reserve( b->transactions.size() );
for( const auto& receipt : b->transactions ) {
if( receipt.trx.contains<packed_transaction>()) {
auto ptrx = std::make_shared<packed_transaction>( receipt.trx.get<packed_transaction>() );
auto fut = transaction_metadata::start_recover_keys( ptrx, thread_pool.get_executor(), chain_id, microseconds::maximum() );
trx_futures.emplace_back( std::move( fut ) );
transaction_metadata_ptr trx_meta_ptr = trx_lookup ? trx_lookup( pt.id() ) : transaction_metadata_ptr{};
if( trx_meta_ptr && ( skip_auth_checks || !trx_meta_ptr->recovered_keys().empty() ) ) {
trx_metas.emplace_back( std::move( trx_meta_ptr ), recover_keys_future{} );
} else if( skip_auth_checks ) {
trx_metas.emplace_back(
transaction_metadata::create_no_recover_keys( pt, transaction_metadata::trx_type::input ),
recover_keys_future{} );
} else {
auto ptrx = std::make_shared<packed_transaction>( pt );
auto fut = transaction_metadata::start_recover_keys(
std::move( ptrx ), thread_pool.get_executor(), chain_id, microseconds::maximum() );
trx_metas.emplace_back( transaction_metadata_ptr{}, std::move( fut ) );
}
}
}
}
Expand All @@ -1796,10 +1797,12 @@ struct controller_impl {
const auto& trx_receipts = pending->_block_stage.get<building_block>()._pending_trx_receipts;
auto num_pending_receipts = trx_receipts.size();
if( receipt.trx.contains<packed_transaction>() ) {
const auto& trx_meta = ( use_bsp_cached ? bsp->trxs_metas().at( packed_idx++ )
: ( use_trx_metas ? trx_metas.at( packed_idx++ )
: trx_futures.at( packed_idx++ ).get() ) );
const auto& trx_meta = ( use_bsp_cached ? bsp->trxs_metas().at( packed_idx )
: ( !!std::get<0>( trx_metas.at( packed_idx ) ) ?
std::get<0>( trx_metas.at( packed_idx ) )
: std::get<1>( trx_metas.at( packed_idx ) ).get() ) );
trace = push_transaction( trx_meta, fc::time_point::maximum(), receipt.cpu_usage_us, true );
++packed_idx;
} else if( receipt.trx.contains<transaction_id_type>() ) {
trace = push_scheduled_transaction( receipt.trx.get<transaction_id_type>(), fc::time_point::maximum(), receipt.cpu_usage_us, true );
} else {
Expand Down Expand Up @@ -1878,7 +1881,9 @@ struct controller_impl {
} );
}

void push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& forked_branch_cb ) {
void push_block( std::future<block_state_ptr>& block_state_future,
const forked_branch_callback& forked_branch_cb, const trx_meta_cache_lookup& trx_lookup )
{
controller::block_status s = controller::block_status::complete;
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

Expand All @@ -1900,7 +1905,7 @@ struct controller_impl {
emit( self.accepted_block_header, bsp );

if( read_mode != db_read_mode::IRREVERSIBLE ) {
maybe_switch_forks( fork_db.pending_head(), s, forked_branch_cb );
maybe_switch_forks( fork_db.pending_head(), s, forked_branch_cb, trx_lookup );
} else {
log_irreversible();
}
Expand Down Expand Up @@ -1939,7 +1944,7 @@ struct controller_impl {
emit( self.accepted_block_header, bsp );

if( s == controller::block_status::irreversible ) {
apply_block( bsp, s );
apply_block( bsp, s, trx_meta_cache_lookup{} );
head = bsp;

// On replay, log_irreversible is not called and so no irreversible_block signal is emittted.
Expand All @@ -1953,17 +1958,19 @@ struct controller_impl {
} else {
EOS_ASSERT( read_mode != db_read_mode::IRREVERSIBLE, block_validate_exception,
"invariant failure: cannot replay reversible blocks while in irreversible mode" );
maybe_switch_forks( bsp, s, forked_branch_callback() );
maybe_switch_forks( bsp, s, forked_branch_callback{}, trx_meta_cache_lookup{} );
}

} FC_LOG_AND_RETHROW( )
}

void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_branch_callback& forked_branch_cb ) {
void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s,
const forked_branch_callback& forked_branch_cb, const trx_meta_cache_lookup& trx_lookup )
{
bool head_changed = true;
if( new_head->header.previous == head->id ) {
try {
apply_block( new_head, s );
apply_block( new_head, s, trx_lookup );
fork_db.mark_valid( new_head );
head = new_head;
} catch ( const fc::exception& e ) {
Expand All @@ -1990,7 +1997,7 @@ struct controller_impl {
optional<fc::exception> except;
try {
apply_block( *ritr, (*ritr)->is_valid() ? controller::block_status::validated
: controller::block_status::complete );
: controller::block_status::complete, trx_lookup );
fork_db.mark_valid( *ritr );
head = *ritr;
} catch (const fc::exception& e) {
Expand All @@ -2014,7 +2021,7 @@ struct controller_impl {

// re-apply good blocks
for( auto ritr = branches.second.rbegin(); ritr != branches.second.rend(); ++ritr ) {
apply_block( *ritr, controller::block_status::validated /* we previously validated these blocks*/ );
apply_block( *ritr, controller::block_status::validated /* we previously validated these blocks*/, trx_lookup );
head = *ritr;
}
throw *except;
Expand Down Expand Up @@ -2558,10 +2565,12 @@ std::future<block_state_ptr> controller::create_block_state_future( const signed
return my->create_block_state_future( b );
}

void controller::push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& forked_branch_cb ) {
void controller::push_block( std::future<block_state_ptr>& block_state_future,
const forked_branch_callback& forked_branch_cb, const trx_meta_cache_lookup& trx_lookup )
{
validate_db_available_size();
validate_reversible_available_size();
my->push_block( block_state_future, forked_branch_cb );
my->push_block( block_state_future, forked_branch_cb, trx_lookup );
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) {
Expand Down
7 changes: 6 additions & 1 deletion libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace eosio { namespace chain {
using resource_limits::resource_limits_manager;
using apply_handler = std::function<void(apply_context&)>;
using forked_branch_callback = std::function<void(const branch_type&)>;
// lookup transaction_metadata via supplied function to avoid re-creation
using trx_meta_cache_lookup = std::function<transaction_metadata_ptr( const transaction_id_type&)>;

class fork_database;

Expand Down Expand Up @@ -153,8 +155,11 @@ namespace eosio { namespace chain {
/**
* @param block_state_future provide from call to create_block_state_future
* @param cb calls cb with forked applied transactions for each forked block
* @param trx_lookup user provided lookup function for externally cached transaction_metadata
*/
void push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& cb );
void push_block( std::future<block_state_ptr>& block_state_future,
const forked_branch_callback& cb,
const trx_meta_cache_lookup& trx_lookup );

boost::asio::io_context& get_thread_pool();

Expand Down
6 changes: 3 additions & 3 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class transaction_metadata {

public:
// creation of tranaction_metadata restricted to start_recover_keys and create_no_recover_keys below, public for make_shared
explicit transaction_metadata( const private_type& pt, const packed_transaction_ptr& ptrx,
explicit transaction_metadata( const private_type& pt, packed_transaction_ptr ptrx,
fc::microseconds sig_cpu_usage, flat_set<public_key_type> recovered_pub_keys,
bool _implicit = false, bool _scheduled = false)
: _packed_trx( ptrx )
: _packed_trx( std::move( ptrx ) )
, _sig_cpu_usage( sig_cpu_usage )
, _recovered_pub_keys( std::move( recovered_pub_keys ) )
, implicit( _implicit )
Expand All @@ -72,7 +72,7 @@ class transaction_metadata {
/// Thread safe.
/// @returns transaction_metadata_ptr or exception via future
static recover_keys_future
start_recover_keys( const packed_transaction_ptr& trx, boost::asio::io_context& thread_pool,
start_recover_keys( packed_transaction_ptr trx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit,
uint32_t max_variable_sig_size = UINT32_MAX );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class unapplied_transaction_queue {
return itr->trx_type == trx_enum_type::persisted;
}

transaction_metadata_ptr get_trx( const transaction_id_type& id ) const {
auto itr = queue.get<by_trx_id>().find( id );
if( itr == queue.get<by_trx_id>().end() ) return {};
return itr->trx_meta;
}

template <typename Func>
bool clear_expired( const time_point& pending_block_time, const time_point& deadline, Func&& callback ) {
auto& persisted_by_expiry = queue.get<by_expiry>();
Expand Down
76 changes: 6 additions & 70 deletions libraries/chain/transaction.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
#include <fc/io/raw.hpp>
#include <fc/bitutil.hpp>
#include <fc/smart_ref_impl.hpp>
#include <algorithm>
#include <mutex>

#include <boost/range/adaptor/transformed.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/device/back_inserter.hpp>
#include <boost/iostreams/filter/zlib.hpp>
Expand All @@ -19,33 +13,6 @@

namespace eosio { namespace chain {

using namespace boost::multi_index;

struct cached_pub_key {
transaction_id_type trx_id;
public_key_type pub_key;
signature_type sig;
fc::microseconds cpu_usage;
cached_pub_key(const cached_pub_key&) = delete;
cached_pub_key() = delete;
cached_pub_key& operator=(const cached_pub_key&) = delete;
cached_pub_key(cached_pub_key&&) = default;
};
struct by_sig{};

typedef multi_index_container<
cached_pub_key,
indexed_by<
sequenced<>,
hashed_unique<
tag<by_sig>,
member<cached_pub_key,
signature_type,
&cached_pub_key::sig>
>
>
> recovery_cache_type;

void deferred_transaction_generation_context::reflector_init() {
static_assert( fc::raw::has_feature_reflector_init_on_unpacked_reflected_types,
"deferred_transaction_generation_context expects FC to support reflector_init" );
Expand Down Expand Up @@ -93,52 +60,21 @@ fc::microseconds transaction::get_signature_keys( const vector<signature_type>&
const chain_id_type& chain_id, fc::time_point deadline, const vector<bytes>& cfd,
flat_set<public_key_type>& recovered_pub_keys, bool allow_duplicate_keys)const
{ try {
using boost::adaptors::transformed;

constexpr size_t recovery_cache_size = 10000;
static recovery_cache_type recovery_cache;
static std::mutex cache_mtx;

auto start = fc::time_point::now();
recovered_pub_keys.clear();
const digest_type digest = sig_digest(chain_id, cfd);

std::unique_lock<std::mutex> lock(cache_mtx, std::defer_lock);
fc::microseconds sig_cpu_usage;
const auto digest_time = fc::time_point::now() - start;
for(const signature_type& sig : signatures) {
auto sig_start = fc::time_point::now();
EOS_ASSERT( sig_start < deadline, tx_cpu_usage_exceeded, "transaction signature verification executed for too long ${time}us",
("time", sig_start - start)("now", sig_start)("deadline", deadline)("start", start) );
public_key_type recov;
const auto& tid = id();
lock.lock();
recovery_cache_type::index<by_sig>::type::iterator it = recovery_cache.get<by_sig>().find( sig );
if( it == recovery_cache.get<by_sig>().end() || it->trx_id != tid ) {
lock.unlock();
recov = public_key_type( sig, digest );
fc::microseconds cpu_usage = fc::time_point::now() - sig_start;
lock.lock();
recovery_cache.emplace_back( cached_pub_key{tid, recov, sig, cpu_usage} ); //could fail on dup signatures; not a problem
sig_cpu_usage += cpu_usage;
} else {
recov = it->pub_key;
sig_cpu_usage += it->cpu_usage;
}
lock.unlock();
bool successful_insertion = false;
std::tie(std::ignore, successful_insertion) = recovered_pub_keys.insert(recov);
auto now = fc::time_point::now();
EOS_ASSERT( now < deadline, tx_cpu_usage_exceeded, "transaction signature verification executed for too long ${time}us",
("time", now - start)("now", now)("deadline", deadline)("start", start) );
auto[ itr, successful_insertion ] = recovered_pub_keys.emplace( sig, digest );
EOS_ASSERT( allow_duplicate_keys || successful_insertion, tx_duplicate_sig,
"transaction includes more than one signature signed using the same key associated with public key: ${key}",
("key", recov) );
("key", *itr ) );
}

lock.lock();
while ( recovery_cache.size() > recovery_cache_size )
recovery_cache.erase( recovery_cache.begin());
lock.unlock();

return sig_cpu_usage + digest_time;
return fc::time_point::now() - start;
} FC_CAPTURE_AND_RETHROW() }

flat_multimap<uint16_t, transaction_extension> transaction::validate_and_extract_extensions()const {
Expand Down
6 changes: 3 additions & 3 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@

namespace eosio { namespace chain {

recover_keys_future transaction_metadata::start_recover_keys( const packed_transaction_ptr& trx,
recover_keys_future transaction_metadata::start_recover_keys( packed_transaction_ptr trx,
boost::asio::io_context& thread_pool,
const chain_id_type& chain_id,
fc::microseconds time_limit,
uint32_t max_variable_sig_size )
{
return async_thread_pool( thread_pool, [trx, chain_id, time_limit, max_variable_sig_size]() {
return async_thread_pool( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), trx, cpu_usage, std::move( recovered_pub_keys ) );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ) );
}
);
}
Expand Down
6 changes: 3 additions & 3 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ namespace eosio { namespace testing {
signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
auto sb = _produce_block(skip_time, false);
auto bsf = validating_node->create_block_state_future( sb );
validating_node->push_block( bsf, forked_branch_callback() );
validating_node->push_block( bsf, forked_branch_callback{}, trx_meta_cache_lookup{} );

return sb;
}
Expand All @@ -445,14 +445,14 @@ namespace eosio { namespace testing {

void validate_push_block(const signed_block_ptr& sb) {
auto bs = validating_node->create_block_state_future( sb );
validating_node->push_block( bs, forked_branch_callback() );
validating_node->push_block( bs, forked_branch_callback{}, trx_meta_cache_lookup{} );
}

signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
unapplied_transactions.add_aborted( control->abort_block() );
auto sb = _produce_block(skip_time, true);
auto bsf = validating_node->create_block_state_future( sb );
validating_node->push_block( bsf, forked_branch_callback() );
validating_node->push_block( bsf, forked_branch_callback{}, trx_meta_cache_lookup{} );

return sb;
}
Expand Down
Loading