From 8f517ba1929c61e44518d57bf12fe479c07b2fea Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 Jul 2019 08:06:15 -0500 Subject: [PATCH 01/18] Add tests for duplicates --- unittests/unapplied_transaction_queue_tests.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/unittests/unapplied_transaction_queue_tests.cpp b/unittests/unapplied_transaction_queue_tests.cpp index c1d530c412f..5cb80681a6e 100644 --- a/unittests/unapplied_transaction_queue_tests.cpp +++ b/unittests/unapplied_transaction_queue_tests.cpp @@ -78,6 +78,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { // fifo aborted q.add_aborted( { trx1, trx2, trx3 } ); + q.add_aborted( { trx1, trx2, trx3 } ); // duplicates ignored BOOST_CHECK( q.size() == 3 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 2 ); @@ -118,7 +119,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { bs2->trxs = { trx3, trx4, trx5 }; auto bs3 = std::make_shared(); bs3->trxs = { trx6 }; - q.add_forked( { bs3, bs2, bs1 } ); + q.add_forked( { bs3, bs2, bs1, bs1 } ); // bs1 duplicate ignored BOOST_CHECK( q.size() == 6 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 5 ); From 964cebe43b2b53a87c88c4887294cb677fcce2ad Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 Jul 2019 08:11:14 -0500 Subject: [PATCH 02/18] Add clear_applied --- .../include/eosio/chain/unapplied_transaction_queue.hpp | 7 +++++++ unittests/unapplied_transaction_queue_tests.cpp | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index eb9275eab5e..65f56b31481 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -111,6 +111,13 @@ class unapplied_transaction_queue { return true; } + void clear_applied( const std::vector& applied_trxs ) { + auto& idx = queue.get(); + for( const auto& trx : applied_trxs ) { + idx.erase( trx->id() ); + } + } + void add_forked( const branch_type& forked_branch ) { if( only_track_persisted ) return; // forked_branch is in reverse order diff --git a/unittests/unapplied_transaction_queue_tests.cpp b/unittests/unapplied_transaction_queue_tests.cpp index 5cb80681a6e..4a69f4954ed 100644 --- a/unittests/unapplied_transaction_queue_tests.cpp +++ b/unittests/unapplied_transaction_queue_tests.cpp @@ -89,6 +89,15 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { BOOST_REQUIRE( next( q ) == nullptr ); BOOST_CHECK( q.empty() ); + // clear applied + q.add_aborted( { trx1, trx2, trx3 } ); + q.clear_applied( { trx1, trx3, trx4 } ); + BOOST_CHECK( q.size() == 1 ); + BOOST_REQUIRE( next( q ) == trx2 ); + BOOST_CHECK( q.size() == 0 ); + BOOST_REQUIRE( next( q ) == nullptr ); + BOOST_CHECK( q.empty() ); + // order: persisted, aborted q.add_persisted( trx6 ); q.add_aborted( { trx1, trx2, trx3 } ); From 4ad52ccc4e7196c269bf136317a7e5c2de69d46e Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 Jul 2019 08:16:09 -0500 Subject: [PATCH 03/18] Clear applied on_block to avoid trying to applied already applied transactions --- plugins/producer_plugin/producer_plugin.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index a969f4240c4..1555bd5c149 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -231,6 +231,8 @@ class producer_plugin_impl : public std::enable_shared_from_thistrxs ); + if( bsp->header.timestamp <= _last_signed_block_time ) return; if( bsp->header.timestamp <= _start_time ) return; if( bsp->block_num <= _last_signed_block_num ) return; From 8667daae309ff378404dae5d635e8136b55b0df3 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 Jul 2019 09:27:10 -0500 Subject: [PATCH 04/18] No need to track unapplied if not SPECULATIVE --- libraries/chain/controller.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 920fd7d3e53..33d7a05d55e 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -1962,7 +1962,9 @@ struct controller_impl { } // end if exception } /// end for each block in branch - unapplied_branch = std::move( branches.second ); + if ( read_mode == db_read_mode::SPECULATIVE ) { + unapplied_branch = std::move( branches.second ); + } ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id)); } else { From 04821a5d9a7c1250766c843f949633effd16f793 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 Jul 2019 10:56:13 -0500 Subject: [PATCH 05/18] Add aborted trxs to unapplied_transactions before push_block so trxs in aborted that are applied can be removed from unapplied_transactions avoiding having to determine they are a duplicate later. --- plugins/producer_plugin/producer_plugin.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 1555bd5c149..59a2e357b84 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -354,11 +354,10 @@ class producer_plugin_impl : public std::enable_shared_from_this aborted_trxs = chain.abort_block(); + _unapplied_transactions.add_aborted( chain.abort_block() ); // exceptions throw out, make sure we restart our loop auto ensure = fc::make_scoped_exit([this, &aborted_trxs](){ - _unapplied_transactions.add_aborted( std::move( aborted_trxs ) ); schedule_production_loop(); }); From c66bd0a50e4c5119dd9e590e46183137842889e2 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 Jul 2019 11:10:05 -0500 Subject: [PATCH 06/18] Missed removing capture on last commit --- plugins/producer_plugin/producer_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 59a2e357b84..ab7b7bcc9bd 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -357,7 +357,7 @@ class producer_plugin_impl : public std::enable_shared_from_this Date: Fri, 26 Jul 2019 09:07:31 -0500 Subject: [PATCH 07/18] Make unapplied_transaction_queue responsible for tracking db_read_mode --- .../chain/unapplied_transaction_queue.hpp | 18 ++++++++++++++---- plugins/producer_plugin/producer_plugin.cpp | 1 + 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index 65f56b31481..51689d0353d 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -46,7 +47,7 @@ struct unapplied_transaction { /** * Track unapplied transactions for persisted, forked blocks, and aborted blocks. - * Persisted to first so that they can be applied in each block until expired. + * Persisted are first so that they can be applied in each block until expired. */ class unapplied_transaction_queue { @@ -65,17 +66,25 @@ class unapplied_transaction_queue { > unapplied_trx_queue_type; unapplied_trx_queue_type queue; + db_read_mode read_mode = db_read_mode::SPECULATIVE; bool only_track_persisted = false; public: void set_only_track_persisted( bool v ) { if( v ) { - FC_ASSERT( empty(), "set_only_track_persisted queue required to be empty" ); + FC_ASSERT( empty(), "set_only_track_persisted, queue required to be empty" ); } only_track_persisted = v; } + void set_chain_read_mode( db_read_mode mode ) { + if( mode != read_mode ) { + FC_ASSERT( empty(), "set_chain_read_mode, queue required to be empty" ); + } + read_mode = mode; + } + bool empty() const { return queue.empty(); } @@ -119,7 +128,7 @@ class unapplied_transaction_queue { } void add_forked( const branch_type& forked_branch ) { - if( only_track_persisted ) return; + if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE ) return; // forked_branch is in reverse order for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) { const block_state_ptr& bsptr = *ritr; @@ -132,7 +141,7 @@ class unapplied_transaction_queue { } void add_aborted( std::vector aborted_trxs ) { - if( aborted_trxs.empty() || only_track_persisted ) return; + if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE || aborted_trxs.empty() ) return; for( auto& trx : aborted_trxs ) { fc::time_point expiry = trx->packed_trx()->expiration(); queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } ); @@ -140,6 +149,7 @@ class unapplied_transaction_queue { } void add_persisted( const transaction_metadata_ptr& trx ) { + if( read_mode != db_read_mode::SPECULATIVE ) return; auto itr = queue.get().find( trx->id() ); if( itr == queue.get().end() ) { fc::time_point expiry = trx->packed_trx()->expiration(); diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index ab7b7bcc9bd..83d073f302b 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -846,6 +846,7 @@ void producer_plugin::plugin_startup() ilog("producer plugin: plugin_startup() begin"); chain::controller& chain = my->chain_plug->chain(); + my->_unapplied_transactions.set_chain_read_mode( chain.get_read_mode() ); EOS_ASSERT( my->_producers.empty() || chain.get_read_mode() == chain::db_read_mode::SPECULATIVE, plugin_config_exception, "node cannot have any producer-name configured because block production is impossible when read_mode is not \"speculative\"" ); From 375232119dc231c01e24256a12fbaea3095f855b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 09:10:57 -0500 Subject: [PATCH 08/18] Change abort_block to always return applied transactions since unapplied_transaction_queue will determine if they should be used or not --- libraries/chain/controller.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 33d7a05d55e..097e960d803 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -1980,9 +1980,7 @@ struct controller_impl { vector abort_block() { vector applied_trxs; if( pending ) { - if ( read_mode == db_read_mode::SPECULATIVE ) { - applied_trxs = pending->extract_trx_metas(); - } + applied_trxs = pending->extract_trx_metas(); pending.reset(); protocol_features.popped_blocks_to( head->block_num ); } From e9b6775df395157f79229d98a23d9890299efd84 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 11:52:36 -0500 Subject: [PATCH 09/18] Modify push_block to take a forked transactions callback so that previously applied transactions can be added to the unapplied_transaction_queue as they are popped. This allows the transactions to be removed from the unapplied_transaction_queue as they are applied in apply_block. Before they were added after the apply which would require re-attempting to apply them when processing unapplied_tranactions --- libraries/chain/controller.cpp | 28 ++++++++----------- .../chain/include/eosio/chain/controller.hpp | 5 ++-- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 097e960d803..df8e6e4768d 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -690,7 +690,7 @@ struct controller_impl { pending_head = fork_db.pending_head() ) { wlog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id) ); - maybe_switch_forks( pending_head, controller::block_status::complete ); + maybe_switch_forks( pending_head, controller::block_status::complete, forked_trxs_callback() ); } } @@ -1820,7 +1820,7 @@ struct controller_impl { } ); } - branch_type push_block( std::future& block_state_future ) { + void push_block( std::future& block_state_future, const forked_trxs_callback& forked_trxs_cb ) { controller::block_status s = controller::block_status::complete; EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block"); @@ -1842,10 +1842,9 @@ struct controller_impl { emit( self.accepted_block_header, bsp ); if( read_mode != db_read_mode::IRREVERSIBLE ) { - return maybe_switch_forks( fork_db.pending_head(), s ); + maybe_switch_forks( fork_db.pending_head(), s, forked_trxs_cb ); } else { log_irreversible(); - return {}; } } FC_LOG_AND_RETHROW( ) @@ -1895,15 +1894,14 @@ struct controller_impl { } else { EOS_ASSERT( read_mode != db_read_mode::IRREVERSIBLE, block_validate_exception, "invariant failure: cannot replay reversible blocks while in irreversible mode" ); - maybe_switch_forks( bsp, s ); + maybe_switch_forks( bsp, s, forked_trxs_callback() ); } } FC_LOG_AND_RETHROW( ) } - branch_type maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s ) { + void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_trxs_callback& forked_trxs_cb ) { bool head_changed = true; - branch_type unapplied_branch; if( new_head->header.previous == head->id ) { try { apply_block( new_head, s ); @@ -1920,8 +1918,11 @@ struct controller_impl { auto branches = fork_db.fetch_branch_from( new_head->id, head->id ); if( branches.second.size() > 0 ) { - for( auto itr = branches.second.begin(); itr != branches.second.end(); ++itr ) { + // forked branches are in reverse order of how they were applied, signal callback in the order applied + for( auto ritr = branches.second.rbegin(), rend = branches.second.rend(); ritr != rend; ++ritr ) { + const block_state_ptr& bsptr = *ritr; pop_block(); + forked_trxs_cb( bsptr->trxs ); } EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception, "loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail @@ -1944,7 +1945,7 @@ struct controller_impl { // Remove the block that threw and all forks built off it. fork_db.remove( (*ritr)->id ); - // pop all blocks from the bad fork + // pop all blocks from the bad fork, discarding their transactions // ritr base is a forward itr to the last block successfully applied auto applied_itr = ritr.base(); for( auto itr = applied_itr; itr != branches.first.end(); ++itr ) { @@ -1962,10 +1963,6 @@ struct controller_impl { } // end if exception } /// end for each block in branch - if ( read_mode == db_read_mode::SPECULATIVE ) { - unapplied_branch = std::move( branches.second ); - } - ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id)); } else { head_changed = false; @@ -1974,7 +1971,6 @@ struct controller_impl { if( head_changed ) log_irreversible(); - return unapplied_branch; } /// push_block vector abort_block() { @@ -2503,10 +2499,10 @@ std::future controller::create_block_state_future( const signed return my->create_block_state_future( b ); } -branch_type controller::push_block( std::future& block_state_future ) { +void controller::push_block( std::future& block_state_future, const forked_trxs_callback& forked_trxs_cb ) { validate_db_available_size(); validate_reversible_available_size(); - return my->push_block( block_state_future ); + my->push_block( block_state_future, forked_trxs_cb ); } transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) { diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index 3a3ff2550f1..f3529d71a59 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -36,6 +36,7 @@ namespace eosio { namespace chain { class account_object; using resource_limits::resource_limits_manager; using apply_handler = std::function; + using forked_trxs_callback = std::function&)>; class fork_database; @@ -151,9 +152,9 @@ namespace eosio { namespace chain { /** * @param block_state_future provide from call to create_block_state_future - * @return branch of unapplied blocks from fork switch + * @param cb calls cb with forked applied transactions for each forked block */ - branch_type push_block( std::future& block_state_future ); + void push_block( std::future& block_state_future, const forked_trxs_callback& cb ); boost::asio::io_context& get_thread_pool(); From f8706ea8bd7a458dd996af3fb05f52543fd1f262 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 12:09:52 -0500 Subject: [PATCH 10/18] Modified add_forked to take a vector of transaction_metadata_ptr instead of a branch_type since each forked block is added individually now --- .../chain/unapplied_transaction_queue.hpp | 13 +++---- .../unapplied_transaction_queue_tests.cpp | 36 +++++++++++++------ 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index 51689d0353d..a6e785fab67 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -127,16 +127,11 @@ class unapplied_transaction_queue { } } - void add_forked( const branch_type& forked_branch ) { + void add_forked( const std::vector& forked_trxs ) { if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE ) return; - // forked_branch is in reverse order - for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) { - const block_state_ptr& bsptr = *ritr; - for( auto itr = bsptr->trxs.begin(), end = bsptr->trxs.end(); itr != end; ++itr ) { - const auto& trx = *itr; - fc::time_point expiry = trx->packed_trx()->expiration(); - queue.insert( { trx, expiry, trx_enum_type::forked } ); - } + for( const auto& trx : forked_trxs ) { + fc::time_point expiry = trx->packed_trx()->expiration(); + queue.insert( { trx, expiry, trx_enum_type::forked } ); } } diff --git a/unittests/unapplied_transaction_queue_tests.cpp b/unittests/unapplied_transaction_queue_tests.cpp index 4a69f4954ed..d30861f90fb 100644 --- a/unittests/unapplied_transaction_queue_tests.cpp +++ b/unittests/unapplied_transaction_queue_tests.cpp @@ -128,7 +128,10 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { bs2->trxs = { trx3, trx4, trx5 }; auto bs3 = std::make_shared(); bs3->trxs = { trx6 }; - q.add_forked( { bs3, bs2, bs1, bs1 } ); // bs1 duplicate ignored + q.add_forked( bs1->trxs ); + q.add_forked( bs1->trxs ); // bs1 duplicate ignored + q.add_forked( bs2->trxs ); + q.add_forked( bs3->trxs ); BOOST_CHECK( q.size() == 6 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 5 ); @@ -148,9 +151,10 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { // fifo forked auto bs4 = std::make_shared(); bs4->trxs = { trx7 }; - q.add_forked( { bs1 } ); - q.add_forked( { bs3, bs2 } ); - q.add_forked( { bs4 } ); + q.add_forked( bs1->trxs ); + q.add_forked( bs2->trxs ); + q.add_forked( bs3->trxs ); + q.add_forked( bs4->trxs ); BOOST_CHECK( q.size() == 7 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 6 ); @@ -184,10 +188,14 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { auto bs6 = std::make_shared(); bs5->trxs = { trx11, trx12, trx13 }; bs6->trxs = { trx11, trx15 }; - q.add_forked( { bs3, bs2, bs1 } ); - q.add_forked( { bs4 } ); - q.add_forked( { bs3, bs2 } ); // dups removed - q.add_forked( { bs6, bs5 } ); + q.add_forked( bs1->trxs ); + q.add_forked( bs2->trxs ); + q.add_forked( bs3->trxs ); + q.add_forked( bs4->trxs ); + q.add_forked( bs3->trxs ); // dups not added + q.add_forked( bs2->trxs ); // dups not added + q.add_forked( bs5->trxs ); + q.add_forked( bs6->trxs ); BOOST_CHECK_EQUAL( q.size(), 11 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 10 ); @@ -215,12 +223,16 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { BOOST_CHECK( q.empty() ); // altogether, order fifo: persisted, forked, aborted - q.add_forked( { bs3, bs2, bs1 } ); + q.add_forked( bs1->trxs ); + q.add_forked( bs2->trxs ); + q.add_forked( bs3->trxs ); q.add_persisted( trx16 ); q.add_aborted( { trx9, trx14 } ); q.add_persisted( trx8 ); q.add_aborted( { trx18, trx19 } ); - q.add_forked( { bs6, bs5, bs4 } ); + q.add_forked( bs4->trxs); + q.add_forked( bs5->trxs ); + q.add_forked( bs6->trxs ); BOOST_CHECK( q.size() == 17 ); BOOST_REQUIRE( next( q ) == trx16 ); BOOST_CHECK( q.size() == 16 ); @@ -259,7 +271,9 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { BOOST_REQUIRE( next( q ) == nullptr ); BOOST_CHECK( q.empty() ); - q.add_forked( { bs3, bs2, bs1 } ); + q.add_forked( bs1->trxs ); + q.add_forked( bs2->trxs ); + q.add_forked( bs3->trxs ); q.add_aborted( { trx9, trx11 } ); q.add_persisted( trx8 ); q.clear(); From 66c77ffb7ed4564746a50c6e278ade5e71f5c785 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 12:10:32 -0500 Subject: [PATCH 11/18] push_block now takes a forked_trxs_callback --- plugins/producer_plugin/producer_plugin.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 83d073f302b..83718be47d3 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -364,7 +365,9 @@ class producer_plugin_impl : public std::enable_shared_from_this& forked_trxs ) { + _unapplied_transactions.add_forked( forked_trxs ); + } ); } catch ( const guard_exception& e ) { chain_plug->handle_guard_exception(e); return; From dc0f76c9a161bc3c631b767b9c05978e731dc4b3 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 12:12:58 -0500 Subject: [PATCH 12/18] push_block now takes a forked_trxs_callback --- libraries/testing/include/eosio/testing/tester.hpp | 10 +++++----- libraries/testing/tester.cpp | 13 +++++++------ unittests/block_tests.cpp | 2 +- unittests/forked_tests.cpp | 2 +- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/libraries/testing/include/eosio/testing/tester.hpp b/libraries/testing/include/eosio/testing/tester.hpp index 27edc7b7cdc..e2556443928 100644 --- a/libraries/testing/include/eosio/testing/tester.hpp +++ b/libraries/testing/include/eosio/testing/tester.hpp @@ -425,8 +425,8 @@ namespace eosio { namespace testing { signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override { auto sb = _produce_block(skip_time, false); - auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs ); + auto bsf = validating_node->create_block_state_future( sb ); + validating_node->push_block( bsf, forked_trxs_callback() ); return sb; } @@ -437,14 +437,14 @@ namespace eosio { namespace testing { void validate_push_block(const signed_block_ptr& sb) { auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs ); + validating_node->push_block( bs, forked_trxs_callback() ); } signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override { unapplied_transactions.add_aborted( control->abort_block() ); auto sb = _produce_block(skip_time, true); - auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs ); + auto bsf = validating_node->create_block_state_future( sb ); + validating_node->push_block( bsf, forked_trxs_callback() ); return sb; } diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index c8db3c0fa16..09144b23edf 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -224,10 +224,11 @@ namespace eosio { namespace testing { } void base_tester::push_block(signed_block_ptr b) { - auto bs = control->create_block_state_future(b); - vector aborted_trxs = control->abort_block(); - unapplied_transactions.add_forked( control->push_block( bs ) ); - unapplied_transactions.add_aborted( std::move( aborted_trxs ) ); + auto bsf = control->create_block_state_future(b); + unapplied_transactions.add_aborted( control->abort_block() ); + control->push_block( bsf, [this]( const std::vector& forked_trxs ) { + unapplied_transactions.add_forked( forked_trxs ); + } ); auto itr = last_produced_block.find(b->producer); if (itr == last_produced_block.end() || block_header::num_from_id(b->id()) > block_header::num_from_id(itr->second)) { @@ -928,9 +929,9 @@ namespace eosio { namespace testing { auto block = a.control->fetch_block_by_number(i); if( block ) { //&& !b.control->is_known_block(block->id()) ) { - auto bs = b.control->create_block_state_future( block ); + auto bsf = b.control->create_block_state_future( block ); b.control->abort_block(); - b.control->push_block(bs); //, eosio::chain::validation_steps::created_block); + b.control->push_block(bsf, forked_trxs_callback()); //, eosio::chain::validation_steps::created_block); } } }; diff --git a/unittests/block_tests.cpp b/unittests/block_tests.cpp index 8b8fe2c11cf..def5657b3a4 100644 --- a/unittests/block_tests.cpp +++ b/unittests/block_tests.cpp @@ -43,7 +43,7 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test) tester validator; auto bs = validator.control->create_block_state_future( copy_b ); validator.control->abort_block(); - BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs ), fc::exception , + BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_trxs_callback() ), fc::exception , [] (const fc::exception &e)->bool { return e.code() == account_name_exists_exception::code_value ; }) ; diff --git a/unittests/forked_tests.cpp b/unittests/forked_tests.cpp index 4352c381820..10564cf4876 100644 --- a/unittests/forked_tests.cpp +++ b/unittests/forked_tests.cpp @@ -278,7 +278,7 @@ BOOST_AUTO_TEST_CASE( forking ) try { bad_block.transaction_mroot = bad_block.previous; auto bad_block_bs = c.control->create_block_state_future( std::make_shared(std::move(bad_block)) ); c.control->abort_block(); - BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs ), fc::exception, + BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_trxs_callback() ), fc::exception, [] (const fc::exception &ex)->bool { return ex.to_detail_string().find("block not signed by expected key") != std::string::npos; }); From 542fe66deb994fac8e70c72091e50bdc713b1071 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 15:57:54 -0500 Subject: [PATCH 13/18] Revert back to unapplied_transaction_queue taking a branch_type and being responsible for processing it in the correct order --- libraries/chain/controller.cpp | 21 ++++++----- .../chain/include/eosio/chain/controller.hpp | 4 +-- .../chain/unapplied_transaction_queue.hpp | 13 ++++--- .../testing/include/eosio/testing/tester.hpp | 6 ++-- libraries/testing/tester.cpp | 6 ++-- plugins/producer_plugin/producer_plugin.cpp | 4 +-- unittests/block_tests.cpp | 2 +- unittests/forked_tests.cpp | 2 +- .../unapplied_transaction_queue_tests.cpp | 36 ++++++------------- 9 files changed, 42 insertions(+), 52 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index df8e6e4768d..f083df3b3c3 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -690,7 +690,7 @@ struct controller_impl { pending_head = fork_db.pending_head() ) { wlog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id) ); - maybe_switch_forks( pending_head, controller::block_status::complete, forked_trxs_callback() ); + maybe_switch_forks( pending_head, controller::block_status::complete, forked_branch_callback() ); } } @@ -1820,7 +1820,7 @@ struct controller_impl { } ); } - void push_block( std::future& block_state_future, const forked_trxs_callback& forked_trxs_cb ) { + void push_block( std::future& block_state_future, const forked_branch_callback& forked_branch_cb ) { controller::block_status s = controller::block_status::complete; EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block"); @@ -1842,7 +1842,7 @@ struct controller_impl { emit( self.accepted_block_header, bsp ); if( read_mode != db_read_mode::IRREVERSIBLE ) { - maybe_switch_forks( fork_db.pending_head(), s, forked_trxs_cb ); + maybe_switch_forks( fork_db.pending_head(), s, forked_branch_cb ); } else { log_irreversible(); } @@ -1894,13 +1894,13 @@ struct controller_impl { } else { EOS_ASSERT( read_mode != db_read_mode::IRREVERSIBLE, block_validate_exception, "invariant failure: cannot replay reversible blocks while in irreversible mode" ); - maybe_switch_forks( bsp, s, forked_trxs_callback() ); + maybe_switch_forks( bsp, s, forked_branch_callback() ); } } FC_LOG_AND_RETHROW( ) } - void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_trxs_callback& forked_trxs_cb ) { + void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_branch_callback& forked_branch_cb ) { bool head_changed = true; if( new_head->header.previous == head->id ) { try { @@ -1918,14 +1918,13 @@ struct controller_impl { auto branches = fork_db.fetch_branch_from( new_head->id, head->id ); if( branches.second.size() > 0 ) { - // forked branches are in reverse order of how they were applied, signal callback in the order applied - for( auto ritr = branches.second.rbegin(), rend = branches.second.rend(); ritr != rend; ++ritr ) { - const block_state_ptr& bsptr = *ritr; + for( auto itr = branches.second.begin(); itr != branches.second.end(); ++itr ) { pop_block(); - forked_trxs_cb( bsptr->trxs ); } EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception, "loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail + + if( forked_branch_cb ) forked_branch_cb( branches.second ); } for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr ) { @@ -2499,10 +2498,10 @@ std::future controller::create_block_state_future( const signed return my->create_block_state_future( b ); } -void controller::push_block( std::future& block_state_future, const forked_trxs_callback& forked_trxs_cb ) { +void controller::push_block( std::future& block_state_future, const forked_branch_callback& forked_branch_cb ) { validate_db_available_size(); validate_reversible_available_size(); - my->push_block( block_state_future, forked_trxs_cb ); + my->push_block( block_state_future, forked_branch_cb ); } transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) { diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index f3529d71a59..c15e9684998 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -36,7 +36,7 @@ namespace eosio { namespace chain { class account_object; using resource_limits::resource_limits_manager; using apply_handler = std::function; - using forked_trxs_callback = std::function&)>; + using forked_branch_callback = std::function; class fork_database; @@ -154,7 +154,7 @@ namespace eosio { namespace chain { * @param block_state_future provide from call to create_block_state_future * @param cb calls cb with forked applied transactions for each forked block */ - void push_block( std::future& block_state_future, const forked_trxs_callback& cb ); + void push_block( std::future& block_state_future, const forked_branch_callback& cb ); boost::asio::io_context& get_thread_pool(); diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index a6e785fab67..51689d0353d 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -127,11 +127,16 @@ class unapplied_transaction_queue { } } - void add_forked( const std::vector& forked_trxs ) { + void add_forked( const branch_type& forked_branch ) { if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE ) return; - for( const auto& trx : forked_trxs ) { - fc::time_point expiry = trx->packed_trx()->expiration(); - queue.insert( { trx, expiry, trx_enum_type::forked } ); + // forked_branch is in reverse order + for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) { + const block_state_ptr& bsptr = *ritr; + for( auto itr = bsptr->trxs.begin(), end = bsptr->trxs.end(); itr != end; ++itr ) { + const auto& trx = *itr; + fc::time_point expiry = trx->packed_trx()->expiration(); + queue.insert( { trx, expiry, trx_enum_type::forked } ); + } } } diff --git a/libraries/testing/include/eosio/testing/tester.hpp b/libraries/testing/include/eosio/testing/tester.hpp index e2556443928..c6331c2bba8 100644 --- a/libraries/testing/include/eosio/testing/tester.hpp +++ b/libraries/testing/include/eosio/testing/tester.hpp @@ -426,7 +426,7 @@ namespace eosio { namespace testing { signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override { auto sb = _produce_block(skip_time, false); auto bsf = validating_node->create_block_state_future( sb ); - validating_node->push_block( bsf, forked_trxs_callback() ); + validating_node->push_block( bsf, forked_branch_callback() ); return sb; } @@ -437,14 +437,14 @@ namespace eosio { namespace testing { void validate_push_block(const signed_block_ptr& sb) { auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs, forked_trxs_callback() ); + validating_node->push_block( bs, forked_branch_callback() ); } signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override { unapplied_transactions.add_aborted( control->abort_block() ); auto sb = _produce_block(skip_time, true); auto bsf = validating_node->create_block_state_future( sb ); - validating_node->push_block( bsf, forked_trxs_callback() ); + validating_node->push_block( bsf, forked_branch_callback() ); return sb; } diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index 09144b23edf..4b942fbc125 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -226,8 +226,8 @@ namespace eosio { namespace testing { void base_tester::push_block(signed_block_ptr b) { auto bsf = control->create_block_state_future(b); unapplied_transactions.add_aborted( control->abort_block() ); - control->push_block( bsf, [this]( const std::vector& forked_trxs ) { - unapplied_transactions.add_forked( forked_trxs ); + control->push_block( bsf, [this]( const branch_type& forked_branch ) { + unapplied_transactions.add_forked( forked_branch ); } ); auto itr = last_produced_block.find(b->producer); @@ -931,7 +931,7 @@ namespace eosio { namespace testing { if( block ) { //&& !b.control->is_known_block(block->id()) ) { auto bsf = b.control->create_block_state_future( block ); b.control->abort_block(); - b.control->push_block(bsf, forked_trxs_callback()); //, eosio::chain::validation_steps::created_block); + b.control->push_block(bsf, forked_branch_callback()); //, eosio::chain::validation_steps::created_block); } } }; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 83718be47d3..3051a0945c8 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -365,8 +365,8 @@ class producer_plugin_impl : public std::enable_shared_from_this& forked_trxs ) { - _unapplied_transactions.add_forked( forked_trxs ); + chain.push_block( bsf, [this]( const branch_type& forked_branch ) { + _unapplied_transactions.add_forked( forked_branch ); } ); } catch ( const guard_exception& e ) { chain_plug->handle_guard_exception(e); diff --git a/unittests/block_tests.cpp b/unittests/block_tests.cpp index def5657b3a4..873b5d2333f 100644 --- a/unittests/block_tests.cpp +++ b/unittests/block_tests.cpp @@ -43,7 +43,7 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test) tester validator; auto bs = validator.control->create_block_state_future( copy_b ); validator.control->abort_block(); - BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_trxs_callback() ), fc::exception , + BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_branch_callback() ), fc::exception , [] (const fc::exception &e)->bool { return e.code() == account_name_exists_exception::code_value ; }) ; diff --git a/unittests/forked_tests.cpp b/unittests/forked_tests.cpp index 10564cf4876..f25b7854295 100644 --- a/unittests/forked_tests.cpp +++ b/unittests/forked_tests.cpp @@ -278,7 +278,7 @@ BOOST_AUTO_TEST_CASE( forking ) try { bad_block.transaction_mroot = bad_block.previous; auto bad_block_bs = c.control->create_block_state_future( std::make_shared(std::move(bad_block)) ); c.control->abort_block(); - BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_trxs_callback() ), fc::exception, + BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_branch_callback() ), fc::exception, [] (const fc::exception &ex)->bool { return ex.to_detail_string().find("block not signed by expected key") != std::string::npos; }); diff --git a/unittests/unapplied_transaction_queue_tests.cpp b/unittests/unapplied_transaction_queue_tests.cpp index d30861f90fb..fc02680c2f5 100644 --- a/unittests/unapplied_transaction_queue_tests.cpp +++ b/unittests/unapplied_transaction_queue_tests.cpp @@ -128,10 +128,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { bs2->trxs = { trx3, trx4, trx5 }; auto bs3 = std::make_shared(); bs3->trxs = { trx6 }; - q.add_forked( bs1->trxs ); - q.add_forked( bs1->trxs ); // bs1 duplicate ignored - q.add_forked( bs2->trxs ); - q.add_forked( bs3->trxs ); + q.add_forked( { bs3, bs2, bs1, bs1 } ); // bs1 duplicate ignored BOOST_CHECK( q.size() == 6 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 5 ); @@ -151,10 +148,9 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { // fifo forked auto bs4 = std::make_shared(); bs4->trxs = { trx7 }; - q.add_forked( bs1->trxs ); - q.add_forked( bs2->trxs ); - q.add_forked( bs3->trxs ); - q.add_forked( bs4->trxs ); + q.add_forked( { bs1 } ); + q.add_forked( { bs3, bs2 } ); + q.add_forked( { bs4 } ); BOOST_CHECK( q.size() == 7 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 6 ); @@ -188,14 +184,10 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { auto bs6 = std::make_shared(); bs5->trxs = { trx11, trx12, trx13 }; bs6->trxs = { trx11, trx15 }; - q.add_forked( bs1->trxs ); - q.add_forked( bs2->trxs ); - q.add_forked( bs3->trxs ); - q.add_forked( bs4->trxs ); - q.add_forked( bs3->trxs ); // dups not added - q.add_forked( bs2->trxs ); // dups not added - q.add_forked( bs5->trxs ); - q.add_forked( bs6->trxs ); + q.add_forked( { bs3, bs2, bs1 } ); + q.add_forked( { bs4 } ); + q.add_forked( { bs3, bs2 } ); // dups ignored + q.add_forked( { bs6, bs5 } ); BOOST_CHECK_EQUAL( q.size(), 11 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 10 ); @@ -223,16 +215,12 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { BOOST_CHECK( q.empty() ); // altogether, order fifo: persisted, forked, aborted - q.add_forked( bs1->trxs ); - q.add_forked( bs2->trxs ); - q.add_forked( bs3->trxs ); + q.add_forked( { bs3, bs2, bs1 } ); q.add_persisted( trx16 ); q.add_aborted( { trx9, trx14 } ); q.add_persisted( trx8 ); q.add_aborted( { trx18, trx19 } ); - q.add_forked( bs4->trxs); - q.add_forked( bs5->trxs ); - q.add_forked( bs6->trxs ); + q.add_forked( { bs6, bs5, bs4 } ); BOOST_CHECK( q.size() == 17 ); BOOST_REQUIRE( next( q ) == trx16 ); BOOST_CHECK( q.size() == 16 ); @@ -271,9 +259,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { BOOST_REQUIRE( next( q ) == nullptr ); BOOST_CHECK( q.empty() ); - q.add_forked( bs1->trxs ); - q.add_forked( bs2->trxs ); - q.add_forked( bs3->trxs ); + q.add_forked( { bs3, bs2, bs1 } ); q.add_aborted( { trx9, trx11 } ); q.add_persisted( trx8 ); q.clear(); From 36a6fd32a0622a43d41bd2a4dea17c7cc5cd3a26 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 16:26:51 -0500 Subject: [PATCH 14/18] Use an enum for processing mode of unapplied_transaction_queue. Set mode from producer_plugin. --- .../chain/unapplied_transaction_queue.hpp | 32 +++++++++---------- plugins/producer_plugin/producer_plugin.cpp | 8 +++-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index 51689d0353d..b3abd024954 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -7,7 +7,6 @@ #include #include -#include #include #include @@ -50,7 +49,14 @@ struct unapplied_transaction { * Persisted are first so that they can be applied in each block until expired. */ class unapplied_transaction_queue { +public: + enum class process_mode { + non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE + speculative_non_producer, // will never produce + speculative_producer // can produce + }; +private: struct by_trx_id; struct by_type; struct by_expiry; @@ -66,23 +72,15 @@ class unapplied_transaction_queue { > unapplied_trx_queue_type; unapplied_trx_queue_type queue; - db_read_mode read_mode = db_read_mode::SPECULATIVE; - bool only_track_persisted = false; + process_mode mode = process_mode::speculative_producer; public: - void set_only_track_persisted( bool v ) { - if( v ) { - FC_ASSERT( empty(), "set_only_track_persisted, queue required to be empty" ); - } - only_track_persisted = v; - } - - void set_chain_read_mode( db_read_mode mode ) { - if( mode != read_mode ) { - FC_ASSERT( empty(), "set_chain_read_mode, queue required to be empty" ); + void set_mode( process_mode new_mode ) { + if( new_mode != mode ) { + FC_ASSERT( empty(), "set_mode, queue required to be empty" ); } - read_mode = mode; + mode = new_mode; } bool empty() const { @@ -128,7 +126,7 @@ class unapplied_transaction_queue { } void add_forked( const branch_type& forked_branch ) { - if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE ) return; + if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return; // forked_branch is in reverse order for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) { const block_state_ptr& bsptr = *ritr; @@ -141,7 +139,7 @@ class unapplied_transaction_queue { } void add_aborted( std::vector aborted_trxs ) { - if( only_track_persisted || read_mode != db_read_mode::SPECULATIVE || aborted_trxs.empty() ) return; + if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return; for( auto& trx : aborted_trxs ) { fc::time_point expiry = trx->packed_trx()->expiration(); queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } ); @@ -149,7 +147,7 @@ class unapplied_transaction_queue { } void add_persisted( const transaction_metadata_ptr& trx ) { - if( read_mode != db_read_mode::SPECULATIVE ) return; + if( mode == process_mode::non_speculative ) return; auto itr = queue.get().find( trx->id() ); if( itr == queue.get().end() ) { fc::time_point expiry = trx->packed_trx()->expiration(); diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 3051a0945c8..972ccbb1f9f 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -724,8 +724,6 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->_options = &options; LOAD_VALUE_SET(options, "producer-name", my->_producers) - my->_unapplied_transactions.set_only_track_persisted( my->_producers.empty() ); - if( options.count("private-key") ) { const std::vector key_id_to_wif_pair_strings = options["private-key"].as>(); @@ -849,13 +847,17 @@ void producer_plugin::plugin_startup() ilog("producer plugin: plugin_startup() begin"); chain::controller& chain = my->chain_plug->chain(); - my->_unapplied_transactions.set_chain_read_mode( chain.get_read_mode() ); EOS_ASSERT( my->_producers.empty() || chain.get_read_mode() == chain::db_read_mode::SPECULATIVE, plugin_config_exception, "node cannot have any producer-name configured because block production is impossible when read_mode is not \"speculative\"" ); EOS_ASSERT( my->_producers.empty() || chain.get_validation_mode() == chain::validation_mode::FULL, plugin_config_exception, "node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" ); + unapplied_transaction_queue::process_mode unapplied_mode = + (chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative : + my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer : + unapplied_transaction_queue::process_mode::speculative_producer; + my->_unapplied_transactions.set_mode( unapplied_mode ); my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } )); my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } )); From 10e725e737a5513d84ce68f0a08341df4c4b3917 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 16:31:03 -0500 Subject: [PATCH 15/18] Remove unneeded include --- plugins/producer_plugin/producer_plugin.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 972ccbb1f9f..e7083958c3f 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include #include From a58363cfa805ee8f15d61d61ac148435b2d2e3be Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 22:26:58 -0500 Subject: [PATCH 16/18] Move unapplied_transaction_queue::set_mode to plugin_initialize --- plugins/producer_plugin/producer_plugin.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index e7083958c3f..c5e315a03cb 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -723,6 +723,13 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->_options = &options; LOAD_VALUE_SET(options, "producer-name", my->_producers) + chain::controller& chain = my->chain_plug->chain(); + unapplied_transaction_queue::process_mode unapplied_mode = + (chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative : + my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer : + unapplied_transaction_queue::process_mode::speculative_producer; + my->_unapplied_transactions.set_mode( unapplied_mode ); + if( options.count("private-key") ) { const std::vector key_id_to_wif_pair_strings = options["private-key"].as>(); @@ -852,12 +859,6 @@ void producer_plugin::plugin_startup() EOS_ASSERT( my->_producers.empty() || chain.get_validation_mode() == chain::validation_mode::FULL, plugin_config_exception, "node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" ); - unapplied_transaction_queue::process_mode unapplied_mode = - (chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative : - my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer : - unapplied_transaction_queue::process_mode::speculative_producer; - my->_unapplied_transactions.set_mode( unapplied_mode ); - my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } )); my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } )); From 9723af53fe159df8d2ae1c6814cca1e6b85fe3a8 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 Jul 2019 22:34:23 -0500 Subject: [PATCH 17/18] add to front of incoming_transaction_queue when block time/net exhausted so it will be the first one next time around --- plugins/producer_plugin/producer_plugin.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index c5e315a03cb..1ea43305bec 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -406,13 +406,22 @@ class producer_plugin_impl : public std::enable_shared_from_thispacked_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size() + sizeof( *trx ); } + void add_size( const transaction_metadata_ptr& trx ) { + auto size = calc_size( trx ); + EOS_ASSERT( size_in_bytes + size < max_incoming_transaction_queue_size, tx_resource_exhaustion, "Transaction exceeded producer resource limit" ); + size_in_bytes += size; + } + public: void set_max_incoming_transaction_queue_size( uint64_t v ) { max_incoming_transaction_queue_size = v; } void add( const transaction_metadata_ptr& trx, bool persist_until_expired, next_function next ) { - auto size = calc_size( trx ); - EOS_ASSERT( size_in_bytes + size < max_incoming_transaction_queue_size, tx_resource_exhaustion, "Transaction exceeded producer resource limit" ); - size_in_bytes += size; + add_size( trx ); + _incoming_transactions.emplace_back( trx, persist_until_expired, std::move( next ) ); + } + + void add_front( const transaction_metadata_ptr& trx, bool persist_until_expired, next_function next ) { + add_size( trx ); _incoming_transactions.emplace_back( trx, persist_until_expired, std::move( next ) ); } @@ -514,7 +523,7 @@ class producer_plugin_impl : public std::enable_shared_from_thisexcept ) { if( failure_is_subjective( *trace->except, deadline_is_subjective )) { - _pending_incoming_transactions.add( trx, persist_until_expired, next ); + _pending_incoming_transactions.add_front( trx, persist_until_expired, next ); if( _pending_block_mode == pending_block_mode::producing ) { fc_dlog( _trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ", ("block_num", chain.head_block_num() + 1) From 575dff68e7b9b622b9171033fb1cd3a222edbecf Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 29 Jul 2019 09:22:08 -0500 Subject: [PATCH 18/18] emplace_front since that is the whole point --- plugins/producer_plugin/producer_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 1ea43305bec..1b0486237e6 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -422,7 +422,7 @@ class producer_plugin_impl : public std::enable_shared_from_this next ) { add_size( trx ); - _incoming_transactions.emplace_back( trx, persist_until_expired, std::move( next ) ); + _incoming_transactions.emplace_front( trx, persist_until_expired, std::move( next ) ); } auto pop_front() {