diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 98b2065d1e1..2d2af25f0c1 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -143,7 +143,7 @@ struct controller_impl { * are removed from this list if they are re-applied in other blocks. Producers * can query this list when scheduling new transactions into blocks. */ - map unapplied_transactions; + unapplied_transactions_type unapplied_transactions; void pop_block() { auto prev = fork_db.get_block( head->header.previous ); @@ -2106,41 +2106,12 @@ const account_object& controller::get_account( account_name name )const return my->db.get(name); } FC_CAPTURE_AND_RETHROW( (name) ) } -vector controller::get_unapplied_transactions() const { - vector result; - if ( my->read_mode == db_read_mode::SPECULATIVE ) { - result.reserve(my->unapplied_transactions.size()); - for ( const auto& entry: my->unapplied_transactions ) { - result.emplace_back(entry.second); - } - } else { - EOS_ASSERT( my->unapplied_transactions.empty(), transaction_exception, "not empty unapplied_transactions in non-speculative mode" ); //should never happen - } - return result; -} - -void controller::drop_unapplied_transaction(const transaction_metadata_ptr& trx) { - my->unapplied_transactions.erase(trx->signed_id); -} - -void controller::drop_all_unapplied_transactions() { - my->unapplied_transactions.clear(); -} - -vector controller::get_scheduled_transactions() const { - const auto& idx = db().get_index(); - - vector result; - - static const size_t max_reserve = 64; - result.reserve(std::min(idx.size(), max_reserve)); - - auto itr = idx.begin(); - while( itr != idx.end() && itr->delay_until <= pending_block_time() ) { - result.emplace_back(itr->trx_id); - ++itr; +unapplied_transactions_type& controller::get_unapplied_transactions() { + if ( my->read_mode != db_read_mode::SPECULATIVE ) { + EOS_ASSERT( my->unapplied_transactions.empty(), transaction_exception, + "not empty unapplied_transactions in non-speculative mode" ); //should never happen } - return result; + return my->unapplied_transactions; } bool controller::sender_avoids_whitelist_blacklist_enforcement( account_name sender )const { diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index f0d5a53f52d..9e6947fdff9 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -33,6 +33,7 @@ namespace eosio { namespace chain { class account_object; using resource_limits::resource_limits_manager; using apply_handler = std::function; + using unapplied_transactions_type = map; class fork_database; @@ -111,22 +112,9 @@ namespace eosio { namespace chain { * The caller is responsible for calling drop_unapplied_transaction on a failing transaction that * they never intend to retry * - * @return vector of transactions which have been unapplied + * @return map of transactions which have been unapplied */ - vector get_unapplied_transactions() const; - void drop_unapplied_transaction(const transaction_metadata_ptr& trx); - void drop_all_unapplied_transactions(); - - /** - * These transaction IDs represent transactions available in the head chain state as scheduled - * or otherwise generated transactions. - * - * calling push_scheduled_transaction with these IDs will remove the associated transaction from - * the chain state IFF it succeeds or objectively fails - * - * @return - */ - vector get_scheduled_transactions() const; + unapplied_transactions_type& get_unapplied_transactions(); /** * diff --git a/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp b/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp index f5ebf01c1f7..a67ca9ef696 100644 --- a/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp +++ b/libraries/chain/include/eosio/chain/wasm_eosio_injection.hpp @@ -272,7 +272,7 @@ namespace eosio { namespace chain { namespace wasm_injections { }; - struct call_depth_check { + struct call_depth_check_and_insert_checktime { static constexpr bool kills = true; static constexpr bool post = false; static int32_t global_idx; @@ -290,6 +290,7 @@ namespace eosio { namespace chain { namespace wasm_injections { injector_utils::add_import(*(arg.module), "call_depth_assert", assert_idx); wasm_ops::op_types<>::call_t call_assert; + wasm_ops::op_types<>::call_t call_checktime; wasm_ops::op_types<>::get_global_t get_global_inst; wasm_ops::op_types<>::set_global_t set_global_inst; @@ -301,6 +302,7 @@ namespace eosio { namespace chain { namespace wasm_injections { wasm_ops::op_types<>::else__t else_inst; call_assert.field = assert_idx; + call_checktime.field = checktime_injection::chktm_idx; get_global_inst.field = global_idx; set_global_inst.field = global_idx; const_inst.field = -1; @@ -334,6 +336,7 @@ namespace eosio { namespace chain { namespace wasm_injections { INSERT_INJECTED(const_inst); INSERT_INJECTED(add_inst); INSERT_INJECTED(set_global_inst); + INSERT_INJECTED(call_checktime); #undef INSERT_INJECTED } @@ -679,8 +682,8 @@ namespace eosio { namespace chain { namespace wasm_injections { }; struct pre_op_injectors : wasm_ops::op_types { - using call_t = wasm_ops::call ; - using call_indirect_t = wasm_ops::call_indirect ; + using call_t = wasm_ops::call ; + using call_indirect_t = wasm_ops::call_indirect ; // float binops using f32_add_t = wasm_ops::f32_add >; @@ -785,7 +788,7 @@ namespace eosio { namespace chain { namespace wasm_injections { // initialize static fields of injectors injector_utils::init( mod ); checktime_injection::init(); - call_depth_check::init(); + call_depth_check_and_insert_checktime::init(); } void inject() { diff --git a/libraries/chain/wasm_eosio_injection.cpp b/libraries/chain/wasm_eosio_injection.cpp index a4afa44d46d..2c627e13ea7 100644 --- a/libraries/chain/wasm_eosio_injection.cpp +++ b/libraries/chain/wasm_eosio_injection.cpp @@ -35,7 +35,7 @@ void max_memory_injection_visitor::inject( Module& m ) { } void max_memory_injection_visitor::initializer() {} -int32_t call_depth_check::global_idx = -1; +int32_t call_depth_check_and_insert_checktime::global_idx = -1; uint32_t instruction_counter::icnt = 0; uint32_t instruction_counter::tcnt = 0; uint32_t instruction_counter::bcnt = 0; diff --git a/libraries/testing/include/eosio/testing/tester.hpp b/libraries/testing/include/eosio/testing/tester.hpp index 10e7d4499e2..c9a4591d6c9 100644 --- a/libraries/testing/include/eosio/testing/tester.hpp +++ b/libraries/testing/include/eosio/testing/tester.hpp @@ -99,6 +99,17 @@ namespace eosio { namespace testing { void produce_min_num_of_blocks_to_spend_time_wo_inactive_prod(const fc::microseconds target_elapsed_time = fc::microseconds()); signed_block_ptr push_block(signed_block_ptr b); + /** + * These transaction IDs represent transactions available in the head chain state as scheduled + * or otherwise generated transactions. + * + * calling push_scheduled_transaction with these IDs will remove the associated transaction from + * the chain state IFF it succeeds or objectively fails + * + * @return + */ + vector get_scheduled_transactions() const; + transaction_trace_ptr push_transaction( packed_transaction& trx, fc::time_point deadline = fc::time_point::maximum(), uint32_t billed_cpu_time_us = DEFAULT_BILLED_CPU_TIME_US ); transaction_trace_ptr push_transaction( signed_transaction& trx, fc::time_point deadline = fc::time_point::maximum(), uint32_t billed_cpu_time_us = DEFAULT_BILLED_CPU_TIME_US ); action_result push_action(action&& cert_act, uint64_t authorizer); // TODO/QUESTION: Is this needed? diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index bcf811434d5..a632cb40643 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -163,16 +164,16 @@ namespace eosio { namespace testing { } if( !skip_pending_trxs ) { - auto unapplied_trxs = control->get_unapplied_transactions(); - for (const auto& trx : unapplied_trxs ) { - auto trace = control->push_transaction(trx, fc::time_point::maximum()); + unapplied_transactions_type unapplied_trxs = control->get_unapplied_transactions(); // make copy of map + for (const auto& entry : unapplied_trxs ) { + auto trace = control->push_transaction(entry.second, fc::time_point::maximum()); if(trace->except) { trace->except->dynamic_rethrow_exception(); } } vector scheduled_trxs; - while( (scheduled_trxs = control->get_scheduled_transactions() ).size() > 0 ) { + while( (scheduled_trxs = get_scheduled_transactions() ).size() > 0 ) { for (const auto& trx : scheduled_trxs ) { auto trace = control->push_scheduled_transaction(trx, fc::time_point::maximum()); if(trace->except) { @@ -237,6 +238,18 @@ namespace eosio { namespace testing { } } + vector base_tester::get_scheduled_transactions() const { + const auto& idx = control->db().get_index(); + + vector result; + + auto itr = idx.begin(); + while( itr != idx.end() && itr->delay_until <= control->pending_block_time() ) { + result.emplace_back(itr->trx_id); + ++itr; + } + return result; + } void base_tester::produce_blocks_until_end_of_round() { uint64_t blocks_per_round; diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 0c00495c48d..1b398a8b53a 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -283,6 +283,10 @@ namespace eosio { */ constexpr auto def_send_buffer_size_mb = 4; constexpr auto def_send_buffer_size = 1024*1024*def_send_buffer_size_mb; + constexpr auto def_max_write_queue_size = def_send_buffer_size*10; + constexpr boost::asio::chrono::milliseconds def_read_delay_for_full_write_queue{100}; + constexpr auto def_max_reads_in_flight = 1000; + constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB constexpr auto def_max_clients = 25; // 0 for unlimited clients constexpr auto def_max_nodes_per_host = 1; constexpr auto def_conn_retry_wait = 30; @@ -400,6 +404,86 @@ namespace eosio { static void populate(handshake_message &hello); }; + class queued_buffer : boost::noncopyable { + public: + void clear_write_queue() { + _write_queue.clear(); + _sync_write_queue.clear(); + _write_queue_size = 0; + } + + void clear_out_queue() { + while ( _out_queue.size() > 0 ) { + _out_queue.pop_front(); + } + } + + uint32_t write_queue_size() const { return _write_queue_size; } + + bool is_out_queue_empty() const { return _out_queue.empty(); } + + bool ready_to_send() const { + // if out_queue is not empty then async_write is in progress + return ((!_sync_write_queue.empty() || !_write_queue.empty()) && _out_queue.empty()); + } + + bool add_write_queue( const std::shared_ptr>& buff, + std::function callback, + bool to_sync_queue ) { + if( to_sync_queue ) { + _sync_write_queue.push_back( {buff, callback} ); + } else { + _write_queue.push_back( {buff, callback} ); + } + _write_queue_size += buff->size(); + if( _write_queue_size > 2 * def_max_write_queue_size ) { + return false; + } + return true; + } + + void fill_out_buffer( std::vector& bufs ) { + if( _sync_write_queue.size() > 0 ) { // always send msgs from sync_write_queue first + fill_out_buffer( bufs, _sync_write_queue ); + } else { // postpone real_time write_queue if sync queue is not empty + fill_out_buffer( bufs, _write_queue ); + EOS_ASSERT( _write_queue_size == 0, plugin_exception, "write queue size expected to be zero" ); + } + } + + void out_callback( boost::system::error_code ec, std::size_t w ) { + for( auto& m : _out_queue ) { + m.callback( ec, w ); + } + } + + private: + struct queued_write; + void fill_out_buffer( std::vector& bufs, + deque& w_queue ) { + while ( w_queue.size() > 0 ) { + auto& m = w_queue.front(); + bufs.push_back( boost::asio::buffer( *m.buff )); + _write_queue_size -= m.buff->size(); + _out_queue.emplace_back( m ); + w_queue.pop_front(); + } + } + + private: + struct queued_write { + std::shared_ptr> buff; + std::function callback; + }; + + uint32_t _write_queue_size = 0; + deque _write_queue; + deque _sync_write_queue; // sync_write_queue will be sent first + deque _out_queue; + + }; // queued_buffer + + class connection : public std::enable_shared_from_this { public: explicit connection( string endpoint ); @@ -416,12 +500,11 @@ namespace eosio { fc::message_buffer<1024*1024> pending_message_buffer; fc::optional outstanding_read_bytes; - struct queued_write { - std::shared_ptr> buff; - std::function callback; - }; - deque write_queue; - deque out_queue; + + queued_buffer buffer_queue; + + uint32_t reads_in_flight = 0; + uint32_t trx_in_progress_size = 0; fc::sha256 node_id; handshake_message last_handshake_recv; handshake_message last_handshake_sent; @@ -431,6 +514,7 @@ namespace eosio { uint16_t protocol_version = 0; string peer_addr; unique_ptr response_expected; + unique_ptr read_delay_timer; optional pending_fetch; go_away_reason no_retry = no_reason; block_id_type fork_head; @@ -497,12 +581,14 @@ namespace eosio { void txn_send(const vector& txn_lis); void blk_send_branch(); - void blk_send(const vector &txn_lis); + void blk_send(const block_id_type& blkid); void stop_send(); void enqueue( const net_message &msg, bool trigger_send = true ); - void enqueue_block( const signed_block_ptr& sb, bool trigger_send = true ); - void enqueue_buffer( const std::shared_ptr>& send_buffer, bool trigger_send, go_away_reason close_after_send ); + void enqueue_block( const signed_block_ptr& sb, bool trigger_send = true, bool to_sync_queue = false ); + void enqueue_buffer( const std::shared_ptr>& send_buffer, + bool trigger_send, go_away_reason close_after_send, + bool to_sync_queue = false); void cancel_sync(go_away_reason); void flush_queues(); bool enqueue_sync_block(); @@ -516,7 +602,8 @@ namespace eosio { void queue_write(const std::shared_ptr>& buff, bool trigger_send, - std::function callback); + std::function callback, + bool to_sync_queue = false); void do_queue_write(); /** \brief Process the next message from the pending message buffer @@ -530,7 +617,7 @@ namespace eosio { */ bool process_next_message(net_plugin_impl& impl, uint32_t message_length); - bool add_peer_block(const peer_block_state &pbs); + bool add_peer_block(const peer_block_state& pbs); fc::optional _logger_variant; const fc::variant_object& get_logger_variant() { @@ -659,6 +746,7 @@ namespace eosio { protocol_version(0), peer_addr(endpoint), response_expected(), + read_delay_timer(), pending_fetch(), no_retry(no_reason), fork_head(), @@ -683,6 +771,7 @@ namespace eosio { protocol_version(0), peer_addr(), response_expected(), + read_delay_timer(), pending_fetch(), no_retry(no_reason), fork_head(), @@ -699,6 +788,7 @@ namespace eosio { auto *rnd = node_id.data(); rnd[0] = 0; response_expected.reset(new boost::asio::steady_timer(app().get_io_service())); + read_delay_timer.reset(new boost::asio::steady_timer(app().get_io_service())); } bool connection::connected() { @@ -716,7 +806,7 @@ namespace eosio { } void connection::flush_queues() { - write_queue.clear(); + buffer_queue.clear_write_queue(); } void connection::close() { @@ -739,6 +829,7 @@ namespace eosio { my_impl->sync_master->reset_lib_num(shared_from_this()); fc_dlog(logger, "canceling wait on ${p}", ("p",peer_name())); cancel_wait(); + if( read_delay_timer ) read_delay_timer->cancel(); pending_message_buffer.reset(); } @@ -803,75 +894,43 @@ namespace eosio { catch (...) { } - vector bstack; - block_id_type null_id; - for (auto bid = head_id; bid != null_id && bid != lib_id; ) { - try { - - // if the last handshake received indicates that we are catching up on a fork - // that the peer is already partially aware of, no need to resend blocks - if (remote_head_id == bid) { - break; - } - - signed_block_ptr b = cc.fetch_block_by_id(bid); - if ( b ) { - bid = b->previous; - bstack.push_back(b); - } - else { - break; - } - } catch (...) { - break; - } - } - size_t count = 0; - if (!bstack.empty()) { - if (bstack.back()->previous == lib_id || bstack.back()->previous == remote_head_id) { - count = bstack.size(); - while (bstack.size()) { - enqueue_block( bstack.back() ); - bstack.pop_back(); - } - } - fc_ilog(logger, "Sent ${n} blocks on my fork",("n",count)); + if( !peer_requested ) { + peer_requested = sync_state( block_header::num_from_id(lib_id)+1, + block_header::num_from_id(head_id), + block_header::num_from_id(lib_id) ); } else { - fc_ilog(logger, "Nothing to send on fork request"); + uint32_t start = std::min( peer_requested->last + 1, block_header::num_from_id(lib_id)+1 ); + uint32_t end = std::max( peer_requested->end_block, block_header::num_from_id(head_id) ); + peer_requested = sync_state( start, end, start - 1 ); } + enqueue_sync_block(); + // still want to send transactions along during blk branch sync syncing = false; } - void connection::blk_send(const vector &ids) { + void connection::blk_send(const block_id_type& blkid) { controller &cc = my_impl->chain_plug->chain(); - int count = 0; - for(auto &blkid : ids) { - ++count; - try { - signed_block_ptr b = cc.fetch_block_by_id(blkid); - if(b) { - fc_dlog(logger,"found block for id at num ${n}",("n",b->block_num())); - enqueue_block( b ); - } - else { - ilog("fetch block by id returned null, id ${id} on block ${c} of ${s} for ${p}", - ("id",blkid)("c",count)("s",ids.size())("p",peer_name())); - break; - } - } - catch (const assert_exception &ex) { - elog( "caught assert on fetch_block_by_id, ${ex}, id ${id} on block ${c} of ${s} for ${p}", - ("ex",ex.to_string())("id",blkid)("c",count)("s",ids.size())("p",peer_name())); - break; - } - catch (...) { - elog( "caught othser exception fetching block id ${id} on block ${c} of ${s} for ${p}", - ("id",blkid)("c",count)("s",ids.size())("p",peer_name())); - break; + try { + signed_block_ptr b = cc.fetch_block_by_id(blkid); + if(b) { + fc_dlog(logger,"found block for id at num ${n}",("n",b->block_num())); + peer_block_state pbstate = {blkid, block_header::num_from_id(blkid), true, true, time_point()}; + add_peer_block(pbstate); + enqueue_block( b ); + } else { + ilog("fetch block by id returned null, id ${id} for ${p}", + ("id",blkid)("p",peer_name())); } } - + catch (const assert_exception &ex) { + elog( "caught assert on fetch_block_by_id, ${ex}, id ${id} for ${p}", + ("ex",ex.to_string())("id",blkid)("p",peer_name())); + } + catch (...) { + elog( "caught other exception fetching block id ${id} for ${p}", + ("id",blkid)("p",peer_name())); + } } void connection::stop_send() { @@ -905,14 +964,21 @@ namespace eosio { void connection::queue_write(const std::shared_ptr>& buff, bool trigger_send, - std::function callback) { - write_queue.push_back({buff, callback}); - if(out_queue.empty() && trigger_send) + std::function callback, + bool to_sync_queue) { + if( !buffer_queue.add_write_queue( buff, callback, to_sync_queue )) { + fc_wlog( logger, "write_queue full ${s} bytes, giving up on connection ${p}", + ("s", buffer_queue.write_queue_size())("p", peer_name()) ); + my_impl->close( shared_from_this() ); + return; + } + if( buffer_queue.is_out_queue_empty() && trigger_send) { do_queue_write(); + } } void connection::do_queue_write() { - if(write_queue.empty() || !out_queue.empty()) + if( !buffer_queue.ready_to_send() ) return; connection_wptr c(shared_from_this()); if(!socket->is_open()) { @@ -921,21 +987,14 @@ namespace eosio { return; } std::vector bufs; - while (write_queue.size() > 0) { - auto& m = write_queue.front(); - bufs.push_back(boost::asio::buffer(*m.buff)); - out_queue.push_back(m); - write_queue.pop_front(); - } + buffer_queue.fill_out_buffer( bufs ); boost::asio::async_write(*socket, bufs, [c](boost::system::error_code ec, std::size_t w) { try { auto conn = c.lock(); if(!conn) return; - for (auto& m: conn->out_queue) { - m.callback(ec, w); - } + conn->buffer_queue.out_callback( ec, w ); if(ec) { string pname = conn ? conn->peer_name() : "no connection name"; @@ -948,9 +1007,7 @@ namespace eosio { my_impl->close(conn); return; } - while (conn->out_queue.size() > 0) { - conn->out_queue.pop_front(); - } + conn->buffer_queue.clear_out_queue(); conn->enqueue_sync_block(); conn->do_queue_write(); } @@ -973,8 +1030,8 @@ namespace eosio { } void connection::cancel_sync(go_away_reason reason) { - fc_dlog(logger,"cancel sync reason = ${m}, write queue size ${o} peer ${p}", - ("m",reason_str(reason)) ("o", write_queue.size())("p", peer_name())); + fc_dlog(logger,"cancel sync reason = ${m}, write queue size ${o} bytes peer ${p}", + ("m",reason_str(reason)) ("o", buffer_queue.write_queue_size())("p", peer_name())); cancel_wait(); flush_queues(); switch (reason) { @@ -1002,7 +1059,7 @@ namespace eosio { controller& cc = my_impl->chain_plug->chain(); signed_block_ptr sb = cc.fetch_block_by_number(num); if(sb) { - enqueue_block( sb, trigger_send); + enqueue_block( sb, trigger_send, true); return true; } } catch ( ... ) { @@ -1031,7 +1088,7 @@ namespace eosio { enqueue_buffer( send_buffer, trigger_send, close_after_send ); } - void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send ) { + void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send, bool to_sync_queue ) { // this implementation is to avoid copy of signed_block to net_message int which = 7; // matches which of net_message for signed_block @@ -1048,10 +1105,13 @@ namespace eosio { fc::raw::pack( ds, unsigned_int( which )); fc::raw::pack( ds, *sb ); - enqueue_buffer( send_buffer, trigger_send, no_reason ); + enqueue_buffer( send_buffer, trigger_send, no_reason, to_sync_queue ); } - void connection::enqueue_buffer( const std::shared_ptr>& send_buffer, bool trigger_send, go_away_reason close_after_send ) { + void connection::enqueue_buffer( const std::shared_ptr>& send_buffer, bool trigger_send, + go_away_reason close_after_send, + bool to_sync_queue ) + { connection_wptr weak_this = shared_from_this(); queue_write(send_buffer,trigger_send, [weak_this, close_after_send](boost::system::error_code ec, std::size_t ) { @@ -1065,7 +1125,8 @@ namespace eosio { } else { fc_wlog(logger, "connection expired before enqueued net_message called callback!"); } - }); + }, + to_sync_queue); } void connection::cancel_wait() { @@ -1165,7 +1226,7 @@ namespace eosio { return true; } - bool connection::add_peer_block(const peer_block_state &entry) { + bool connection::add_peer_block(const peer_block_state& entry) { auto bptr = blk_state.get().find(entry.id); bool added = (bptr == blk_state.end()); if (added){ @@ -1476,11 +1537,15 @@ namespace eosio { void sync_manager::recv_notice(const connection_ptr& c, const notice_message& msg) { fc_ilog(logger, "sync_manager got ${m} block notice",("m",modes_str(msg.known_blocks.mode))); + if( msg.known_blocks.ids.size() > 1 ) { + elog( "Invalid notice_message, known_blocks.ids.size ${s}", ("s", msg.known_blocks.ids.size()) ); + my_impl->close(c); + return; + } if (msg.known_blocks.mode == catch_up) { if (msg.known_blocks.ids.size() == 0) { elog("got a catch up with ids size = 0"); - } - else { + } else { verify_catchup(c, msg.known_blocks.pending, msg.known_blocks.ids.back()); } } @@ -1685,7 +1750,9 @@ namespace eosio { if (msg.known_blocks.mode == normal) { req.req_blocks.mode = normal; controller& cc = my_impl->chain_plug->chain(); - for( const auto& blkid : msg.known_blocks.ids) { + // known_blocks.ids is never > 1 + if( !msg.known_blocks.ids.empty() ) { + const block_id_type& blkid = msg.known_blocks.ids.back(); signed_block_ptr b; peer_block_state entry = {blkid,0,true,true,fc::time_point()}; try { @@ -1952,6 +2019,37 @@ namespace eosio { } }; + if( conn->buffer_queue.write_queue_size() > def_max_write_queue_size || + conn->reads_in_flight > def_max_reads_in_flight || + conn->trx_in_progress_size > def_max_trx_in_progress_size ) + { + // too much queued up, reschedule + if( conn->buffer_queue.write_queue_size() > def_max_write_queue_size ) { + peer_wlog( conn, "write_queue full ${s} bytes", ("s", conn->buffer_queue.write_queue_size()) ); + } else if( conn->reads_in_flight > def_max_reads_in_flight ) { + peer_wlog( conn, "max reads in flight ${s}", ("s", conn->reads_in_flight) ); + } else { + peer_wlog( conn, "max trx in progress ${s} bytes", ("s", conn->trx_in_progress_size) ); + } + if( conn->buffer_queue.write_queue_size() > 2*def_max_write_queue_size || + conn->reads_in_flight > 2*def_max_reads_in_flight || + conn->trx_in_progress_size > 2*def_max_trx_in_progress_size ) + { + fc_wlog( logger, "queues over full, giving up on connection ${p}", ("p", conn->peer_name()) ); + my_impl->close( conn ); + return; + } + if( !conn->read_delay_timer ) return; + conn->read_delay_timer->expires_from_now( def_read_delay_for_full_write_queue ); + conn->read_delay_timer->async_wait([this, weak_conn]( boost::system::error_code ) { + auto conn = weak_conn.lock(); + if( !conn ) return; + start_read_message( conn ); + } ); + return; + } + + ++conn->reads_in_flight; boost::asio::async_read(*conn->socket, conn->pending_message_buffer.get_buffer_sequence_for_boost_async_read(), completion_handler, [this,weak_conn]( boost::system::error_code ec, std::size_t bytes_transferred ) { @@ -1960,6 +2058,7 @@ namespace eosio { return; } + --conn->reads_in_flight; conn->outstanding_read_bytes.reset(); try { @@ -2308,6 +2407,12 @@ namespace eosio { } void net_plugin_impl::handle_message(const connection_ptr& c, const request_message& msg) { + if( msg.req_blocks.ids.size() > 1 ) { + elog( "Invalid request_message, req_blocks.ids.size ${s}", ("s", msg.req_blocks.ids.size()) ); + close(c); + return; + } + switch (msg.req_blocks.mode) { case catch_up : peer_ilog(c, "received request_message:catch_up"); @@ -2315,7 +2420,9 @@ namespace eosio { break; case normal : peer_ilog(c, "received request_message:normal"); - c->blk_send(msg.req_blocks.ids); + if( !msg.req_blocks.ids.empty() ) { + c->blk_send(msg.req_blocks.ids.back()); + } break; default:; } @@ -2347,6 +2454,13 @@ namespace eosio { } } + size_t calc_trx_size( const packed_transaction_ptr& trx ) { + // transaction is stored packed and unpacked, double packed_size and size of signed as an approximation of use + return (trx->get_packed_transaction().size() * 2 + sizeof(trx->get_signed_transaction())) * 2 + + trx->get_packed_context_free_data().size() * 4 + + trx->get_signatures().size() * sizeof(signature_type); + } + void net_plugin_impl::handle_message(const connection_ptr& c, const packed_transaction_ptr& trx) { fc_dlog(logger, "got a packed transaction, cancel wait"); peer_ilog(c, "received packed_transaction"); @@ -2369,7 +2483,9 @@ namespace eosio { return; } dispatcher->recv_transaction(c, tid); + c->trx_in_progress_size += calc_trx_size( ptrx->packed_trx ); chain_plug->accept_transaction(ptrx, [c, this, ptrx](const static_variant& result) { + c->trx_in_progress_size -= calc_trx_size( ptrx->packed_trx ); if (result.contains()) { peer_dlog(c, "bad packed_transaction : ${m}", ("m",result.get()->what())); } else { diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 71583aee0c5..947ef48f46a 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -1143,6 +1144,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { int orig_count = _persistent_transactions.size(); while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pbs->header.timestamp.to_time_point()) { + if (preprocess_deadline <= fc::time_point::now()) { + exhausted = true; + break; + } auto const& txid = persisted_by_expiry.begin()->trx_id; if (_pending_block_mode == pending_block_mode::producing) { fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}", @@ -1158,9 +1163,15 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { num_expired_persistent++; } - fc_dlog(_log, "Processed ${n} persisted transactions, Expired ${expired}", - ("n", orig_count) - ("expired", num_expired_persistent)); + if( exhausted ) { + fc_wlog( _log, "Unable to process all ${n} persisted transactions before deadline, Expired ${expired}", + ( "n", orig_count ) + ( "expired", num_expired_persistent ) ); + } else { + fc_dlog( _log, "Processed ${n} persisted transactions, Expired ${expired}", + ( "n", orig_count ) + ( "expired", num_expired_persistent ) ); + } } try { @@ -1171,13 +1182,15 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if (_producers.empty() && persisted_by_id.empty()) { // if this node can never produce and has no persisted transactions, // there is no need for unapplied transactions they can be dropped - chain.drop_all_unapplied_transactions(); + chain.get_unapplied_transactions().clear(); } else { - std::vector apply_trxs; - { // derive appliable transactions from unapplied_transactions and drop droppable transactions - auto unapplied_trxs = chain.get_unapplied_transactions(); - apply_trxs.reserve(unapplied_trxs.size()); - + // derive appliable transactions from unapplied_transactions and drop droppable transactions + unapplied_transactions_type& unapplied_trxs = chain.get_unapplied_transactions(); + if( !unapplied_trxs.empty() ) { + auto unapplied_trxs_size = unapplied_trxs.size(); + int num_applied = 0; + int num_failed = 0; + int num_processed = 0; auto calculate_transaction_category = [&](const transaction_metadata_ptr& trx) { if (trx->packed_trx->expiration() < pbs->header.timestamp.to_time_point()) { return tx_category::EXPIRED; @@ -1188,64 +1201,65 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { } }; - for (auto& trx: unapplied_trxs) { + auto itr = unapplied_trxs.begin(); + while( itr != unapplied_trxs.end() ) { + auto itr_next = itr; // save off next since itr may be invalidated by loop + ++itr_next; + + if( preprocess_deadline <= fc::time_point::now() ) exhausted = true; + if( exhausted ) break; + const auto& trx = itr->second; auto category = calculate_transaction_category(trx); - if (category == tx_category::EXPIRED || (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) { + if (category == tx_category::EXPIRED || + (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) + { if (!_producers.empty()) { fc_dlog(_trx_trace_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}", ("txid", trx->id)); } - chain.drop_unapplied_transaction(trx); - } else if (category == tx_category::PERSISTED || (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) { - apply_trxs.emplace_back(std::move(trx)); - } - } - } - - if (!apply_trxs.empty()) { - int num_applied = 0; - int num_failed = 0; - int num_processed = 0; - - for (const auto& trx: apply_trxs) { - if (preprocess_deadline <= fc::time_point::now()) exhausted = true; - if (exhausted) { - break; - } - - num_processed++; - - try { - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); - bool deadline_is_subjective = false; - if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && preprocess_deadline < deadline)) { - deadline_is_subjective = true; - deadline = preprocess_deadline; - } + itr = unapplied_trxs.erase( itr ); // unapplied_trxs map has not been modified, so simply erase and continue + continue; + } else if (category == tx_category::PERSISTED || + (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) + { + ++num_processed; + + try { + auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + bool deadline_is_subjective = false; + if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && preprocess_deadline < deadline)) { + deadline_is_subjective = true; + deadline = preprocess_deadline; + } - auto trace = chain.push_transaction(trx, deadline); - if (trace->except) { - if (failure_is_subjective(*trace->except, deadline_is_subjective)) { - exhausted = true; + auto trace = chain.push_transaction(trx, deadline); + if (trace->except) { + if (failure_is_subjective(*trace->except, deadline_is_subjective)) { + exhausted = true; + break; + } else { + // this failed our configured maximum transaction time, we don't want to replay it + // chain.plus_transactions can modify unapplied_trxs, so erase by id + unapplied_trxs.erase( trx->signed_id ); + ++num_failed; + } } else { - // this failed our configured maximum transaction time, we don't want to replay it - chain.drop_unapplied_transaction(trx); - num_failed++; + ++num_applied; } - } else { - num_applied++; - } - } catch ( const guard_exception& e ) { - chain_plug->handle_guard_exception(e); - return start_block_result::failed; - } FC_LOG_AND_DROP(); + } catch ( const guard_exception& e ) { + chain_plug->handle_guard_exception(e); + return start_block_result::failed; + } FC_LOG_AND_DROP(); + } + + itr = itr_next; } fc_dlog(_log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}", - ("m", num_processed) - ("n", apply_trxs.size()) - ("applied", num_applied) - ("failed", num_failed)); + ("m", num_processed) + ("n", unapplied_trxs_size) + ("applied", num_applied) + ("failed", num_failed)); } } @@ -1258,6 +1272,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { int orig_count = _blacklisted_transactions.size(); while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) { + if (preprocess_deadline <= fc::time_point::now()) break; blacklist_by_expiry.erase(blacklist_by_expiry.begin()); num_expired++; } @@ -1267,85 +1282,105 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { ("expired", num_expired)); } - auto scheduled_trxs = chain.get_scheduled_transactions(); - if (!scheduled_trxs.empty()) { - int num_applied = 0; - int num_failed = 0; - int num_processed = 0; + // scheduled transactions + int num_applied = 0; + int num_failed = 0; + int num_processed = 0; + + auto scheduled_trx_deadline = preprocess_deadline; + if (_max_scheduled_transaction_time_per_block_ms >= 0) { + scheduled_trx_deadline = std::min( + scheduled_trx_deadline, + fc::time_point::now() + fc::milliseconds(_max_scheduled_transaction_time_per_block_ms) + ); + } + time_point pending_block_time = chain.pending_block_time(); + const auto& sch_idx = chain.db().get_index(); + const auto scheduled_trxs_size = sch_idx.size(); + auto sch_itr = sch_idx.begin(); + while( sch_itr != sch_idx.end() ) { + if( sch_itr->delay_until > pending_block_time) break; // not scheduled yet + if( sch_itr->published >= pending_block_time ) { + ++sch_itr; + continue; // do not allow schedule and execute in same block + } + if( scheduled_trx_deadline <= fc::time_point::now() ) { + exhausted = true; + break; + } - auto scheduled_trx_deadline = preprocess_deadline; - if (_max_scheduled_transaction_time_per_block_ms >= 0) { - scheduled_trx_deadline = std::min( - scheduled_trx_deadline, - fc::time_point::now() + fc::milliseconds(_max_scheduled_transaction_time_per_block_ms) - ); + const transaction_id_type trx_id = sch_itr->trx_id; // make copy since reference could be invalidated + if (blacklist_by_id.find(trx_id) != blacklist_by_id.end()) { + ++sch_itr; + continue; } - for (const auto& trx : scheduled_trxs) { - if (scheduled_trx_deadline <= fc::time_point::now()) exhausted = true; - if (exhausted) { - break; - } + auto sch_itr_next = sch_itr; // save off next since sch_itr may be invalidated by loop + ++sch_itr_next; + const auto next_delay_until = sch_itr_next != sch_idx.end() ? sch_itr_next->delay_until : sch_itr->delay_until; + const auto next_id = sch_itr_next != sch_idx.end() ? sch_itr_next->id : sch_itr->id; - num_processed++; + num_processed++; - // configurable ratio of incoming txns vs deferred txns - while (_incoming_trx_weight >= 1.0 && orig_pending_txn_size && _pending_incoming_transactions.size()) { - if (scheduled_trx_deadline <= fc::time_point::now()) break; + // configurable ratio of incoming txns vs deferred txns + while (_incoming_trx_weight >= 1.0 && orig_pending_txn_size && _pending_incoming_transactions.size()) { + if (scheduled_trx_deadline <= fc::time_point::now()) break; - auto e = _pending_incoming_transactions.front(); - _pending_incoming_transactions.pop_front(); - --orig_pending_txn_size; - _incoming_trx_weight -= 1.0; - process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - } + auto e = _pending_incoming_transactions.front(); + _pending_incoming_transactions.pop_front(); + --orig_pending_txn_size; + _incoming_trx_weight -= 1.0; + process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); + } - if (scheduled_trx_deadline <= fc::time_point::now()) { - exhausted = true; - break; - } + if (scheduled_trx_deadline <= fc::time_point::now()) { + exhausted = true; + break; + } - if (blacklist_by_id.find(trx) != blacklist_by_id.end()) { - continue; + try { + auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); + bool deadline_is_subjective = false; + if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && scheduled_trx_deadline < deadline)) { + deadline_is_subjective = true; + deadline = scheduled_trx_deadline; } - try { - auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms); - bool deadline_is_subjective = false; - if (_max_transaction_time_ms < 0 || (_pending_block_mode == pending_block_mode::producing && scheduled_trx_deadline < deadline)) { - deadline_is_subjective = true; - deadline = scheduled_trx_deadline; - } - - auto trace = chain.push_scheduled_transaction(trx, deadline); - if (trace->except) { - if (failure_is_subjective(*trace->except, deadline_is_subjective)) { - exhausted = true; - } else { - auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); - // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist - _blacklisted_transactions.insert(transaction_id_with_expiry{trx, expiration}); - num_failed++; - } + auto trace = chain.push_scheduled_transaction(trx_id, deadline); + if (trace->except) { + if (failure_is_subjective(*trace->except, deadline_is_subjective)) { + exhausted = true; + break; } else { - num_applied++; + auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window); + // this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist + _blacklisted_transactions.insert(transaction_id_with_expiry{trx_id, expiration}); + num_failed++; } - } catch ( const guard_exception& e ) { - chain_plug->handle_guard_exception(e); - return start_block_result::failed; - } FC_LOG_AND_DROP(); + } else { + num_applied++; + } + } catch ( const guard_exception& e ) { + chain_plug->handle_guard_exception(e); + return start_block_result::failed; + } FC_LOG_AND_DROP(); - _incoming_trx_weight += _incoming_defer_ratio; - if (!orig_pending_txn_size) _incoming_trx_weight = 0.0; - } + _incoming_trx_weight += _incoming_defer_ratio; + if (!orig_pending_txn_size) _incoming_trx_weight = 0.0; - fc_dlog(_log, "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", - ("m", num_processed) - ("n", scheduled_trxs.size()) - ("applied", num_applied) - ("failed", num_failed)); + if( sch_itr_next == sch_idx.end() ) break; + sch_itr = sch_idx.lower_bound( boost::make_tuple( next_delay_until, next_id ) ); + } + if( scheduled_trxs_size > 0 ) { + fc_dlog( _log, + "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}", + ( "m", num_processed ) + ( "n", scheduled_trxs_size ) + ( "applied", num_applied ) + ( "failed", num_failed ) ); } + } if (exhausted || preprocess_deadline <= fc::time_point::now()) { @@ -1357,11 +1392,11 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if (!_pending_incoming_transactions.empty()) { fc_dlog(_log, "Processing ${n} pending transactions"); while (orig_pending_txn_size && _pending_incoming_transactions.size()) { + if (preprocess_deadline <= fc::time_point::now()) return start_block_result::exhausted; auto e = _pending_incoming_transactions.front(); _pending_incoming_transactions.pop_front(); --orig_pending_txn_size; process_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - if (preprocess_deadline <= fc::time_point::now()) return start_block_result::exhausted; } } return start_block_result::succeeded; diff --git a/unittests/api_tests.cpp b/unittests/api_tests.cpp index 89c94d6f393..955130130fa 100644 --- a/unittests/api_tests.cpp +++ b/unittests/api_tests.cpp @@ -1089,7 +1089,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, TESTER) { try { produce_blocks( 3 ); //check that only one deferred transaction executed - auto dtrxs = control->get_scheduled_transactions(); + auto dtrxs = get_scheduled_transactions(); BOOST_CHECK_EQUAL(dtrxs.size(), 1); for (const auto& trx: dtrxs) { control->push_scheduled_transaction(trx, fc::time_point::maximum()); @@ -1114,7 +1114,7 @@ BOOST_FIXTURE_TEST_CASE(deferred_transaction_tests, TESTER) { try { produce_blocks( 3 ); //check that only one deferred transaction executed - auto dtrxs = control->get_scheduled_transactions(); + auto dtrxs = get_scheduled_transactions(); BOOST_CHECK_EQUAL(dtrxs.size(), 1); for (const auto& trx: dtrxs) { control->push_scheduled_transaction(trx, fc::time_point::maximum()); diff --git a/unittests/delay_tests.cpp b/unittests/delay_tests.cpp index 9f14de4107c..ae1a3d114b5 100644 --- a/unittests/delay_tests.cpp +++ b/unittests/delay_tests.cpp @@ -72,7 +72,7 @@ BOOST_FIXTURE_TEST_CASE( delay_error_create_account, validating_tester) { try { produce_blocks(6); - auto scheduled_trxs = control->get_scheduled_transactions(); + auto scheduled_trxs = get_scheduled_transactions(); BOOST_REQUIRE_EQUAL(scheduled_trxs.size(), 1); auto dtrace = control->push_scheduled_transaction(scheduled_trxs.front(), fc::time_point::maximum()); BOOST_REQUIRE_EQUAL(dtrace->except.valid(), true);