Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Priority queue #6577

Merged
merged 26 commits into from
Jan 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9720b41
Add start block debug logging
heifner Jan 4, 2019
2f076bb
Process incoming transactions as low priority
heifner Jan 4, 2019
87f7f32
Remove duplicate call to cancel_wait
heifner Jan 4, 2019
e77a00c
Handle http call as low priority
heifner Jan 4, 2019
8a94ebb
Use application post wrapper
heifner Jan 4, 2019
22a8676
Use application post for application thread
heifner Jan 4, 2019
4419cbd
Wrap timer call to low priority
heifner Jan 4, 2019
5775f0b
Actually run the do_read
heifner Jan 4, 2019
11b1aba
Wrap async_accept call as low priority
heifner Jan 4, 2019
10c3985
Wrap async_wait calls as low priority
heifner Jan 4, 2019
32116ab
Add priority to boost async calls via app priority queue
heifner Jan 7, 2019
d1a2bc4
Update to pass required debug info
heifner Jan 8, 2019
394720e
Add additional debug args
heifner Jan 8, 2019
6980263
Use priority-queue branch of appbase
heifner Jan 9, 2019
ce19bed
Remove verify of strand on app thread because using wrapped executer
heifner Jan 10, 2019
9627cd1
pack block only once per broadcast
heifner Jan 10, 2019
c652f9b
Update appbase
heifner Jan 10, 2019
60ed35a
Remove debug args
heifner Jan 10, 2019
3e25f6a
Change do_read schedule to medium to match net_plugin
heifner Jan 10, 2019
3a300b3
peer review changes
heifner Jan 10, 2019
8890bad
Peer review changes
heifner Jan 11, 2019
faaa0e3
enqueue_block only called during syncing so make low priority
heifner Jan 11, 2019
5ecadb8
Update appbase
heifner Jan 11, 2019
3305577
Use a variable to make priority more obvious
heifner Jan 11, 2019
be23da6
Add priority to channel publish. Remove unused accepted_confirmation …
heifner Jan 14, 2019
36d6db4
Add priority to publish calls
heifner Jan 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ namespace eosio { namespace chain {
signal<void(const block_state_ptr&)> irreversible_block;
signal<void(const transaction_metadata_ptr&)> accepted_transaction;
signal<void(const transaction_trace_ptr&)> applied_transaction;
signal<void(const header_confirmation&)> accepted_confirmation;
signal<void(const int&)> bad_alloc;

/*
Expand Down
31 changes: 13 additions & 18 deletions plugins/bnet_plugin/bnet_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ namespace eosio {
boost::asio::io_service& _ios;
unique_ptr<ws::stream<tcp::socket>> _ws;
boost::asio::strand< boost::asio::io_context::executor_type> _strand;
boost::asio::io_service& _app_ios;

methods::get_block_by_number::method_type& _get_block_by_number;

Expand Down Expand Up @@ -320,7 +319,6 @@ namespace eosio {
_ios(socket.get_io_service()),
_ws( new ws::stream<tcp::socket>(move(socket)) ),
_strand(_ws->get_executor() ),
_app_ios( app().get_io_service() ),
_get_block_by_number( app().get_method<methods::get_block_by_number>() )
{
_session_num = next_session_id();
Expand All @@ -339,7 +337,6 @@ namespace eosio {
_ios(ioc),
_ws( new ws::stream<tcp::socket>(ioc) ),
_strand( _ws->get_executor() ),
_app_ios( app().get_io_service() ),
_get_block_by_number( app().get_method<methods::get_block_by_number>() )
{
_session_num = next_session_id();
Expand Down Expand Up @@ -570,7 +567,7 @@ namespace eosio {
template<typename L>
void async_get_pending_block_ids( L&& callback ) {
/// send peer my head block status which is read from chain plugin
_app_ios.post( [self = shared_from_this(),callback]{
app().post(priority::low, [self = shared_from_this(),callback]{
auto& control = app().get_plugin<chain_plugin>().chain();
auto lib = control.last_irreversible_block_num();
auto head = control.fork_db_head_block_id();
Expand All @@ -595,7 +592,7 @@ namespace eosio {

template<typename L>
void async_get_block_num( uint32_t blocknum, L&& callback ) {
_app_ios.post( [self = shared_from_this(), blocknum, callback]{
app().post(priority::low, [self = shared_from_this(), blocknum, callback]{
auto& control = app().get_plugin<chain_plugin>().chain();
signed_block_ptr sblockptr;
try {
Expand Down Expand Up @@ -919,9 +916,9 @@ namespace eosio {
* the connection from being closed.
*/
void wait_on_app() {
app().get_io_service().post(
boost::asio::bind_executor( _strand, [self=shared_from_this()]{ self->do_read(); } )
);
app().post( priority::medium, [self = shared_from_this()]() {
app().get_io_service().post( boost::asio::bind_executor( self->_strand, [self] { self->do_read(); } ) );
} );
}

void on_message( const bnet_message& msg, fc::datastream<const char*>& ds ) {
Expand Down Expand Up @@ -1005,7 +1002,7 @@ namespace eosio {
auto id = b->id();
mark_block_status( id, true, true );

app().get_channel<incoming::channels::block>().publish(b);
app().get_channel<incoming::channels::block>().publish(priority::high, b);

mark_block_transactions_known_by_peer( b );
}
Expand Down Expand Up @@ -1154,23 +1151,22 @@ namespace eosio {
channels::accepted_transaction::channel_type::handle _on_appled_trx_handle;

void async_add_session( std::weak_ptr<session> wp ) {
app().get_io_service().post( [wp,this]{
app().post(priority::low, [wp,this]{
if( auto l = wp.lock() ) {
_sessions[l.get()] = wp;
}
});
}

void on_session_close( const session* s ) {
verify_strand_in_this_thread(app().get_io_service().get_executor(), __func__, __LINE__);
auto itr = _sessions.find(s);
if( _sessions.end() != itr )
_sessions.erase(itr);
}

template<typename Call>
void for_each_session( Call callback ) {
app().get_io_service().post([this, callback = callback] {
app().post(priority::low, [this, callback = callback] {
for (const auto& item : _sessions) {
if (auto ses = item.second.lock()) {
ses->_ios.post(boost::asio::bind_executor(
Expand Down Expand Up @@ -1226,7 +1222,6 @@ namespace eosio {
};

void on_reconnect_peers() {
verify_strand_in_this_thread(app().get_io_service().get_executor(), __func__, __LINE__);
for( const auto& peer : _connect_to_peers ) {
bool found = false;
for( const auto& con : _sessions ) {
Expand Down Expand Up @@ -1254,10 +1249,10 @@ namespace eosio {
/// add some random delay so that all my peers don't attempt to reconnect to me
/// at the same time after shutting down..
_timer->expires_from_now( boost::posix_time::microseconds( 1000000*(10+rand()%5) ) );
_timer->async_wait([=](const boost::system::error_code& ec) {
_timer->async_wait(app().get_priority_queue().wrap(priority::low, [=](const boost::system::error_code& ec) {
if( ec ) { return; }
on_reconnect_peers();
});
}));
}
};

Expand Down Expand Up @@ -1449,7 +1444,7 @@ namespace eosio {
session::~session() {
wlog( "close session ${n}",("n",_session_num) );
std::weak_ptr<bnet_plugin_impl> netp = _net_plugin;
_app_ios.post( [netp,ses=this]{
app().post(priority::low, [netp,ses=this]{
if( auto net = netp.lock() )
net->on_session_close(ses);
});
Expand All @@ -1476,7 +1471,7 @@ namespace eosio {
}

void session::check_for_redundant_connection() {
app().get_io_service().post( [self=shared_from_this()]{
app().post(priority::low, [self=shared_from_this()]{
self->_net_plugin->for_each_session( [self]( auto ses ){
if( ses != self && ses->_remote_peer_id == self->_remote_peer_id ) {
self->do_goodbye( "redundant connection" );
Expand Down Expand Up @@ -1558,6 +1553,6 @@ namespace eosio {

auto ptr = std::make_shared<transaction_metadata>(p);

app().get_channel<incoming::channels::transaction>().publish(ptr);
app().get_channel<incoming::channels::transaction>().publish(priority::low, ptr);
}
} /// namespace eosio
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ namespace eosio { namespace chain { namespace plugin_interface {
using irreversible_block = channel_decl<struct irreversible_block_tag, block_state_ptr>;
using accepted_transaction = channel_decl<struct accepted_transaction_tag, transaction_metadata_ptr>;
using applied_transaction = channel_decl<struct applied_transaction_tag, transaction_trace_ptr>;
using accepted_confirmation = channel_decl<struct accepted_confirmation_tag, header_confirmation>;

}

namespace methods {
Expand Down
22 changes: 6 additions & 16 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ class chain_plugin_impl {
,irreversible_block_channel(app().get_channel<channels::irreversible_block>())
,accepted_transaction_channel(app().get_channel<channels::accepted_transaction>())
,applied_transaction_channel(app().get_channel<channels::applied_transaction>())
,accepted_confirmation_channel(app().get_channel<channels::accepted_confirmation>())
,incoming_block_channel(app().get_channel<incoming::channels::block>())
,incoming_block_sync_method(app().get_method<incoming::methods::block_sync>())
,incoming_transaction_async_method(app().get_method<incoming::methods::transaction_async>())
Expand Down Expand Up @@ -174,7 +173,6 @@ class chain_plugin_impl {
channels::irreversible_block::channel_type& irreversible_block_channel;
channels::accepted_transaction::channel_type& accepted_transaction_channel;
channels::applied_transaction::channel_type& applied_transaction_channel;
channels::accepted_confirmation::channel_type& accepted_confirmation_channel;
incoming::channels::block::channel_type& incoming_block_channel;

// retained references to methods for easy calling
Expand All @@ -194,8 +192,6 @@ class chain_plugin_impl {
fc::optional<scoped_connection> irreversible_block_connection;
fc::optional<scoped_connection> accepted_transaction_connection;
fc::optional<scoped_connection> applied_transaction_connection;
fc::optional<scoped_connection> accepted_confirmation_connection;


};

Expand Down Expand Up @@ -672,35 +668,30 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
);
}

my->pre_accepted_block_channel.publish(blk);
my->pre_accepted_block_channel.publish(priority::medium, blk);
});

my->accepted_block_header_connection = my->chain->accepted_block_header.connect(
[this]( const block_state_ptr& blk ) {
my->accepted_block_header_channel.publish( blk );
my->accepted_block_header_channel.publish( priority::medium, blk );
} );

my->accepted_block_connection = my->chain->accepted_block.connect( [this]( const block_state_ptr& blk ) {
my->accepted_block_channel.publish( blk );
my->accepted_block_channel.publish( priority::high, blk );
} );

my->irreversible_block_connection = my->chain->irreversible_block.connect( [this]( const block_state_ptr& blk ) {
my->irreversible_block_channel.publish( blk );
my->irreversible_block_channel.publish( priority::low, blk );
} );

my->accepted_transaction_connection = my->chain->accepted_transaction.connect(
[this]( const transaction_metadata_ptr& meta ) {
my->accepted_transaction_channel.publish( meta );
my->accepted_transaction_channel.publish( priority::low, meta );
} );

my->applied_transaction_connection = my->chain->applied_transaction.connect(
[this]( const transaction_trace_ptr& trace ) {
my->applied_transaction_channel.publish( trace );
} );

my->accepted_confirmation_connection = my->chain->accepted_confirmation.connect(
[this]( const header_confirmation& conf ) {
my->accepted_confirmation_channel.publish( conf );
my->applied_transaction_channel.publish( priority::low, trace );
} );

my->chain->add_indices();
Expand Down Expand Up @@ -744,7 +735,6 @@ void chain_plugin::plugin_shutdown() {
my->irreversible_block_connection.reset();
my->accepted_transaction_connection.reset();
my->applied_transaction_connection.reset();
my->accepted_confirmation_connection.reset();
my->chain->get_thread_pool().stop();
my->chain->get_thread_pool().join();
my->chain.reset();
Expand Down
16 changes: 11 additions & 5 deletions plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,16 @@ namespace eosio {
auto handler_itr = url_handlers.find( resource );
if( handler_itr != url_handlers.end()) {
con->defer_http_response();
handler_itr->second( resource, body, [con]( auto code, auto&& body ) {
con->set_body( std::move( body ));
con->set_status( websocketpp::http::status_code::value( code ));
con->send_http_response();
app().post( appbase::priority::low, [handler_itr, resource, body, con]() {
try {
handler_itr->second( resource, body, [con]( auto code, auto&& body ) {
con->set_body( std::move( body ) );
con->set_status( websocketpp::http::status_code::value( code ) );
con->send_http_response();
} );
} catch( ... ) {
handle_exception<T>( con );
}
} );

} else {
Expand Down Expand Up @@ -554,7 +560,7 @@ namespace eosio {

void http_plugin::add_handler(const string& url, const url_handler& handler) {
ilog( "add api url: ${c}", ("c",url) );
app().get_io_service().post([=](){
app().post(priority::low, [=](){
my->url_handlers.insert(std::make_pair(url,handler));
});
}
Expand Down
Loading