Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move requests-responses and polling from ChainSync to SyncingEngine #1650

Merged
merged 22 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b8e7133
Simplify pending responses polling in `ChainSync`
dmitry-markin Sep 15, 2023
25bdbce
Prepare `ChainSync` for extraction of requests-responses
dmitry-markin Sep 19, 2023
5a090ec
Move requests-responses from `ChainSync` to `SyncingEngine`
dmitry-markin Sep 20, 2023
5875b22
Remove obsolete code from `ChainSync`
dmitry-markin Sep 20, 2023
06af3c7
Fix sync restart test in `ChainSync`
dmitry-markin Sep 20, 2023
4d2c1b8
Move block announce protocol initialization to `SyncingEngine`
dmitry-markin Sep 21, 2023
68812ff
minor: rename `who` -> `peer_id`
dmitry-markin Sep 21, 2023
c62b37e
Introduce `BlockRequestEvent`
dmitry-markin Sep 21, 2023
a3ff69b
Generate an error when discarding a pending response
dmitry-markin Sep 21, 2023
e323033
Introduce `PendingResponses` stream
dmitry-markin Sep 22, 2023
1308f90
minor: docs
dmitry-markin Sep 22, 2023
3432169
minor: typo
dmitry-markin Sep 22, 2023
5e02182
Simplify `PendingResponses` by using `StreamMap`
dmitry-markin Sep 25, 2023
7f4889a
Make `PendingResponses` `Send`
dmitry-markin Sep 25, 2023
b7cd126
Apply suggestions from code review
dmitry-markin Sep 26, 2023
8b950f3
Apply review suggestions
dmitry-markin Sep 26, 2023
73c7d14
Merge remote-tracking branch 'origin/master' into dm-extract-request-…
dmitry-markin Sep 26, 2023
7e7134d
Apply suggestions from code review
dmitry-markin Sep 26, 2023
129beb3
minor: rustfmt
dmitry-markin Sep 26, 2023
f6dc58d
Merge remote-tracking branch 'origin/master' into dm-extract-request-…
dmitry-markin Sep 26, 2023
c7fd5ed
Remove resolved pending responses from `StreamMap`
dmitry-markin Sep 27, 2023
6abebe4
Merge remote-tracking branch 'origin/master' into dm-extract-request-…
dmitry-markin Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions substrate/client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,23 @@ pub enum PeerRequest<B: BlockT> {
WarpProof,
}

/// Payload-free counterpart of `PeerRequest`: identifies which kind of request was sent
/// to a peer without carrying the request itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerRequestType {
    /// A block request.
    Block,
    /// A state request.
    State,
    /// A warp proof request.
    WarpProof,
}

impl<B: BlockT> PeerRequest<B> {
    /// Returns the [`PeerRequestType`] matching this request's variant, dropping any
    /// payload the variant carries.
    pub fn get_type(&self) -> PeerRequestType {
        match self {
            Self::Block(_) => PeerRequestType::Block,
            Self::State => PeerRequestType::State,
            Self::WarpProof => PeerRequestType::WarpProof,
        }
    }
}

/// Wrapper for implementation-specific state request.
///
/// NOTE: Implementation must be able to encode and decode it for network purposes.
Expand Down
278 changes: 119 additions & 159 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
Expand All @@ -34,11 +35,11 @@ use codec::{Decode, Encode};
use futures::{
channel::oneshot,
future::{BoxFuture, Fuse},
Future, FutureExt, StreamExt,
FutureExt, StreamExt,
};
use futures_timer::Delay;
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, trace};
use log::{debug, trace};
use prometheus_endpoint::{
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
Expand Down Expand Up @@ -74,7 +75,6 @@ use std::{
collections::{HashMap, HashSet},
iter,
num::NonZeroUsize,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -211,18 +211,6 @@ pub struct Peer<B: BlockT> {
inbound: bool,
}

type PendingResponse<B> = Pin<
Box<
dyn Future<
Output = (
PeerId,
PeerRequest<B>,
Result<Result<Vec<u8>, RequestFailure>, oneshot::Canceled>,
),
> + Send,
>,
>;

pub struct SyncingEngine<B: BlockT, Client> {
/// State machine that handles the list of in-progress requests. Only full node peers are
/// registered.
Expand Down Expand Up @@ -313,7 +301,7 @@ pub struct SyncingEngine<B: BlockT, Client> {
last_notification_io: Instant,

/// Pending responses
pending_responses: HashMap<PeerId, PendingResponse<B>>,
pending_responses: PendingResponses<B>,

/// Block downloader
block_downloader: Arc<dyn BlockDownloader<B>>,
Expand Down Expand Up @@ -509,7 +497,7 @@ where
} else {
None
},
pending_responses: HashMap::new(),
pending_responses: PendingResponses::new(),
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
Expand Down Expand Up @@ -888,7 +876,9 @@ where
self.send_chain_sync_requests();

// Poll & process pending responses.
let _ = self.poll_pending_responses(cx);
while let Poll::Ready(Some(event)) = self.pending_responses.poll_next_unpin(cx) {
self.process_response_event(event);
}

// Poll block announce validations last, because if a block announcement was received
// through the event stream between `SyncingEngine` and `Protocol` and the validation
Expand Down Expand Up @@ -1097,19 +1087,12 @@ where
}

let downloader = self.block_downloader.clone();
if let Some(_) = self.pending_responses.insert(

self.pending_responses.insert(
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved
peer_id,
Box::pin(async move {
(
peer_id,
PeerRequest::Block(request.clone()),
downloader.download_blocks(peer_id, request).await,
)
}),
) {
error!(target: LOG_TARGET, "Discarded block pending response from peer {peer_id}");
debug_assert!(false);
}
PeerRequest::Block(request.clone()),
async move { downloader.download_blocks(peer_id, request).await }.boxed(),
);
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
Expand All @@ -1121,13 +1104,7 @@ where

let (tx, rx) = oneshot::channel();

if let Some(_) = self
.pending_responses
.insert(peer_id, Box::pin(async move { (peer_id, PeerRequest::State, rx.await) }))
{
error!(target: LOG_TARGET, "Discarded state pending response from peer {peer_id}");
debug_assert!(false);
}
self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());

match Self::encode_state_request(&request) {
Ok(data) => {
Expand Down Expand Up @@ -1157,13 +1134,7 @@ where

let (tx, rx) = oneshot::channel();

if let Some(_) = self
.pending_responses
.insert(peer_id, Box::pin(async move { (peer_id, PeerRequest::WarpProof, rx.await) }))
{
error!(target: LOG_TARGET, "Discarded warp proof pending response from peer {peer_id}");
debug_assert!(false);
}
self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
Expand Down Expand Up @@ -1199,129 +1170,118 @@ where
Ok(OpaqueStateResponse(Box::new(response)))
}

fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll<()> {
let ready_responses = self
.pending_responses
.values_mut()
.filter_map(|future| match future.poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(result) => Some(result),
})
.collect::<Vec<_>>();

for (id, request, response) in ready_responses {
self.pending_responses
.remove(&id)
.expect("Logic error: peer id from pending response is missing in the map.");

match response {
Ok(Ok(resp)) => match request {
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
if let Some((peer_id, new_req)) =
self.chain_sync.on_block_response(id, req, blocks)
{
self.send_block_request(peer_id, new_req);
}
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
target: LOG_TARGET,
"Failed to decode block response from peer {:?}: {:?}.",
id,
e
);
self.network_service.report_peer(id, rep::BAD_MESSAGE);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
continue
},
Err(BlockResponseError::ExtractionFailed(e)) => {
debug!(
target: LOG_TARGET,
"Failed to extract blocks from peer response {:?}: {:?}.",
id,
e
);
self.network_service.report_peer(id, rep::BAD_MESSAGE);
continue
},
}
},
PeerRequest::State => {
let response = match Self::decode_state_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
target: LOG_TARGET,
"Failed to decode state response from peer {id:?}: {e:?}.",
);
self.network_service.report_peer(id, rep::BAD_MESSAGE);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
continue
},
};

self.chain_sync.on_state_response(id, response);
},
PeerRequest::WarpProof => {
self.chain_sync.on_warp_sync_response(id, EncodedProof(resp));
},
},
Ok(Err(e)) => {
debug!(target: LOG_TARGET, "Request to peer {id:?} failed: {e:?}.");

match e {
RequestFailure::Network(OutboundFailure::Timeout) => {
self.network_service.report_peer(id, rep::TIMEOUT);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
},
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
self.network_service.report_peer(id, rep::BAD_PROTOCOL);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
},
RequestFailure::Network(OutboundFailure::DialFailure) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
},
RequestFailure::Refused => {
self.network_service.report_peer(id, rep::REFUSED);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
},
RequestFailure::Network(OutboundFailure::ConnectionClosed) |
RequestFailure::NotConnected => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
let ResponseEvent { peer_id, request, response } = response_event;

match response {
Ok(Ok(resp)) => match request {
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
if let Some((peer_id, new_req)) =
self.chain_sync.on_block_response(peer_id, req, blocks)
{
self.send_block_request(peer_id, new_req);
}
},
RequestFailure::UnknownProtocol => {
debug_assert!(false, "Block request protocol should always be known.");
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
target: LOG_TARGET,
"Failed to decode block response from peer {:?}: {:?}.",
peer_id,
e
);
self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
return
},
RequestFailure::Obsolete => {
debug_assert!(
false,
"Can not receive `RequestFailure::Obsolete` after dropping the \
response receiver.",
Err(BlockResponseError::ExtractionFailed(e)) => {
debug!(
target: LOG_TARGET,
"Failed to extract blocks from peer response {:?}: {:?}.",
peer_id,
e
);
self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
return
},
}
},
Err(oneshot::Canceled) => {
trace!(
target: LOG_TARGET,
"Request to peer {id:?} failed due to oneshot being canceled.",
);
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
PeerRequest::State => {
let response = match Self::decode_state_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
target: LOG_TARGET,
"Failed to decode state response from peer {peer_id:?}: {e:?}.",
);
self.network_service.report_peer(peer_id, rep::BAD_MESSAGE);
self.network_service.disconnect_peer(
peer_id,
self.block_announce_protocol_name.clone(),
);
return
},
};

self.chain_sync.on_state_response(peer_id, response);
},
}
}
PeerRequest::WarpProof => {
self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp));
},
},
Ok(Err(e)) => {
debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");

Poll::Pending
match e {
RequestFailure::Network(OutboundFailure::Timeout) => {
self.network_service.report_peer(peer_id, rep::TIMEOUT);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
},
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
},
RequestFailure::Network(OutboundFailure::DialFailure) => {
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
},
RequestFailure::Refused => {
self.network_service.report_peer(peer_id, rep::REFUSED);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
},
RequestFailure::Network(OutboundFailure::ConnectionClosed) |
RequestFailure::NotConnected => {
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
},
RequestFailure::UnknownProtocol => {
debug_assert!(false, "Block request protocol should always be known.");
},
RequestFailure::Obsolete => {
debug_assert!(
false,
"Can not receive `RequestFailure::Obsolete` after dropping the \
response receiver.",
);
},
}
},
Err(oneshot::Canceled) => {
trace!(
target: LOG_TARGET,
"Request to peer {peer_id:?} failed due to oneshot being canceled.",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
},
}
}

/// Returns the number of peers we're connected to and that are being queried.
Expand Down
1 change: 1 addition & 0 deletions substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub use service::chain_sync::SyncingService;
mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod pending_responses;
mod schema;

pub mod block_relay_protocol;
Expand Down
Loading