Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect nodes in two steps in the full node #3050

Merged
merged 2 commits into from
Nov 30, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 112 additions & 51 deletions bin/full-node/src/run/consensus_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,21 @@ struct SyncBackground {
/// State machine containing the list of all the peers, all the non-finalized blocks, and all
/// the network requests in progress.
///
/// Each peer holds an `Option<PeerId>` containing either its `PeerId` for a networking peer,
/// or `None` if this is the "special peer" representing the local block authoring. Only one
/// peer must contain `None` and its id must be [`SyncBackground::block_author_sync_source`].
/// Each peer holds a struct containing either information about a networking peer, or `None`
/// if this is the "special source" representing the local block authoring. Only one source
/// must contain `None` and its id must be [`SyncBackground::block_author_sync_source`].
///
/// Some of the sources can represent networking peers that have already been disconnected. If
/// that is the case, no new request is started against these sources but existing requests
/// are allowed to finish.
/// This "trick" is necessary in order to not cancel requests that have already been started
/// against a peer when it disconnects and that might already have a response.
///
/// Each on-going request has a corresponding future within
/// [`SyncBackground::block_requests_finished`]. This future is wrapped within an aborter, and
/// the an `AbortHandle` is held within this state machine. It can be used to abort the
/// request if necessary.
sync: all::AllSync<future::AbortHandle, Option<libp2p::PeerId>, ()>,
sync: all::AllSync<future::AbortHandle, Option<NetworkSourceInfo>, ()>,

/// Source within the [`SyncBackground::sync`] to use to import locally-authored blocks.
block_author_sync_source: all::SourceId,
Expand Down Expand Up @@ -358,6 +364,7 @@ struct SyncBackground {
'static,
(
all::RequestId,
all::SourceId,
Result<
Result<Vec<BlockData>, network_service::BlocksRequestError>,
future::Aborted,
Expand All @@ -373,6 +380,16 @@ struct SyncBackground {
jaeger_service: Arc<jaeger_service::JaegerService>,
}

/// Information about a source in the sync state machine.
#[derive(Debug, Clone)]
struct NetworkSourceInfo {
/// Identity of the peer according to the networking.
peer_id: libp2p::PeerId,
/// If `true`, this peer is considered disconnected by the network, and no new request should
/// be started against it.
is_disconnected: bool,
}

impl SyncBackground {
#[tracing::instrument(level = "trace", skip(self))]
async fn run(mut self) {
Expand Down Expand Up @@ -502,16 +519,41 @@ impl SyncBackground {
network_service::Event::Connected { peer_id, chain_index, best_block_number, best_block_hash }
if chain_index == self.network_chain_index =>
{
let id = self.sync.add_source(Some(peer_id.clone()), best_block_number, best_block_hash);
self.peers_source_id_map.insert(peer_id, id);
// Most of the time, we insert a new source in the state machine.
// However, a source of that `PeerId` might already exist but be
// considered as disconnected. If that is the case, we simply mark it
// as no longer disconnected.
match self.peers_source_id_map.entry(peer_id) {
hashbrown::hash_map::Entry::Occupied(entry) => {
let id = *entry.get();
let is_disconnected = &mut self.sync[id].as_mut().unwrap().is_disconnected;
debug_assert!(*is_disconnected);
*is_disconnected = false;
}
hashbrown::hash_map::Entry::Vacant(entry) => {
let id = self.sync.add_source(Some(NetworkSourceInfo {
peer_id: entry.key().clone(),
is_disconnected: false,
}), best_block_number, best_block_hash);
entry.insert(id);
}
}
},
network_service::Event::Disconnected { peer_id, chain_index }
if chain_index == self.network_chain_index =>
{
let id = self.peers_source_id_map.remove(&peer_id).unwrap();
let (_, requests) = self.sync.remove_source(id);
for (_, abort) in requests {
abort.abort();
// Sources that disconnect are only immediately removed from the sync
// state machine if they have no request in progress. If that is not
// the case, they are instead only marked as disconnected.
let id = *self.peers_source_id_map.get(&peer_id).unwrap();
if self.sync.source_num_ongoing_requests(id) == 0 {
self.peers_source_id_map.remove(&peer_id).unwrap();
let (_, mut _requests) = self.sync.remove_source(id);
debug_assert!(_requests.next().is_none());
} else {
let is_disconnected = &mut self.sync[id].as_mut().unwrap().is_disconnected;
debug_assert!(!*is_disconnected);
*is_disconnected = true;
}
},
network_service::Event::BlockAnnounce { chain_index, peer_id, header, is_best }
Expand Down Expand Up @@ -539,7 +581,7 @@ impl SyncBackground {
}
},

(request_id, result) = self.block_requests_finished.select_next_some() => {
(request_id, source_id, result) = self.block_requests_finished.select_next_some() => {
// `result` is an error if the block request got cancelled by the sync state
// machine.
// TODO: clarify this piece of code
Expand All @@ -559,6 +601,16 @@ impl SyncBackground {
| all::ResponseOutcome::AllAlreadyInChain { .. } => {
}
}

// If the source was actually disconnected and has no other request in
// progress, we clean it up.
if self.sync[source_id].as_ref().map_or(false, |info| info.is_disconnected)
&& self.sync.source_num_ongoing_requests(source_id) == 0
{
let (info, mut _requests) = self.sync.remove_source(source_id);
debug_assert!(_requests.next().is_none());
self.peers_source_id_map.remove(&info.unwrap().peer_id).unwrap();
}
}
},
}
Expand Down Expand Up @@ -815,44 +867,48 @@ impl SyncBackground {
// `desired_requests()` returns, in decreasing order of priority, the requests
// that should be started in order for the syncing to proceed. We simply pick the
// first request, but enforce one ongoing request per source.
let (source_id, _, mut request_info) =
match self
.sync
.desired_requests()
.find(|(source_id, _, request_details)| {
if *source_id != self.block_author_sync_source {
// Remote source.
self.sync.source_num_ongoing_requests(*source_id) == 0
} else {
// Locally-authored blocks source.
match (request_details, &self.authored_block) {
(
all::DesiredRequest::BlocksRequest {
first_block_hash: None,
first_block_height,
..
},
Some((authored_height, _, _, _)),
) if first_block_height == authored_height => true,
(
all::DesiredRequest::BlocksRequest {
first_block_hash: Some(first_block_hash),
first_block_height,
..
},
Some((authored_height, authored_hash, _, _)),
) if first_block_hash == authored_hash
&& first_block_height == authored_height =>
{
true
}
_ => false,
let (source_id, _, mut request_info) = match self.sync.desired_requests().find(
|(source_id, source_info, request_details)| {
if source_info
.as_ref()
.map_or(false, |info| info.is_disconnected)
{
// Source is a networking source that has already been disconnected.
false
} else if *source_id != self.block_author_sync_source {
// Remote source.
self.sync.source_num_ongoing_requests(*source_id) == 0
} else {
// Locally-authored blocks source.
match (request_details, &self.authored_block) {
(
all::DesiredRequest::BlocksRequest {
first_block_hash: None,
first_block_height,
..
},
Some((authored_height, _, _, _)),
) if first_block_height == authored_height => true,
(
all::DesiredRequest::BlocksRequest {
first_block_hash: Some(first_block_hash),
first_block_height,
..
},
Some((authored_height, authored_hash, _, _)),
) if first_block_hash == authored_hash
&& first_block_height == authored_height =>
{
true
}
_ => false,
}
}) {
Some(v) => v,
None => break,
};
}
},
) {
Some(v) => v,
None => break,
};

// Before notifying the syncing of the request, clamp the number of blocks to the
// number of blocks we expect to receive.
Expand Down Expand Up @@ -897,7 +953,12 @@ impl SyncBackground {
request_bodies,
request_justification,
} => {
let peer_id = self.sync[source_id].clone().unwrap();
let peer_id = {
let info = self.sync[source_id].clone().unwrap();
// Disconnected sources are filtered out above.
debug_assert!(!info.is_disconnected);
info.peer_id
};

// TODO: add jaeger span

Expand Down Expand Up @@ -933,7 +994,7 @@ impl SyncBackground {
let request_id = self.sync.add_request(source_id, request_info.into(), abort);

self.block_requests_finished
.push(request.map(move |r| (request_id, r)).boxed());
.push(request.map(move |r| (request_id, source_id, r)).boxed());
}
all::DesiredRequest::GrandpaWarpSync { .. }
| all::DesiredRequest::StorageGet { .. }
Expand Down Expand Up @@ -1059,8 +1120,8 @@ impl SyncBackground {

for source_id in sources_to_announce_to {
let peer_id = match &self.sync[source_id] {
Some(pid) => pid,
None => continue,
Some(info) if !info.is_disconnected => &info.peer_id,
_ => continue,
};

if self
Expand Down