Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Move unapplied_transaction_queue to controller
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Aug 26, 2019
1 parent f39f3e2 commit e3a240e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
7 changes: 7 additions & 0 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <eosio/chain/chain_snapshot.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <eosio/chain/platform_timer.hpp>
#include <eosio/chain/unapplied_transaction_queue.hpp>

#include <chainbase/chainbase.hpp>
#include <fc/io/json.hpp>
Expand Down Expand Up @@ -217,6 +218,7 @@ struct controller_impl {
chainbase::database db;
chainbase::database reversible_blocks; ///< a special database to persist blocks that have successfully been applied but are still reversible
block_log blog;
unapplied_transaction_queue unapplied_transactions;
optional<pending_state> pending;
block_state_ptr head;
fork_database fork_db;
Expand Down Expand Up @@ -2554,6 +2556,11 @@ boost::asio::io_context& controller::get_thread_pool() {
return my->thread_pool.get_executor();
}

unapplied_transaction_queue& controller::unapplied_transaction_queue() {
return my->unapplied_transactions;
}


std::future<block_state_ptr> controller::create_block_state_future( const signed_block_ptr& b ) {
return my->create_block_state_future( b );
}
Expand Down
3 changes: 3 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace boost { namespace asio {
namespace eosio { namespace chain {

class authorization_manager;
class unapplied_transaction_queue;

namespace resource_limits {
class resource_limits_manager;
Expand Down Expand Up @@ -158,6 +159,8 @@ namespace eosio { namespace chain {

boost::asio::io_context& get_thread_pool();

unapplied_transaction_queue& unapplied_transaction_queue();

const chainbase::database& db()const;

const fork_database& fork_db()const;
Expand Down
43 changes: 21 additions & 22 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/function_output_iterator.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/hashed_index.hpp>
Expand Down Expand Up @@ -195,7 +194,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
boost::asio::deadline_timer _timer;
std::map<chain::account_name, uint32_t> _producer_watermarks;
pending_block_mode _pending_block_mode;
unapplied_transaction_queue _unapplied_transactions;
fc::optional<named_thread_pool> _thread_pool;

std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
Expand Down Expand Up @@ -264,7 +262,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}

void on_block( const block_state_ptr& bsp ) {
_unapplied_transactions.clear_applied( bsp );
chain::controller& chain = chain_plug->chain();
chain.unapplied_transaction_queue().clear_applied( bsp );
}

void on_block_header( const block_state_ptr& bsp ) {
Expand Down Expand Up @@ -349,7 +348,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
auto bsf = chain.create_block_state_future( block );

// abort the pending block
_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );

// exceptions throw out, make sure we restart our loop
auto ensure = fc::make_scoped_exit([this](){
Expand All @@ -359,8 +358,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// push the new block
bool except = false;
try {
chain.push_block( bsf, [this]( const branch_type& forked_branch ) {
_unapplied_transactions.add_forked( forked_branch );
chain.push_block( bsf, [&chain]( const branch_type& forked_branch ) {
chain.unapplied_transaction_queue().add_forked( forked_branch );
} );
} catch ( const guard_exception& e ) {
chain_plugin::handle_guard_exception(e);
Expand Down Expand Up @@ -540,7 +539,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if( persist_until_expired ) {
// if this trx didnt fail/soft-fail and the persist flag is set, store its ID so that we can
// ensure its applied to all future speculative blocks as well.
_unapplied_transactions.add_persisted( trx );
chain.unapplied_transaction_queue().add_persisted( trx );
}
send_response( trace );
}
Expand Down Expand Up @@ -738,7 +737,7 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
(chain.get_read_mode() != chain::db_read_mode::SPECULATIVE) ? unapplied_transaction_queue::process_mode::non_speculative :
my->_producers.empty() ? unapplied_transaction_queue::process_mode::speculative_non_producer :
unapplied_transaction_queue::process_mode::speculative_producer;
my->_unapplied_transactions.set_mode( unapplied_mode );
chain.unapplied_transaction_queue().set_mode( unapplied_mode );

if( options.count("private-key") )
{
Expand Down Expand Up @@ -932,7 +931,7 @@ void producer_plugin::resume() {
//
if (my->_pending_block_mode == pending_block_mode::speculating) {
chain::controller& chain = my->chain_plug->chain();
my->_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );
my->schedule_production_loop();
}
}
Expand Down Expand Up @@ -971,7 +970,7 @@ void producer_plugin::update_runtime_options(const runtime_options& options) {

if (check_speculating && my->_pending_block_mode == pending_block_mode::speculating) {
chain::controller& chain = my->chain_plug->chain();
my->_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );
my->schedule_production_loop();
}

Expand Down Expand Up @@ -1047,7 +1046,7 @@ producer_plugin::integrity_hash_information producer_plugin::get_integrity_hash(

if (chain.is_building_block()) {
// abort the pending block
my->_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );
} else {
reschedule.cancel();
}
Expand Down Expand Up @@ -1076,7 +1075,7 @@ void producer_plugin::create_snapshot(producer_plugin::next_function<producer_pl

if (chain.is_building_block()) {
// abort the pending block
my->_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );
} else {
reschedule.cancel();
}
Expand Down Expand Up @@ -1413,7 +1412,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
blocks_to_confirm = (uint16_t)(std::min<uint32_t>(blocks_to_confirm, (uint32_t)(hbs->block_num - hbs->dpos_irreversible_blocknum)));
}

_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );

auto features_to_activate = chain.get_preactivated_protocol_features();
if( _pending_block_mode == pending_block_mode::producing && _protocol_features_to_activate.size() > 0 ) {
Expand Down Expand Up @@ -1470,8 +1469,8 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
// remove all expired transactions
size_t num_expired_persistent = 0;
size_t num_expired_other = 0;
size_t orig_count = _unapplied_transactions.size();
exhausted = !_unapplied_transactions.clear_expired( pending_block_time, preprocess_deadline,
size_t orig_count = chain.unapplied_transaction_queue().size();
exhausted = !chain.unapplied_transaction_queue().clear_expired( pending_block_time, preprocess_deadline,
[&num_expired_persistent, &num_expired_other, pbm = _pending_block_mode,
&chain, has_producers = !_producers.empty()]( const transaction_id_type& txid, trx_enum_type trx_type ) {
if( trx_type == trx_enum_type::persisted ) {
Expand Down Expand Up @@ -1508,13 +1507,13 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
size_t orig_pending_txn_size = _pending_incoming_transactions.size();

// Processing unapplied transactions...
if( !_unapplied_transactions.empty() ) {
if( !chain.unapplied_transaction_queue().empty() ) {
int num_applied = 0, num_failed = 0, num_processed = 0;
auto unapplied_trxs_size = _unapplied_transactions.size();
auto unapplied_trxs_size = chain.unapplied_transaction_queue().size();
auto itr = (_pending_block_mode == pending_block_mode::producing) ?
_unapplied_transactions.begin() : _unapplied_transactions.persisted_begin();
chain.unapplied_transaction_queue().begin() : chain.unapplied_transaction_queue().persisted_begin();
auto end_itr = (_pending_block_mode == pending_block_mode::producing) ?
_unapplied_transactions.end() : _unapplied_transactions.persisted_end();
chain.unapplied_transaction_queue().end() : chain.unapplied_transaction_queue().persisted_end();
while( itr != end_itr ) {
if( preprocess_deadline <= fc::time_point::now() ) exhausted = true;
if( exhausted ) break;
Expand All @@ -1539,12 +1538,12 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
} else {
// this failed our configured maximum transaction time, we don't want to replay it
++num_failed;
itr = _unapplied_transactions.erase( itr );
itr = chain.unapplied_transaction_queue().erase( itr );
continue;
}
} else {
++num_applied;
itr = _unapplied_transactions.erase( itr );
itr = chain.unapplied_transaction_queue().erase( itr );
continue;
}
} LOG_AND_DROP();
Expand Down Expand Up @@ -1825,7 +1824,7 @@ bool producer_plugin_impl::maybe_produce_block() {

fc_dlog(_log, "Aborting block due to produce_block error");
chain::controller& chain = chain_plug->chain();
_unapplied_transactions.add_aborted( chain.abort_block() );
chain.unapplied_transaction_queue().add_aborted( chain.abort_block() );
return false;
}

Expand Down

0 comments on commit e3a240e

Please sign in to comment.