Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Unapplied transaction queue performance #7686

Merged
merged 19 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8f517ba
Add tests for duplicates
heifner Jul 25, 2019
964cebe
Add clear_applied
heifner Jul 25, 2019
4ad52cc
Clear applied on_block to avoid trying to applied already applied tra…
heifner Jul 25, 2019
8667daa
No need to track unapplied if not SPECULATIVE
heifner Jul 25, 2019
04821a5
Add aborted trxs to unapplied_transactions before push_block so trxs …
heifner Jul 25, 2019
c66bd0a
Missed removing capture on last commit
heifner Jul 25, 2019
34cea56
Make unapplied_transaction_queue responsible for tracking db_read_mode
heifner Jul 26, 2019
3752321
Change abort_block to always return applied transactions since unappl…
heifner Jul 26, 2019
e9b6775
Modify push_block to take a forked transactions callback so that prev…
heifner Jul 26, 2019
f8706ea
Modified add_forked to take a vector of transaction_metadata_ptr inst…
heifner Jul 26, 2019
66c77ff
push_block now takes a forked_trxs_callback
heifner Jul 26, 2019
dc0f76c
push_block now takes a forked_trxs_callback
heifner Jul 26, 2019
542fe66
Revert back to unapplied_transaction_queue taking a branch_type and b…
heifner Jul 26, 2019
36a6fd3
Use an enum for processing mode of unapplied_transaction_queue. Set m…
heifner Jul 26, 2019
10e725e
Remove unneeded include
heifner Jul 26, 2019
a58363c
Move unapplied_transaction_queue::set_mode to plugin_initialize
heifner Jul 27, 2019
9723af5
add to front of incoming_transaction_queue when block time/net exhaus…
heifner Jul 27, 2019
575dff6
emplace_front since that is the whole point
heifner Jul 29, 2019
536b258
Merge branch 'develop' into unapplied-performance
heifner Jul 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ struct controller_impl {
pending_head = fork_db.pending_head()
) {
wlog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id) );
maybe_switch_forks( pending_head, controller::block_status::complete );
maybe_switch_forks( pending_head, controller::block_status::complete, forked_branch_callback() );
}
}

Expand Down Expand Up @@ -1820,7 +1820,7 @@ struct controller_impl {
} );
}

branch_type push_block( std::future<block_state_ptr>& block_state_future ) {
void push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& forked_branch_cb ) {
controller::block_status s = controller::block_status::complete;
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

Expand All @@ -1842,10 +1842,9 @@ struct controller_impl {
emit( self.accepted_block_header, bsp );

if( read_mode != db_read_mode::IRREVERSIBLE ) {
return maybe_switch_forks( fork_db.pending_head(), s );
maybe_switch_forks( fork_db.pending_head(), s, forked_branch_cb );
} else {
log_irreversible();
return {};
}

} FC_LOG_AND_RETHROW( )
Expand Down Expand Up @@ -1895,15 +1894,14 @@ struct controller_impl {
} else {
EOS_ASSERT( read_mode != db_read_mode::IRREVERSIBLE, block_validate_exception,
"invariant failure: cannot replay reversible blocks while in irreversible mode" );
maybe_switch_forks( bsp, s );
maybe_switch_forks( bsp, s, forked_branch_callback() );
}

} FC_LOG_AND_RETHROW( )
}

branch_type maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s ) {
void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_branch_callback& forked_branch_cb ) {
bool head_changed = true;
branch_type unapplied_branch;
if( new_head->header.previous == head->id ) {
try {
apply_block( new_head, s );
Expand All @@ -1925,6 +1923,8 @@ struct controller_impl {
}
EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception,
"loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail

if( forked_branch_cb ) forked_branch_cb( branches.second );
}

for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr ) {
Expand All @@ -1944,7 +1944,7 @@ struct controller_impl {
// Remove the block that threw and all forks built off it.
fork_db.remove( (*ritr)->id );

// pop all blocks from the bad fork
// pop all blocks from the bad fork, discarding their transactions
// ritr base is a forward itr to the last block successfully applied
auto applied_itr = ritr.base();
for( auto itr = applied_itr; itr != branches.first.end(); ++itr ) {
Expand All @@ -1962,8 +1962,6 @@ struct controller_impl {
} // end if exception
} /// end for each block in branch

unapplied_branch = std::move( branches.second );

ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id));
} else {
head_changed = false;
Expand All @@ -1972,15 +1970,12 @@ struct controller_impl {
if( head_changed )
log_irreversible();

return unapplied_branch;
} /// push_block

vector<transaction_metadata_ptr> abort_block() {
vector<transaction_metadata_ptr> applied_trxs;
if( pending ) {
if ( read_mode == db_read_mode::SPECULATIVE ) {
applied_trxs = pending->extract_trx_metas();
}
applied_trxs = pending->extract_trx_metas();
pending.reset();
protocol_features.popped_blocks_to( head->block_num );
}
Expand Down Expand Up @@ -2503,10 +2498,10 @@ std::future<block_state_ptr> controller::create_block_state_future( const signed
return my->create_block_state_future( b );
}

branch_type controller::push_block( std::future<block_state_ptr>& block_state_future ) {
void controller::push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& forked_branch_cb ) {
validate_db_available_size();
validate_reversible_available_size();
return my->push_block( block_state_future );
my->push_block( block_state_future, forked_branch_cb );
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) {
Expand Down
5 changes: 3 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace eosio { namespace chain {
class account_object;
using resource_limits::resource_limits_manager;
using apply_handler = std::function<void(apply_context&)>;
using forked_branch_callback = std::function<void(const branch_type&)>;

class fork_database;

Expand Down Expand Up @@ -151,9 +152,9 @@ namespace eosio { namespace chain {

/**
* @param block_state_future provide from call to create_block_state_future
* @return branch of unapplied blocks from fork switch
* @param cb calls cb with forked applied transactions for each forked block
*/
branch_type push_block( std::future<block_state_ptr>& block_state_future );
void push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& cb );

boost::asio::io_context& get_thread_pool();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,17 @@ struct unapplied_transaction {

/**
* Track unapplied transactions for persisted, forked blocks, and aborted blocks.
* Persisted to first so that they can be applied in each block until expired.
* Persisted are first so that they can be applied in each block until expired.
*/
class unapplied_transaction_queue {
public:
enum class process_mode {
non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE
speculative_non_producer, // will never produce
speculative_producer // can produce
};

private:
struct by_trx_id;
struct by_type;
struct by_expiry;
Expand All @@ -65,15 +72,15 @@ class unapplied_transaction_queue {
> unapplied_trx_queue_type;

unapplied_trx_queue_type queue;
bool only_track_persisted = false;
process_mode mode = process_mode::speculative_producer;

public:

void set_only_track_persisted( bool v ) {
if( v ) {
FC_ASSERT( empty(), "set_only_track_persisted queue required to be empty" );
void set_mode( process_mode new_mode ) {
if( new_mode != mode ) {
FC_ASSERT( empty(), "set_mode, queue required to be empty" );
}
only_track_persisted = v;
mode = new_mode;
}

bool empty() const {
Expand Down Expand Up @@ -111,8 +118,15 @@ class unapplied_transaction_queue {
return true;
}

void clear_applied( const std::vector<transaction_metadata_ptr>& applied_trxs ) {
auto& idx = queue.get<by_trx_id>();
for( const auto& trx : applied_trxs ) {
idx.erase( trx->id() );
}
}

void add_forked( const branch_type& forked_branch ) {
if( only_track_persisted ) return;
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
// forked_branch is in reverse order
for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
const block_state_ptr& bsptr = *ritr;
Expand All @@ -125,14 +139,15 @@ class unapplied_transaction_queue {
}

void add_aborted( std::vector<transaction_metadata_ptr> aborted_trxs ) {
if( aborted_trxs.empty() || only_track_persisted ) return;
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
for( auto& trx : aborted_trxs ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
}
}

void add_persisted( const transaction_metadata_ptr& trx ) {
if( mode == process_mode::non_speculative ) return;
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
Expand Down
10 changes: 5 additions & 5 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ namespace eosio { namespace testing {

signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
auto sb = _produce_block(skip_time, false);
auto bs = validating_node->create_block_state_future( sb );
validating_node->push_block( bs );
auto bsf = validating_node->create_block_state_future( sb );
validating_node->push_block( bsf, forked_branch_callback() );

return sb;
}
Expand All @@ -437,14 +437,14 @@ namespace eosio { namespace testing {

void validate_push_block(const signed_block_ptr& sb) {
auto bs = validating_node->create_block_state_future( sb );
validating_node->push_block( bs );
validating_node->push_block( bs, forked_branch_callback() );
}

signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
unapplied_transactions.add_aborted( control->abort_block() );
auto sb = _produce_block(skip_time, true);
auto bs = validating_node->create_block_state_future( sb );
validating_node->push_block( bs );
auto bsf = validating_node->create_block_state_future( sb );
validating_node->push_block( bsf, forked_branch_callback() );

return sb;
}
Expand Down
13 changes: 7 additions & 6 deletions libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,11 @@ namespace eosio { namespace testing {
}

void base_tester::push_block(signed_block_ptr b) {
auto bs = control->create_block_state_future(b);
vector<transaction_metadata_ptr> aborted_trxs = control->abort_block();
unapplied_transactions.add_forked( control->push_block( bs ) );
unapplied_transactions.add_aborted( std::move( aborted_trxs ) );
auto bsf = control->create_block_state_future(b);
unapplied_transactions.add_aborted( control->abort_block() );
control->push_block( bsf, [this]( const branch_type& forked_branch ) {
unapplied_transactions.add_forked( forked_branch );
} );

auto itr = last_produced_block.find(b->producer);
if (itr == last_produced_block.end() || block_header::num_from_id(b->id()) > block_header::num_from_id(itr->second)) {
Expand Down Expand Up @@ -928,9 +929,9 @@ namespace eosio { namespace testing {

auto block = a.control->fetch_block_by_number(i);
if( block ) { //&& !b.control->is_known_block(block->id()) ) {
auto bs = b.control->create_block_state_future( block );
auto bsf = b.control->create_block_state_future( block );
b.control->abort_block();
b.control->push_block(bs); //, eosio::chain::validation_steps::created_block);
b.control->push_block(bsf, forked_branch_callback()); //, eosio::chain::validation_steps::created_block);
}
}
};
Expand Down
36 changes: 26 additions & 10 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin


void on_block( const block_state_ptr& bsp ) {
_unapplied_transactions.clear_applied( bsp->trxs );

if( bsp->header.timestamp <= _last_signed_block_time ) return;
if( bsp->header.timestamp <= _start_time ) return;
if( bsp->block_num <= _last_signed_block_num ) return;
Expand Down Expand Up @@ -352,18 +354,19 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
auto bsf = chain.create_block_state_future( block );

// abort the pending block
vector<transaction_metadata_ptr> aborted_trxs = chain.abort_block();
_unapplied_transactions.add_aborted( chain.abort_block() );

// exceptions throw out, make sure we restart our loop
auto ensure = fc::make_scoped_exit([this, &aborted_trxs](){
_unapplied_transactions.add_aborted( std::move( aborted_trxs ) );
auto ensure = fc::make_scoped_exit([this](){
schedule_production_loop();
});

// push the new block
bool except = false;
try {
_unapplied_transactions.add_forked( chain.push_block( bsf ) );
chain.push_block( bsf, [this]( const branch_type& forked_branch ) {
_unapplied_transactions.add_forked( forked_branch );
} );
} catch ( const guard_exception& e ) {
chain_plug->handle_guard_exception(e);
return;
Expand Down Expand Up @@ -403,13 +406,22 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
return trx->packed_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size() + sizeof( *trx );
}

void add_size( const transaction_metadata_ptr& trx ) {
auto size = calc_size( trx );
EOS_ASSERT( size_in_bytes + size < max_incoming_transaction_queue_size, tx_resource_exhaustion, "Transaction exceeded producer resource limit" );
size_in_bytes += size;
}

public:
void set_max_incoming_transaction_queue_size( uint64_t v ) { max_incoming_transaction_queue_size = v; }

void add( const transaction_metadata_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next ) {
auto size = calc_size( trx );
EOS_ASSERT( size_in_bytes + size < max_incoming_transaction_queue_size, tx_resource_exhaustion, "Transaction exceeded producer resource limit" );
size_in_bytes += size;
add_size( trx );
_incoming_transactions.emplace_back( trx, persist_until_expired, std::move( next ) );
}

void add_front( const transaction_metadata_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next ) {
add_size( trx );
_incoming_transactions.emplace_back( trx, persist_until_expired, std::move( next ) );
arhag marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -511,7 +523,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
auto trace = chain.push_transaction( trx, deadline );
if( trace->except ) {
if( failure_is_subjective( *trace->except, deadline_is_subjective )) {
_pending_incoming_transactions.add( trx, persist_until_expired, next );
_pending_incoming_transactions.add_front( trx, persist_until_expired, next );
if( _pending_block_mode == pending_block_mode::producing ) {
fc_dlog( _trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
("block_num", chain.head_block_num() + 1)
Expand Down Expand Up @@ -720,7 +732,12 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_options = &options;
LOAD_VALUE_SET(options, "producer-name", my->_producers)

my->_unapplied_transactions.set_only_track_persisted( my->_producers.empty() );
chain::controller& chain = my->chain_plug->chain();
unapplied_transaction_queue::process_mode unapplied_mode =
(chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative :
my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer :
unapplied_transaction_queue::process_mode::speculative_producer;
my->_unapplied_transactions.set_mode( unapplied_mode );

if( options.count("private-key") )
{
Expand Down Expand Up @@ -851,7 +868,6 @@ void producer_plugin::plugin_startup()
EOS_ASSERT( my->_producers.empty() || chain.get_validation_mode() == chain::validation_mode::FULL, plugin_config_exception,
"node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" );


my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } ));
my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } ));

Expand Down
2 changes: 1 addition & 1 deletion unittests/block_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test)
tester validator;
auto bs = validator.control->create_block_state_future( copy_b );
validator.control->abort_block();
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs ), fc::exception ,
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_branch_callback() ), fc::exception ,
[] (const fc::exception &e)->bool {
return e.code() == account_name_exists_exception::code_value ;
}) ;
Expand Down
2 changes: 1 addition & 1 deletion unittests/forked_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ BOOST_AUTO_TEST_CASE( forking ) try {
bad_block.transaction_mroot = bad_block.previous;
auto bad_block_bs = c.control->create_block_state_future( std::make_shared<signed_block>(std::move(bad_block)) );
c.control->abort_block();
BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs ), fc::exception,
BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_branch_callback() ), fc::exception,
[] (const fc::exception &ex)->bool {
return ex.to_detail_string().find("block not signed by expected key") != std::string::npos;
});
Expand Down
Loading