diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 30a5d846a1f..80d848ee8ca 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -696,7 +696,7 @@ struct controller_impl { pending_head = fork_db.pending_head() ) { wlog( "applying branch from fork database ending with block: ${id}", ("id", pending_head->id) ); - maybe_switch_forks( pending_head, controller::block_status::complete ); + maybe_switch_forks( pending_head, controller::block_status::complete, forked_branch_callback() ); } } @@ -1842,7 +1842,7 @@ struct controller_impl { } ); } - branch_type push_block( std::future& block_state_future ) { + void push_block( std::future& block_state_future, const forked_branch_callback& forked_branch_cb ) { controller::block_status s = controller::block_status::complete; EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block"); @@ -1864,10 +1864,9 @@ struct controller_impl { emit( self.accepted_block_header, bsp ); if( read_mode != db_read_mode::IRREVERSIBLE ) { - return maybe_switch_forks( fork_db.pending_head(), s ); + maybe_switch_forks( fork_db.pending_head(), s, forked_branch_cb ); } else { log_irreversible(); - return {}; } } FC_LOG_AND_RETHROW( ) @@ -1918,15 +1917,14 @@ struct controller_impl { } else { EOS_ASSERT( read_mode != db_read_mode::IRREVERSIBLE, block_validate_exception, "invariant failure: cannot replay reversible blocks while in irreversible mode" ); - maybe_switch_forks( bsp, s ); + maybe_switch_forks( bsp, s, forked_branch_callback() ); } } FC_LOG_AND_RETHROW( ) } - branch_type maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s ) { + void maybe_switch_forks( const block_state_ptr& new_head, controller::block_status s, const forked_branch_callback& forked_branch_cb ) { bool head_changed = true; - branch_type unapplied_branch; if( new_head->header.previous == head->id ) { try { apply_block( new_head, s ); @@ -1948,6 +1946,8 @@ struct controller_impl { } EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception, "loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail + + if( forked_branch_cb ) forked_branch_cb( branches.second ); } for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr ) { @@ -1967,7 +1967,7 @@ struct controller_impl { // Remove the block that threw and all forks built off it. fork_db.remove( (*ritr)->id ); - // pop all blocks from the bad fork + // pop all blocks from the bad fork, discarding their transactions // ritr base is a forward itr to the last block successfully applied auto applied_itr = ritr.base(); for( auto itr = applied_itr; itr != branches.first.end(); ++itr ) { @@ -1985,8 +1985,6 @@ struct controller_impl { } // end if exception } /// end for each block in branch - unapplied_branch = std::move( branches.second ); - ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id)); } else { head_changed = false; @@ -1995,15 +1993,12 @@ struct controller_impl { if( head_changed ) log_irreversible(); - return unapplied_branch; } /// push_block vector abort_block() { vector applied_trxs; if( pending ) { - if ( read_mode == db_read_mode::SPECULATIVE ) { - applied_trxs = pending->extract_trx_metas(); - } + applied_trxs = pending->extract_trx_metas(); pending.reset(); protocol_features.popped_blocks_to( head->block_num ); } @@ -2527,10 +2522,10 @@ std::future controller::create_block_state_future( const signed return my->create_block_state_future( b ); } -branch_type controller::push_block( std::future& block_state_future ) { +void controller::push_block( std::future& block_state_future, const forked_branch_callback& forked_branch_cb ) { validate_db_available_size(); validate_reversible_available_size(); - return my->push_block( block_state_future ); + my->push_block( block_state_future, forked_branch_cb ); } transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx, fc::time_point deadline, uint32_t billed_cpu_time_us ) { diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index f12fbdcb725..b57100fa2fa 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -36,6 +36,7 @@ namespace eosio { namespace chain { class account_object; using resource_limits::resource_limits_manager; using apply_handler = std::function; + using forked_branch_callback = std::function; class fork_database; @@ -151,9 +152,9 @@ namespace eosio { namespace chain { /** * @param block_state_future provide from call to create_block_state_future - * @return branch of unapplied blocks from fork switch + * @param cb calls cb with forked applied transactions for each forked block */ - branch_type push_block( std::future& block_state_future ); + void push_block( std::future& block_state_future, const forked_branch_callback& cb ); boost::asio::io_context& get_thread_pool(); diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index eb9275eab5e..b3abd024954 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -46,10 +46,17 @@ struct unapplied_transaction { /** * Track unapplied transactions for persisted, forked blocks, and aborted blocks. - * Persisted to first so that they can be applied in each block until expired. + * Persisted are first so that they can be applied in each block until expired. */ class unapplied_transaction_queue { +public: + enum class process_mode { + non_speculative, // HEAD, READ_ONLY, IRREVERSIBLE + speculative_non_producer, // will never produce + speculative_producer // can produce + }; +private: struct by_trx_id; struct by_type; struct by_expiry; @@ -65,15 +72,15 @@ class unapplied_transaction_queue { > unapplied_trx_queue_type; unapplied_trx_queue_type queue; - bool only_track_persisted = false; + process_mode mode = process_mode::speculative_producer; public: - void set_only_track_persisted( bool v ) { - if( v ) { - FC_ASSERT( empty(), "set_only_track_persisted queue required to be empty" ); + void set_mode( process_mode new_mode ) { + if( new_mode != mode ) { + FC_ASSERT( empty(), "set_mode, queue required to be empty" ); } - only_track_persisted = v; + mode = new_mode; } bool empty() const { @@ -111,8 +118,15 @@ class unapplied_transaction_queue { return true; } + void clear_applied( const std::vector& applied_trxs ) { + auto& idx = queue.get(); + for( const auto& trx : applied_trxs ) { + idx.erase( trx->id() ); + } + } + void add_forked( const branch_type& forked_branch ) { - if( only_track_persisted ) return; + if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return; // forked_branch is in reverse order for( auto ritr = forked_branch.rbegin(), rend = forked_branch.rend(); ritr != rend; ++ritr ) { const block_state_ptr& bsptr = *ritr; @@ -125,7 +139,7 @@ class unapplied_transaction_queue { } void add_aborted( std::vector aborted_trxs ) { - if( aborted_trxs.empty() || only_track_persisted ) return; + if( mode == process_mode::non_speculative || mode == process_mode::speculative_non_producer ) return; for( auto& trx : aborted_trxs ) { fc::time_point expiry = trx->packed_trx()->expiration(); queue.insert( { std::move( trx ), expiry, trx_enum_type::aborted } ); @@ -133,6 +147,7 @@ class unapplied_transaction_queue { } void add_persisted( const transaction_metadata_ptr& trx ) { + if( mode == process_mode::non_speculative ) return; auto itr = queue.get().find( trx->id() ); if( itr == queue.get().end() ) { fc::time_point expiry = trx->packed_trx()->expiration(); diff --git a/libraries/testing/include/eosio/testing/tester.hpp b/libraries/testing/include/eosio/testing/tester.hpp index 699100803e3..4aab7510774 100644 --- a/libraries/testing/include/eosio/testing/tester.hpp +++ b/libraries/testing/include/eosio/testing/tester.hpp @@ -433,8 +433,8 @@ namespace eosio { namespace testing { signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override { auto sb = _produce_block(skip_time, false); - auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs ); + auto bsf = validating_node->create_block_state_future( sb ); + validating_node->push_block( bsf, forked_branch_callback() ); return sb; } @@ -445,14 +445,14 @@ namespace eosio { namespace testing { void validate_push_block(const signed_block_ptr& sb) { auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs ); + validating_node->push_block( bs, forked_branch_callback() ); } signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override { unapplied_transactions.add_aborted( control->abort_block() ); auto sb = _produce_block(skip_time, true); - auto bs = validating_node->create_block_state_future( sb ); - validating_node->push_block( bs ); + auto bsf = validating_node->create_block_state_future( sb ); + validating_node->push_block( bsf, forked_branch_callback() ); return sb; } diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index ab6e39dd3f1..e7e8ee6af57 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -252,10 +252,11 @@ namespace eosio { namespace testing { } void base_tester::push_block(signed_block_ptr b) { - auto bs = control->create_block_state_future(b); - vector aborted_trxs = control->abort_block(); - unapplied_transactions.add_forked( control->push_block( bs ) ); - unapplied_transactions.add_aborted( std::move( aborted_trxs ) ); + auto bsf = control->create_block_state_future(b); + unapplied_transactions.add_aborted( control->abort_block() ); + control->push_block( bsf, [this]( const branch_type& forked_branch ) { + unapplied_transactions.add_forked( forked_branch ); + } ); auto itr = last_produced_block.find(b->producer); if (itr == last_produced_block.end() || block_header::num_from_id(b->id()) > block_header::num_from_id(itr->second)) { @@ -963,9 +964,9 @@ namespace eosio { namespace testing { auto block = a.control->fetch_block_by_number(i); if( block ) { //&& !b.control->is_known_block(block->id()) ) { - auto bs = b.control->create_block_state_future( block ); + auto bsf = b.control->create_block_state_future( block ); b.control->abort_block(); - b.control->push_block(bs); //, eosio::chain::validation_steps::created_block); + b.control->push_block(bsf, forked_branch_callback()); //, eosio::chain::validation_steps::created_block); } } }; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 22e0887d949..a528bf21223 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -229,6 +229,8 @@ class producer_plugin_impl : public std::enable_shared_from_thistrxs ); + if( bsp->header.timestamp <= _start_time ) return; // simplify handling of watermark in on_block @@ -318,18 +320,19 @@ class producer_plugin_impl : public std::enable_shared_from_this aborted_trxs = chain.abort_block(); + _unapplied_transactions.add_aborted( chain.abort_block() ); // exceptions throw out, make sure we restart our loop - auto ensure = fc::make_scoped_exit([this, &aborted_trxs](){ - _unapplied_transactions.add_aborted( std::move( aborted_trxs ) ); + auto ensure = fc::make_scoped_exit([this](){ schedule_production_loop(); }); // push the new block bool except = false; try { - _unapplied_transactions.add_forked( chain.push_block( bsf ) ); + chain.push_block( bsf, [this]( const branch_type& forked_branch ) { + _unapplied_transactions.add_forked( forked_branch ); + } ); } catch ( const guard_exception& e ) { chain_plug->handle_guard_exception(e); return; @@ -369,16 +372,25 @@ class producer_plugin_impl : public std::enable_shared_from_thispacked_trx()->get_unprunable_size() + trx->packed_trx()->get_prunable_size() + sizeof( *trx ); } + void add_size( const transaction_metadata_ptr& trx ) { + auto size = calc_size( trx ); + EOS_ASSERT( size_in_bytes + size < max_incoming_transaction_queue_size, tx_resource_exhaustion, "Transaction exceeded producer resource limit" ); + size_in_bytes += size; + } + public: void set_max_incoming_transaction_queue_size( uint64_t v ) { max_incoming_transaction_queue_size = v; } void add( const transaction_metadata_ptr& trx, bool persist_until_expired, next_function next ) { - auto size = calc_size( trx ); - EOS_ASSERT( size_in_bytes + size < max_incoming_transaction_queue_size, tx_resource_exhaustion, "Transaction exceeded producer resource limit" ); - size_in_bytes += size; + add_size( trx ); _incoming_transactions.emplace_back( trx, persist_until_expired, std::move( next ) ); } + void add_front( const transaction_metadata_ptr& trx, bool persist_until_expired, next_function next ) { + add_size( trx ); + _incoming_transactions.emplace_front( trx, persist_until_expired, std::move( next ) ); + } + auto pop_front() { EOS_ASSERT( !_incoming_transactions.empty(), producer_exception, "logic error, front() called on empty incoming_transactions" ); auto intrx = _incoming_transactions.front(); @@ -477,7 +489,7 @@ class producer_plugin_impl : public std::enable_shared_from_thisexcept ) { if( failure_is_subjective( *trace->except, deadline_is_subjective )) { - _pending_incoming_transactions.add( trx, persist_until_expired, next ); + _pending_incoming_transactions.add_front( trx, persist_until_expired, next ); if( _pending_block_mode == pending_block_mode::producing ) { fc_dlog( _trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ", ("block_num", chain.head_block_num() + 1) @@ -686,7 +698,12 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_ my->_options = &options; LOAD_VALUE_SET(options, "producer-name", my->_producers) - my->_unapplied_transactions.set_only_track_persisted( my->_producers.empty() ); + chain::controller& chain = my->chain_plug->chain(); + unapplied_transaction_queue::process_mode unapplied_mode = + (chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative : + my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer : + unapplied_transaction_queue::process_mode::speculative_producer; + my->_unapplied_transactions.set_mode( unapplied_mode ); if( options.count("private-key") ) { @@ -817,7 +834,6 @@ void producer_plugin::plugin_startup() EOS_ASSERT( my->_producers.empty() || chain.get_validation_mode() == chain::validation_mode::FULL, plugin_config_exception, "node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" ); - my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } )); my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } )); diff --git a/unittests/block_tests.cpp b/unittests/block_tests.cpp index 8b8fe2c11cf..873b5d2333f 100644 --- a/unittests/block_tests.cpp +++ b/unittests/block_tests.cpp @@ -43,7 +43,7 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test) tester validator; auto bs = validator.control->create_block_state_future( copy_b ); validator.control->abort_block(); - BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs ), fc::exception , + BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_branch_callback() ), fc::exception , [] (const fc::exception &e)->bool { return e.code() == account_name_exists_exception::code_value ; }) ; diff --git a/unittests/forked_tests.cpp b/unittests/forked_tests.cpp index ece1fb09329..3bcc72a42a4 100644 --- a/unittests/forked_tests.cpp +++ b/unittests/forked_tests.cpp @@ -272,7 +272,7 @@ BOOST_AUTO_TEST_CASE( forking ) try { bad_block.transaction_mroot = bad_block.previous; auto bad_block_bs = c.control->create_block_state_future( std::make_shared(std::move(bad_block)) ); c.control->abort_block(); - BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs ), fc::exception, + BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs, forked_branch_callback() ), fc::exception, [] (const fc::exception &ex)->bool { return ex.to_detail_string().find("block signed by unexpected key") != std::string::npos; }); diff --git a/unittests/unapplied_transaction_queue_tests.cpp b/unittests/unapplied_transaction_queue_tests.cpp index c1d530c412f..fc02680c2f5 100644 --- a/unittests/unapplied_transaction_queue_tests.cpp +++ b/unittests/unapplied_transaction_queue_tests.cpp @@ -78,6 +78,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { // fifo aborted q.add_aborted( { trx1, trx2, trx3 } ); + q.add_aborted( { trx1, trx2, trx3 } ); // duplicates ignored BOOST_CHECK( q.size() == 3 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 2 ); @@ -88,6 +89,15 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { BOOST_REQUIRE( next( q ) == nullptr ); BOOST_CHECK( q.empty() ); + // clear applied + q.add_aborted( { trx1, trx2, trx3 } ); + q.clear_applied( { trx1, trx3, trx4 } ); + BOOST_CHECK( q.size() == 1 ); + BOOST_REQUIRE( next( q ) == trx2 ); + BOOST_CHECK( q.size() == 0 ); + BOOST_REQUIRE( next( q ) == nullptr ); + BOOST_CHECK( q.empty() ); + // order: persisted, aborted q.add_persisted( trx6 ); q.add_aborted( { trx1, trx2, trx3 } ); @@ -118,7 +128,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { bs2->trxs = { trx3, trx4, trx5 }; auto bs3 = std::make_shared(); bs3->trxs = { trx6 }; - q.add_forked( { bs3, bs2, bs1 } ); + q.add_forked( { bs3, bs2, bs1, bs1 } ); // bs1 duplicate ignored BOOST_CHECK( q.size() == 6 ); BOOST_REQUIRE( next( q ) == trx1 ); BOOST_CHECK( q.size() == 5 ); @@ -176,7 +186,7 @@ BOOST_AUTO_TEST_CASE( unapplied_transaction_queue_test ) try { bs6->trxs = { trx11, trx15 }; q.add_forked( { bs3, bs2, bs1 } ); q.add_forked( { bs4 } ); - q.add_forked( { bs3, bs2 } ); // dups removed + q.add_forked( { bs3, bs2 } ); // dups ignored q.add_forked( { bs6, bs5 } ); BOOST_CHECK_EQUAL( q.size(), 11 ); BOOST_REQUIRE( next( q ) == trx1 );