Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

EPE-165: Improve logic for unlinkable blocks while sync'ing #10012

Merged
merged 8 commits into from
Feb 10, 2021
85 changes: 80 additions & 5 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,6 @@ namespace eosio {
// Compile-time defaults used by the net plugin.
constexpr auto def_send_buffer_size = 1024*1024*def_send_buffer_size_mb; // def_send_buffer_size_mb scaled by 1024*1024 (presumably MiB -> bytes)
constexpr auto def_max_write_queue_size = def_send_buffer_size*10; // ten times the send buffer size
constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB
constexpr auto def_max_consecutive_rejected_blocks = 13; // num of rejected blocks before disconnect
constexpr auto def_max_consecutive_immediate_connection_close = 9; // back off if client keeps closing
constexpr auto def_max_clients = 25; // 0 for unlimited clients
constexpr auto def_max_nodes_per_host = 1; // NOTE(review): presumably the max connections allowed from one host - confirm
Expand Down Expand Up @@ -534,6 +533,48 @@ namespace eosio {
}; // queued_buffer


/// Monitors whether blocks are being accepted (sync'd) or rejected.
///
/// Consecutive rejected blocks are grouped into a (configurable) time window,
/// the "rejected block window" (rbw). The monitor keeps a count of how many
/// consecutive rbws have elapsed and reports when that count reaches a
/// configurable maximum.
///
/// The class is neither copyable nor movable.
class block_status_monitor {
private:
   /// rbw time interval. The default is 2*1000*1000 microseconds, i.e. 2 seconds.
   /// NOTE(review): an earlier comment described this as "2ms"; confirm which
   /// duration is actually intended.
   fc::microseconds window_size_{2*1000*1000};
   fc::time_point   window_start_;   ///< The start of the recent rbw (default-constructed implies not started)
   uint32_t         events_{0};      ///< The number of consecutive rbws
   const uint32_t   max_consecutive_rejected_windows_{13};   ///< events_ threshold used by max_events_violated()

public:
   /// ctor
   ///
   /// @param[in] window_size          The time, in microseconds, of the rejected block window
   /// @param[in] max_rejected_windows The max consecutive number of rejected block windows
   /// @note Copy ctor is not allowed
   block_status_monitor(fc::microseconds window_size = fc::microseconds(2*1000*1000),
                        uint32_t max_rejected_windows = 13) :
      window_size_(window_size),
      // Store the caller's limit; previously this argument was silently ignored
      // and the in-class default of 13 was always used.
      max_consecutive_rejected_windows_(max_rejected_windows) {}
   block_status_monitor( const block_status_monitor& ) = delete;
   block_status_monitor( block_status_monitor&& ) = delete;
   ~block_status_monitor() = default;

   /// reset to initial state
   void reset();
   /// called when a block is accepted (sync_recv_block)
   void accepted();
   /// called when a block is rejected
   void rejected();
   /// returns number of consecutive rbws
   auto events() const { return events_; }
   /// indicates if the max number of consecutive rbws has been reached or exceeded
   bool max_events_violated() const { return events_ >= max_consecutive_rejected_windows_; }

   /// assignment not allowed
   block_status_monitor& operator=( const block_status_monitor& ) = delete;
   block_status_monitor& operator=( block_status_monitor&& ) = delete;
};
/// Clears the monitor: marks the rejected block window as not started and
/// zeroes the count of consecutive rejected block windows.
void block_status_monitor::reset() {
   const auto not_started = fc::time_point();   // default-constructed time == "no window open"
   window_start_ = not_started;
   events_ = 0;
}

class connection : public std::enable_shared_from_this<connection> {
public:
explicit connection( string endpoint );
Expand Down Expand Up @@ -588,7 +629,7 @@ namespace eosio {
std::atomic<bool> syncing{false};

std::atomic<uint16_t> protocol_version = 0;
uint16_t consecutive_rejected_blocks = 0;
block_status_monitor block_status_monitor_;
std::atomic<uint16_t> consecutive_immediate_connection_close = 0;

std::mutex response_expected_timer_mtx;
Expand Down Expand Up @@ -945,7 +986,7 @@ namespace eosio {
self->flush_queues();
self->connecting = false;
self->syncing = false;
self->consecutive_rejected_blocks = 0;
self->block_status_monitor_.reset();
++self->consecutive_immediate_connection_close;
bool has_last_req = false;
{
Expand Down Expand Up @@ -1511,6 +1552,39 @@ namespace eosio {
sync_wait();
}

//-----------------------------------------------------------
void block_status_monitor::accepted() {
if( window_start_ != fc::time_point() ) {
window_start_ = fc::time_point();
events_ = 0;
}
}

/// Called when a block is rejected.
///
/// - If no window is open, this rejection opens one and nothing is counted.
/// - If the open window has not yet lasted window_size_, the rejection is
///   absorbed into it.
/// - Otherwise the window is counted as one event. A window that lasted
///   exactly window_size_ is closed (the next rejection opens a new one);
///   a window that ran longer is restarted at the current time.
void block_status_monitor::rejected() {
   const auto current     = fc::time_point::now();
   const auto not_started = fc::time_point();   // sentinel: no window open

   if( window_start_ == not_started ) {
      // first rejection of a new window
      window_start_ = current;
      return;
   }

   const auto age = current - window_start_;
   const bool window_still_open = age < window_size_;
   if( !window_still_open ) {
      ++events_;
      // exact expiry closes the window; overrun starts the next one immediately
      window_start_ = ( age == window_size_ ) ? not_started : current;
   }
}
//-----------------------------------------------------------

sync_manager::sync_manager( uint32_t req_span )
Expand Down Expand Up @@ -1900,7 +1974,8 @@ namespace eosio {
// called from connection strand
void sync_manager::rejected_block( const connection_ptr& c, uint32_t blk_num ) {
std::unique_lock<std::mutex> g( sync_mtx );
heifner marked this conversation as resolved.
Show resolved Hide resolved
if( ++c->consecutive_rejected_blocks > def_max_consecutive_rejected_blocks ) {
c->block_status_monitor_.rejected();
if( c->block_status_monitor_.max_events_violated()) {
fc_wlog( logger, "block ${bn} not accepted from ${p}, closing connection", ("bn", blk_num)("p", c->peer_name()) );
sync_last_requested_num = 0;
sync_source.reset();
Expand Down Expand Up @@ -1935,9 +2010,9 @@ namespace eosio {
c->close( false, true );
return;
}
c->consecutive_rejected_blocks = 0;
sync_update_expected( c, blk_id, blk_num, blk_applied );
std::unique_lock<std::mutex> g_sync( sync_mtx );
c->block_status_monitor_.accepted();
heifner marked this conversation as resolved.
Show resolved Hide resolved
stages state = sync_state;
fc_dlog( logger, "state ${s}", ("s", stage_str( state )) );
if( state == head_catchup ) {
Expand Down