Skip to content

Commit

Permalink
Merge of #6537
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 23, 2023
2 parents 1e1e234 + 8ed898b commit 3725e1b
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 43 deletions.
14 changes: 12 additions & 2 deletions zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,20 @@ where
unreachable!("unexpected response type: {response:?} from state request")
}
Err(e) => {
#[cfg(not(test))]
tracing::warn!(
"unexpected error: {e:?} in state request while verifying previous \
state checkpoints"
)
state checkpoints. Is Zebra shutting down?"
);
// This error happens a lot in some tests.
//
// TODO: fix the tests so they don't cause this error,
// or change the tracing filter
#[cfg(test)]
tracing::debug!(
"unexpected error: {e:?} in state request while verifying previous \
state checkpoints. Is Zebra shutting down?"
);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59);
/// connections are only initiated after this minimum time has elapsed.
///
/// It also enforces a minimum per-peer reconnection interval, and filters failed outbound peers.
pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(50);
pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);

/// The minimum time between _successful_ inbound peer connections, implemented by
/// `peer_set::initialize::accept_inbound_connections`.
Expand Down Expand Up @@ -398,8 +398,8 @@ mod tests {
/ (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
* MIN_OUTBOUND_PEER_CONNECTION_INTERVAL)
.as_secs() as f32
>= 0.5,
"most peers should get a connection attempt in each connection interval",
>= 0.2,
"some peers should get a connection attempt in each connection interval",
);

assert!(
Expand Down
26 changes: 19 additions & 7 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl Handler {
pub(super) enum State {
/// Awaiting a client request or a peer message.
AwaitingRequest,
/// Awaiting a peer message we can interpret as a client request.
/// Awaiting a peer message we can interpret as a response to a client request.
AwaitingResponse {
handler: Handler,
tx: MustUseClientResponseSender,
Expand Down Expand Up @@ -451,7 +451,6 @@ pub struct Connection<S, Tx> {
/// The metadata for the connected peer `service`.
///
/// This field is used for debugging.
#[allow(dead_code)]
pub connection_info: Arc<ConnectionInfo>,

/// The state of this connection's current request or response.
Expand Down Expand Up @@ -1242,18 +1241,31 @@ where
let rsp = match self.svc.call(req.clone()).await {
Err(e) => {
if e.is::<Overloaded>() {
tracing::info!("inbound service is overloaded, closing connection");
tracing::info!(
remote_user_agent = ?self.connection_info.remote.user_agent,
negotiated_version = ?self.connection_info.negotiated_version,
peer = ?self.metrics_label,
last_peer_state = ?self.last_metrics_state,
// TODO: remove this detailed debug info once #6506 is fixed
remote_height = ?self.connection_info.remote.start_height,
cached_addrs = ?self.cached_addrs.len(),
connection_state = ?self.state,
"inbound service is overloaded, closing connection",
);

metrics::counter!("pool.closed.loadshed", 1);
self.fail_with(PeerError::Overloaded);
} else {
// We could send a reject to the remote peer, but that might cause
// them to disconnect, and we might be using them to sync blocks.
// For similar reasons, we don't want to fail_with() here - we
// only close the connection if the peer is doing something wrong.
info!(%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"error processing peer request");
info!(
%e,
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"error processing peer request",
);
}
return;
}
Expand Down
73 changes: 44 additions & 29 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ impl StartCmd {
.buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
.service(mempool);

info!("fully initializing inbound peer request handler");
// Fully start the inbound service as soon as possible
let setup_data = InboundSetupData {
address_book: address_book.clone(),
block_download_peer_set: peer_set.clone(),
block_verifier: chain_verifier.clone(),
mempool: mempool.clone(),
state,
latest_chain_tip: latest_chain_tip.clone(),
};
setup_tx
.send(setup_data)
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
// And give it time to clear its queue
tokio::task::yield_now().await;

// Launch RPC server
let (rpc_task_handle, rpc_tx_queue_task_handle, rpc_server) = RpcServer::spawn(
config.rpc,
Expand All @@ -186,27 +202,14 @@ impl StartCmd {
app_version(),
mempool.clone(),
read_only_state_service,
chain_verifier.clone(),
chain_verifier,
sync_status.clone(),
address_book.clone(),
address_book,
latest_chain_tip.clone(),
config.network.network,
);

let setup_data = InboundSetupData {
address_book,
block_download_peer_set: peer_set.clone(),
block_verifier: chain_verifier,
mempool: mempool.clone(),
state,
latest_chain_tip: latest_chain_tip.clone(),
};
setup_tx
.send(setup_data)
.map_err(|_| eyre!("could not send setup data to inbound service"))?;

let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());

// Start concurrent tasks which don't add load to other tasks
let block_gossip_task_handle = tokio::spawn(
sync::gossip_best_tip_block_hashes(
sync_status.clone(),
Expand All @@ -216,28 +219,40 @@ impl StartCmd {
.in_current_span(),
);

let mempool_crawler_task_handle = mempool::Crawler::spawn(
&config.mempool,
peer_set.clone(),
mempool.clone(),
sync_status.clone(),
chain_tip_change,
);

let mempool_queue_checker_task_handle = mempool::QueueChecker::spawn(mempool.clone());

let tx_gossip_task_handle = tokio::spawn(
mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set)
mempool::gossip_mempool_transaction_id(mempool_transaction_receiver, peer_set.clone())
.in_current_span(),
);

let mut old_databases_task_handle =
zebra_state::check_and_delete_old_databases(config.state.clone());

let progress_task_handle = tokio::spawn(
show_block_chain_progress(config.network.network, latest_chain_tip, sync_status)
.in_current_span(),
show_block_chain_progress(
config.network.network,
latest_chain_tip,
sync_status.clone(),
)
.in_current_span(),
);

let mut old_databases_task_handle =
zebra_state::check_and_delete_old_databases(config.state.clone());
// Give the inbound service more time to clear its queue,
// then start concurrent tasks that can add load to the inbound service
// (by opening more peer connections, so those peers send us requests)
tokio::task::yield_now().await;

// The crawler only activates immediately in tests that use mempool debug mode
let mempool_crawler_task_handle = mempool::Crawler::spawn(
&config.mempool,
peer_set,
mempool.clone(),
sync_status,
chain_tip_change,
);

let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());

info!("spawned initial Zebra tasks");

Expand Down
10 changes: 8 additions & 2 deletions zebrad/tests/common/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ pub const STOP_ON_LOAD_TIMEOUT: Duration = Duration::from_secs(10);
/// The maximum amount of time Zebra should take to sync a few hundred blocks.
///
/// Usually the small checkpoint is much shorter than this.
pub const TINY_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(120);
//
// Temporarily increased to 4 minutes to get more diagnostic info in failed tests.
// TODO: reduce to 120 when #6506 is fixed
pub const TINY_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(240);

/// The maximum amount of time Zebra should take to sync a thousand blocks.
pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(180);
//
// Temporarily increased to 4 minutes to get more diagnostic info in failed tests.
// TODO: reduce to 180 when #6506 is fixed
pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(240);

/// The maximum time to wait for Zebrad to synchronize up to the chain tip starting from a
/// partially synchronized state.
Expand Down

0 comments on commit 3725e1b

Please sign in to comment.