Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Unapplied transaction queue performance #7686

Merged
merged 19 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8f517ba
Add tests for duplicates
heifner Jul 25, 2019
964cebe
Add clear_applied
heifner Jul 25, 2019
4ad52cc
Clear applied on_block to avoid trying to applied already applied tra…
heifner Jul 25, 2019
8667daa
No need to track unapplied if not SPECULATIVE
heifner Jul 25, 2019
04821a5
Add aborted trxs to unapplied_transactions before push_block so trxs …
heifner Jul 25, 2019
c66bd0a
Missed removing capture on last commit
heifner Jul 25, 2019
34cea56
Make unapplied_transaction_queue responsible for tracking db_read_mode
heifner Jul 26, 2019
3752321
Change abort_block to always return applied transactions since unappl…
heifner Jul 26, 2019
e9b6775
Modify push_block to take a forked transactions callback so that prev…
heifner Jul 26, 2019
f8706ea
Modified add_forked to take a vector of transaction_metadata_ptr inst…
heifner Jul 26, 2019
66c77ff
push_block now takes a forked_trxs_callback
heifner Jul 26, 2019
dc0f76c
push_block now takes a forked_trxs_callback
heifner Jul 26, 2019
542fe66
Revert back to unapplied_transaction_queue taking a branch_type and b…
heifner Jul 26, 2019
36a6fd3
Use an enum for processing mode of unapplied_transaction_queue. Set m…
heifner Jul 26, 2019
10e725e
Remove unneeded include
heifner Jul 26, 2019
a58363c
Move unapplied_transaction_queue::set_mode to plugin_initialize
heifner Jul 27, 2019
9723af5
add to front of incoming_transaction_queue when block time/net exhaus…
heifner Jul 27, 2019
575dff6
emplace_front since that is the whole point
heifner Jul 29, 2019
536b258
Merge branch 'develop' into unapplied-performance
heifner Jul 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ struct controller_impl {
pending_head = fork_db.pending_head()
) {
wlog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id) );
maybe_switch_forks( pending_head, controller::block_status::complete, forked_trxs_callback() );
maybe_switch_forks( pending_head, controller::block_status::complete, forked_branch_callback() );
}
}

Expand Down Expand Up @@ -1820,7 +1820,7 @@ struct controller_impl {
} );
}

void push_block( std::future<block_state_ptr>& block_state_future, const forked_trxs_callback& forked_trxs_cb ) {
void push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& forked_branch_cb ) {
controller::block_status s = controller::block_status::complete;
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

Expand All @@ -1842,7 +1842,7 @@ struct controller_impl {
emit( self.accepted_block_header, bsp );

if( read_mode != db_read_mode::IRREVERSIBLE ) {
maybe_switch_forks( fork_db.pending_head(), s, forked_trxs_cb );
maybe_switch_forks( fork_db.pending_head(), s, forked_branch_cb );
} else {
log_irreversible();
}
Expand Down Expand Up @@ -1894,13 +1894,13 @@ struct controller_impl {
} else {
EOS_ASSERT( read_mode != db_read_mode::IRREVERSIBLE, block_validate_exception,
"invariant failure: cannot replay reversible blocks while in irreversible mode" );
maybe_switch_forks( bsp, s, forked_trxs_callback() );
maybe_switch_forks( bsp, s, forked_branch_callback() );
}

} FC_LOG_AND_RETHROW( )
}

void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_trxs_callback& forked_trxs_cb ) {
void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_branch_callback& forked_branch_cb ) {
bool head_changed = true;
if( new_head->header.previous == head->id ) {
try {
Expand All @@ -1918,14 +1918,13 @@ struct controller_impl {
auto branches = fork_db.fetch_branch_from( new_head->id, head->id );

if( branches.second.size() > 0 ) {
// forked branches are in reverse order of how they were applied, signal callback in the order applied
for( auto ritr = branches.second.rbegin(), rend = branches.second.rend(); ritr != rend; ++ritr ) {
const block_state_ptr& bsptr = *ritr;
for( auto itr = branches.second.begin(); itr != branches.second.end(); ++itr ) {
pop_block();
forked_trxs_cb( bsptr->trxs );
}
EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception,
"loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail

if( forked_branch_cb ) forked_branch_cb( branches.second );
}

for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr ) {
Expand Down Expand Up @@ -2499,10 +2498,10 @@ std::future<block_state_ptr> controller::create_block_state_future( const signed
return my->create_block_state_future( b );
}

void controller::push_block( std::future<block_state_ptr>& block_state_future, const forked_trxs_callback& forked_trxs_cb ) {
void controller::push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& forked_branch_cb ) {
validate_db_available_size();
validate_reversible_available_size();
my->push_block( block_state_future, forked_trxs_cb );
my->push_block( block_state_future, forked_branch_cb );
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) {
Expand Down
4 changes: 2 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace eosio { namespace chain {
class account_object;
using resource_limits::resource_limits_manager;
using apply_handler = std::function<void(apply_context&)>;
using forked_trxs_callback = std::function<void(const std::vector<transaction_metadata_ptr>&)>;
using forked_branch_callback = std::function<void(const branch_type&)>;

class fork_database;

Expand Down Expand Up @@ -154,7 +154,7 @@ namespace eosio { namespace chain {
* @param block_state_future provide from call to create_block_state_future
* @param cb calls cb with forked applied transactions for each forked block
*/
void push_block( std::future<block_state_ptr>& block_state_future, const forked_trxs_callback& cb );
void push_block( std::future<block_state_ptr>& block_state_future, const forked_branch_callback& cb );

boost::asio::io_context& get_thread_pool();

Expand Down
45 changes: 24 additions & 21 deletions libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/block_state.hpp>
#include <eosio/chain/controller.hpp>
#include <eosio/chain/exceptions.hpp>

#include <boost/multi_index_container.hpp>
Expand Down Expand Up @@ -50,7 +49,14 @@ struct unapplied_transaction {
* Persisted are first so that they can be applied in each block until expired.
*/
class unapplied_transaction_queue {
public:
enum class process_mode {
non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE
speculative_non_producer, // will never produce
speculative_producer // can produce
};

private:
struct by_trx_id;
struct by_type;
struct by_expiry;
Expand All @@ -66,23 +72,15 @@ class unapplied_transaction_queue {
> unapplied_trx_queue_type;

unapplied_trx_queue_type queue;
db_read_mode read_mode = db_read_mode::SPECULATIVE;
bool only_track_persisted = false;
process_mode mode = process_mode::speculative_producer;

public:

void set_only_track_persisted( bool v ) {
if( v ) {
FC_ASSERT( empty(), "set_only_track_persisted, queue required to be empty" );
}
only_track_persisted = v;
}

void set_chain_read_mode( db_read_mode mode ) {
if( mode != read_mode ) {
FC_ASSERT( empty(), "set_chain_read_mode, queue required to be empty" );
void set_mode( process_mode new_mode ) {
if( new_mode != mode ) {
FC_ASSERT( empty(), "set_mode, queue required to be empty" );
}
read_mode = mode;
mode = new_mode;
}

bool empty() const {
Expand Down Expand Up @@ -127,24 +125,29 @@ class unapplied_transaction_queue {
}
}

void add_forked( const std::vector<transaction_metadata_ptr>& forked_trxs ) {
if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE ) return;
for( const auto& trx : forked_trxs ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { trx, expiry, trx_enum_type::forked } );
void add_forked( const branch_type& forked_branch ) {
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
// forked_branch is in reverse order
for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) {
const block_state_ptr& bsptr = *ritr;
for( auto itr = bsptr->trxs.begin(), end = bsptr->trxs.end(); itr != end; ++itr ) {
const auto& trx = *itr;
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { trx, expiry, trx_enum_type::forked } );
}
}
}

void add_aborted( std::vector<transaction_metadata_ptr> aborted_trxs ) {
if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE || aborted_trxs.empty() ) return;
if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return;
for( auto& trx : aborted_trxs ) {
fc::time_point expiry = trx->packed_trx()->expiration();
queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } );
}
}

void add_persisted( const transaction_metadata_ptr& trx ) {
if( read_mode != db_read_mode::SPECULATIVE ) return;
if( mode == process_mode::non_speculative ) return;
auto itr = queue.get<by_trx_id>().find( trx->id() );
if( itr == queue.get<by_trx_id>().end() ) {
fc::time_point expiry = trx->packed_trx()->expiration();
Expand Down
6 changes: 3 additions & 3 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ namespace eosio { namespace testing {
signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
auto sb = _produce_block(skip_time, false);
auto bsf = validating_node->create_block_state_future( sb );
validating_node->push_block( bsf, forked_trxs_callback() );
validating_node->push_block( bsf, forked_branch_callback() );

return sb;
}
Expand All @@ -437,14 +437,14 @@ namespace eosio { namespace testing {

void validate_push_block(const signed_block_ptr& sb) {
auto bs = validating_node->create_block_state_future( sb );
validating_node->push_block( bs, forked_trxs_callback() );
validating_node->push_block( bs, forked_branch_callback() );
}

signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
unapplied_transactions.add_aborted( control->abort_block() );
auto sb = _produce_block(skip_time, true);
auto bsf = validating_node->create_block_state_future( sb );
validating_node->push_block( bsf, forked_trxs_callback() );
validating_node->push_block( bsf, forked_branch_callback() );

return sb;
}
Expand Down
6 changes: 3 additions & 3 deletions libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ namespace eosio { namespace testing {
void base_tester::push_block(signed_block_ptr b) {
auto bsf = control->create_block_state_future(b);
unapplied_transactions.add_aborted( control->abort_block() );
control->push_block( bsf, [this]( const std::vector<transaction_metadata_ptr>& forked_trxs ) {
unapplied_transactions.add_forked( forked_trxs );
control->push_block( bsf, [this]( const branch_type& forked_branch ) {
unapplied_transactions.add_forked( forked_branch );
} );

auto itr = last_produced_block.find(b->producer);
Expand Down Expand Up @@ -931,7 +931,7 @@ namespace eosio { namespace testing {
if( block ) { //&& !b.control->is_known_block(block->id()) ) {
auto bsf = b.control->create_block_state_future( block );
b.control->abort_block();
b.control->push_block(bsf, forked_trxs_callback()); //, eosio::chain::validation_steps::created_block);
b.control->push_block(bsf, forked_branch_callback()); //, eosio::chain::validation_steps::created_block);
}
}
};
Expand Down
13 changes: 7 additions & 6 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include <iostream>
#include <algorithm>
#include <functional>
#include <boost/algorithm/string.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/function_output_iterator.hpp>
Expand Down Expand Up @@ -365,8 +364,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// push the new block
bool except = false;
try {
chain.push_block( bsf, [this]( const std::vector<transaction_metadata_ptr>& forked_trxs ) {
_unapplied_transactions.add_forked( forked_trxs );
chain.push_block( bsf, [this]( const branch_type& forked_branch ) {
_unapplied_transactions.add_forked( forked_branch );
} );
} catch ( const guard_exception& e ) {
chain_plug->handle_guard_exception(e);
Expand Down Expand Up @@ -724,8 +723,6 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
my->_options = &options;
LOAD_VALUE_SET(options, "producer-name", my->_producers)

my->_unapplied_transactions.set_only_track_persisted( my->_producers.empty() );

if( options.count("private-key") )
{
const std::vector<std::string> key_id_to_wif_pair_strings = options["private-key"].as<std::vector<std::string>>();
Expand Down Expand Up @@ -849,13 +846,17 @@ void producer_plugin::plugin_startup()
ilog("producer plugin: plugin_startup() begin");

chain::controller& chain = my->chain_plug->chain();
my->_unapplied_transactions.set_chain_read_mode( chain.get_read_mode() );
EOS_ASSERT( my->_producers.empty() || chain.get_read_mode() == chain::db_read_mode::SPECULATIVE, plugin_config_exception,
"node cannot have any producer-name configured because block production is impossible when read_mode is not \"speculative\"" );

EOS_ASSERT( my->_producers.empty() || chain.get_validation_mode() == chain::validation_mode::FULL, plugin_config_exception,
"node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" );

unapplied_transaction_queue::process_mode unapplied_mode =
(chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative :
my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer :
unapplied_transaction_queue::process_mode::speculative_producer;
my->_unapplied_transactions.set_mode( unapplied_mode );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do this in producer_plugin::plugin_initiailize. Since producer_plugin depends on chain_plugin, it is safe to get the read_mode from the controller at that point.


my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } ));
my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } ));
Expand Down
2 changes: 1 addition & 1 deletion unittests/block_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test)
tester validator;
auto bs = validator.control->create_block_state_future( copy_b );
validator.control->abort_block();
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_trxs_callback() ), fc::exception ,
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_branch_callback() ), fc::exception ,
[] (const fc::exception &e)->bool {
return e.code() == account_name_exists_exception::code_value ;
}) ;
Expand Down
2 changes: 1 addition & 1 deletion unittests/forked_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ BOOST_AUTO_TEST_CASE( forking ) try {
bad_block.transaction_mroot = bad_block.previous;
auto bad_block_bs = c.control->create_block_state_future( std::make_shared<signed_block>(std::move(bad_block)) );
c.control->abort_block();
BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_trxs_callback() ), fc::exception,
BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_branch_callback() ), fc::exception,
[] (const fc::exception &ex)->bool {
return ex.to_detail_string().find("block not signed by expected key") != std::string::npos;
});
Expand Down
36 changes: 11 additions & 25 deletions unittests/unapplied_transaction_queue_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try {
bs2->trxs = { trx3, trx4, trx5 };
auto bs3 = std::make_shared<block_state>();
bs3->trxs = { trx6 };
q.add_forked( bs1->trxs );
q.add_forked( bs1->trxs ); // bs1 duplicate ignored
q.add_forked( bs2->trxs );
q.add_forked( bs3->trxs );
q.add_forked( { bs3, bs2, bs1, bs1 } ); // bs1 duplicate ignored
BOOST_CHECK( q.size() == 6 );
BOOST_REQUIRE( next( q ) == trx1 );
BOOST_CHECK( q.size() == 5 );
Expand All @@ -151,10 +148,9 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try {
// fifo forked
auto bs4 = std::make_shared<block_state>();
bs4->trxs = { trx7 };
q.add_forked( bs1->trxs );
q.add_forked( bs2->trxs );
q.add_forked( bs3->trxs );
q.add_forked( bs4->trxs );
q.add_forked( { bs1 } );
q.add_forked( { bs3, bs2 } );
q.add_forked( { bs4 } );
BOOST_CHECK( q.size() == 7 );
BOOST_REQUIRE( next( q ) == trx1 );
BOOST_CHECK( q.size() == 6 );
Expand Down Expand Up @@ -188,14 +184,10 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try {
auto bs6 = std::make_shared<block_state>();
bs5->trxs = { trx11, trx12, trx13 };
bs6->trxs = { trx11, trx15 };
q.add_forked( bs1->trxs );
q.add_forked( bs2->trxs );
q.add_forked( bs3->trxs );
q.add_forked( bs4->trxs );
q.add_forked( bs3->trxs ); // dups not added
q.add_forked( bs2->trxs ); // dups not added
q.add_forked( bs5->trxs );
q.add_forked( bs6->trxs );
q.add_forked( { bs3, bs2, bs1 } );
q.add_forked( { bs4 } );
q.add_forked( { bs3, bs2 } ); // dups ignored
q.add_forked( { bs6, bs5 } );
BOOST_CHECK_EQUAL( q.size(), 11 );
BOOST_REQUIRE( next( q ) == trx1 );
BOOST_CHECK( q.size() == 10 );
Expand Down Expand Up @@ -223,16 +215,12 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try {
BOOST_CHECK( q.empty() );

// altogether, order fifo: persisted, forked, aborted
q.add_forked( bs1->trxs );
q.add_forked( bs2->trxs );
q.add_forked( bs3->trxs );
q.add_forked( { bs3, bs2, bs1 } );
q.add_persisted( trx16 );
q.add_aborted( { trx9, trx14 } );
q.add_persisted( trx8 );
q.add_aborted( { trx18, trx19 } );
q.add_forked( bs4->trxs);
q.add_forked( bs5->trxs );
q.add_forked( bs6->trxs );
q.add_forked( { bs6, bs5, bs4 } );
BOOST_CHECK( q.size() == 17 );
BOOST_REQUIRE( next( q ) == trx16 );
BOOST_CHECK( q.size() == 16 );
Expand Down Expand Up @@ -271,9 +259,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try {
BOOST_REQUIRE( next( q ) == nullptr );
BOOST_CHECK( q.empty() );

q.add_forked( bs1->trxs );
q.add_forked( bs2->trxs );
q.add_forked( bs3->trxs );
q.add_forked( { bs3, bs2, bs1 } );
q.add_aborted( { trx9, trx11 } );
q.add_persisted( trx8 );
q.clear();
Expand Down