litep2p/peerset: Do not disconnect all peers on SetReservedPeers command #6016

Merged
30 commits, merged Nov 6, 2024
Changes from 17 commits

Commits (30)
3b98cd4
litep2p/peerset: Reserved peers are already inserted by
lexnv Oct 10, 2024
3bb1b99
litep2p/peerset: Disconnect only different reserved peers
lexnv Oct 10, 2024
6f0dd71
litep2p/peerset: Preserve previously reserve nodes if slots
lexnv Oct 10, 2024
c6fe294
litep2p/peerset: Preserve reserve peer state
lexnv Oct 10, 2024
513e183
litep2p/peerset: Exclude reserved peers from connection candidates
lexnv Oct 10, 2024
513b042
litep2p/peerset: Adjust reserved state and more documentation
lexnv Oct 10, 2024
1bceba0
litep2p/peerset: Simplify PeerState::Connected transition from rm
lexnv Oct 10, 2024
89d2f28
litep2p/peerset: Simplify PeerState::Opening transition from rm reserved
lexnv Oct 10, 2024
b9b7331
litep2p: Fix build
lexnv Oct 10, 2024
8ea7cb1
litep2p/peerset: Preserve direction of reserved peers
lexnv Oct 11, 2024
db73cda
peerset/tests: Adjust testing to new implementation
lexnv Oct 11, 2024
036e58d
peerset/tests: Ensure SetReserved moves peers if capacity
lexnv Oct 11, 2024
587bcde
peerset/tests: Ensure reserved peers are disconnected if no capacity
lexnv Oct 11, 2024
ecf2c1c
litep2p/peerset: Remove unneeded fn
lexnv Oct 11, 2024
3eb80d4
peerset/tests: Double check peer numbers
lexnv Oct 11, 2024
df8e802
Add Prdoc
lexnv Oct 11, 2024
9db9a09
Merge branch 'master' into lexnv/litep2p-peerset
lexnv Oct 11, 2024
f430f6d
Update prdoc/pr_6016.prdoc
lexnv Nov 4, 2024
cb02320
Update substrate/client/network/src/litep2p/shim/notification/peerset.rs
lexnv Nov 4, 2024
d78a860
Update substrate/client/network/src/litep2p/shim/notification/peerset.rs
lexnv Nov 4, 2024
8dd82d0
Merge remote-tracking branch 'origin/master' into lexnv/litep2p-peerset
lexnv Nov 4, 2024
b2159a1
Update prdoc
lexnv Nov 4, 2024
04f2d3a
Fix build
lexnv Nov 4, 2024
0d7a2a3
Update substrate/client/network/src/litep2p/shim/notification/peerset.rs
lexnv Nov 5, 2024
35cfbff
peerset/tests: Adjust comment outbound peers -> reserved peers
lexnv Nov 5, 2024
215e522
peerset/litep2p: Ensure common peer is not disconnected
lexnv Nov 5, 2024
7dbf1f0
peerset: Move duplicated check for disconnection to fn
lexnv Nov 5, 2024
4ee26a3
peerset: Increment slot count based on direction
lexnv Nov 5, 2024
280cc87
Merge remote-tracking branch 'origin/master' into lexnv/litep2p-peerset
lexnv Nov 5, 2024
0a34ad1
Merge branch 'master' into lexnv/litep2p-peerset
lexnv Nov 6, 2024
15 changes: 15 additions & 0 deletions prdoc/pr_6016.prdoc
@@ -0,0 +1,15 @@
title: Litep2p do not disconnect all peers on SetReservedPeers command

doc:
- audience: [ Node Dev, Node Operator ]
description: |
Previously, when the `SetReservedPeers` command was received, all peers except the new
reserved peers were disconnected.
This PR ensures that previously reserved nodes are kept connected as regular nodes if
enough slots are available.
While at it, this PR excludes reserved peers from the candidates of peers obtained from
the peerstore.

crates:
- name: sc-network
bump: patch
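
A minimal sketch of the behaviour described in the prdoc above (toy stand-in names, not the actual peerset code): only reserved peers missing from the new set are considered for removal, and each of them is kept as a regular peer whenever a free slot allows it.

use std::collections::HashSet;

// Sketch only: `free_regular_slot` stands in for the real `num_in < max_in` /
// `num_out < max_out` check, and `reserved_only` mirrors the peerset flag.
fn handle_formerly_reserved(free_regular_slot: bool, reserved_only: bool) -> &'static str {
    if reserved_only || !free_regular_slot {
        // No capacity (or reserved-only mode): close the substream, as before this PR.
        "disconnect"
    } else {
        // New behaviour: keep the connection, now counted against the regular slots.
        "keep as regular peer"
    }
}

fn main() {
    let old_reserved: HashSet<&str> = ["a", "b", "c"].into();
    let new_reserved: HashSet<&str> = ["b", "d"].into();

    // Peers that stay reserved ("b") are untouched; only formerly reserved
    // peers ("a", "c") go through the keep-or-disconnect decision.
    for peer in old_reserved.difference(&new_reserved) {
        println!("{peer}: {}", handle_formerly_reserved(true, false));
    }
}

The second change mentioned in the prdoc (excluding reserved peers from peerstore candidates) corresponds to the `ignore` set built near the end of the peerset.rs diff below.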
292 changes: 148 additions & 144 deletions substrate/client/network/src/litep2p/shim/notification/peerset.rs
@@ -88,6 +88,8 @@ const DISCONNECT_ADJUSTMENT: Reputation = Reputation::new(-256, "Peer disconnect
const OPEN_FAILURE_ADJUSTMENT: Reputation = Reputation::new(-1024, "Open failure");

/// Is the peer reserved?
///
/// Regular peers count towards slot allocation.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Reserved {
Yes,
@@ -118,6 +120,15 @@ pub enum Direction {
Outbound(Reserved),
}

impl Direction {
fn set_reserved(&mut self, new_reserved: Reserved) {
match self {
Direction::Inbound(ref mut reserved) => *reserved = new_reserved,
Direction::Outbound(ref mut reserved) => *reserved = new_reserved,
}
}
}

impl From<Direction> for traits::Direction {
fn from(direction: Direction) -> traits::Direction {
match direction {
@@ -784,7 +795,9 @@ impl Peerset {
}

/// Calculate how many of the connected peers were counted as normal inbound/outbound peers
/// which is needed to adjust slot counts when new reserved peers are added
/// which is needed to adjust slot counts when new reserved peers are added.
///
/// If the peer is not already in the [`Peerset`], it is added as a disconnected peer.
fn calculate_slot_adjustment<'a>(
&'a mut self,
peers: impl Iterator<Item = &'a PeerId>,
@@ -949,8 +962,9 @@ impl Stream for Peerset {
},
// set new reserved peers for the protocol
//
// current reserved peers not in the new set are disconnected and the new reserved
// peers are scheduled for outbound substreams
// Current reserved peers not in the new set are moved to the regular set of peers
// or disconnected (if there are no slots available). The new reserved peers are
// scheduled for outbound substreams
PeersetCommand::SetReservedPeers { peers } => {
log::debug!(target: LOG_TARGET, "{}: set reserved peers {peers:?}", self.protocol);

@@ -960,39 +974,64 @@
//
// calculate how many of the previously connected peers were counted as regular
// peers and subtract these counts from `num_out`/`num_in`
//
// If a reserved peer is not already tracked, it is added as disconnected by
// `calculate_slot_adjustment`. This ensures at the next slot allocation (1sec)
// that we'll try to establish a connection with the reserved peer.
let (in_peers, out_peers) = self.calculate_slot_adjustment(peers.iter());
self.num_out -= out_peers;
self.num_in -= in_peers;

// add all unknown peers to `self.peers`
peers.iter().for_each(|peer| {
if !self.peers.contains_key(peer) {
self.peers.insert(*peer, PeerState::Disconnected);
}
});

// collect all peers who are not in the new reserved set
let peers_to_remove = self
.peers
.iter()
.filter_map(|(peer, _)| (!peers.contains(peer)).then_some(*peer))
.collect::<HashSet<_>>();
// collect all *reserved* peers who are not in the new reserved set
let reserved_peers_maybe_remove =
self.reserved_peers.difference(&peers).cloned().collect::<HashSet<_>>();

self.reserved_peers = peers;

let peers = peers_to_remove
let peers = reserved_peers_maybe_remove
.into_iter()
.filter(|peer| {
match self.peers.remove(&peer) {
Some(PeerState::Connected { direction }) => {
log::trace!(
target: LOG_TARGET,
"{}: close connection to {peer:?}, direction {direction:?}",
self.protocol,
);

self.peers.insert(*peer, PeerState::Closing { direction });
true
Some(PeerState::Connected { mut direction }) => {
// The direction contains a `Reserved::Yes` flag, because this
// is a reserved peer that we want to close.
// The `Reserved::Yes` ensures we don't adjust the slot count
// when the substream is closed.

let disconnect = self.reserved_only ||
match direction {
Direction::Inbound(_) => self.num_in >= self.max_in,
Direction::Outbound(_) => self.num_out >= self.max_out,
};
Contributor


nit: some of the blocks seems to be duplicated - maybe moving them to small functions would be better.

The same with increasing out/inbound count.

Not sure however if this would increase readability.

Member


This piece of code was replicated three times. Could be moved into one function that takes a disconnect_state and a keep_state.
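
A hypothetical sketch of that suggestion (names and signature are illustrative, not code from this PR): the replicated check becomes one helper that takes the slot counter for the peer's direction plus the state to store when the peer is kept and the state to store when it must be disconnected. The later commit "peerset: Move duplicated check for disconnection to fn" (7dbf1f0) appears to address the same point.

// Illustrative helper only; `Reserved`, `Direction` and `PeerState` are
// simplified stand-ins for the real types in peerset.rs.
#[derive(Clone, Copy)]
enum Reserved { Yes, No }

#[derive(Clone, Copy)]
enum Direction { Inbound(Reserved), Outbound(Reserved) }

enum PeerState {
    Connected { direction: Direction },
    Closing { direction: Direction },
}

// Returns the next peer state plus whether litep2p should be asked to close the substream.
fn demote_or_disconnect(
    num: &mut usize,                                        // num_in or num_out
    max: usize,                                             // max_in or max_out
    direction: Direction,
    keep_state: impl FnOnce(Direction) -> PeerState,        // e.g. Connected / Opening
    disconnect_state: impl FnOnce(Direction) -> PeerState,  // e.g. Closing / Canceled
) -> (PeerState, bool) {
    if *num >= max {
        // No free slot in this direction: hand back the disconnect state.
        (disconnect_state(direction), true)
    } else {
        // Free slot: drop the reserved flag and occupy a regular slot.
        let direction = match direction {
            Direction::Inbound(_) => Direction::Inbound(Reserved::No),
            Direction::Outbound(_) => Direction::Outbound(Reserved::No),
        };
        *num += 1;
        (keep_state(direction), false)
    }
}

fn main() {
    let mut num_in = 0usize;
    let (_next_state, close) = demote_or_disconnect(
        &mut num_in,
        8,
        Direction::Inbound(Reserved::Yes),
        |d| PeerState::Connected { direction: d },
        |d| PeerState::Closing { direction: d },
    );
    assert!(!close); // a slot was free, so the peer stays connected as a regular peer
}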


if disconnect {
log::trace!(
target: LOG_TARGET,
"{}: close connection to previously reserved {peer:?}, direction {direction:?}",
self.protocol,
);

self.peers.insert(*peer, PeerState::Closing { direction });
true
} else {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} is no longer reserved, move to regular peers, direction {direction:?}",
self.protocol,
);

// The peer is kept connected as non-reserved. This will
// further count towards the slot count.
direction.set_reserved(Reserved::No);
match direction {
Direction::Inbound(_) => self.num_in += 1,
Direction::Outbound(_) => self.num_out += 1,
}

self.peers
.insert(*peer, PeerState::Connected { direction });
false
}
},
// substream might have been opening but not yet fully open when
// the protocol request the reserved set to be changed
@@ -1102,6 +1141,7 @@ impl Stream for Peerset {
self.peers.insert(*peer, PeerState::Backoff);
None
},

// if there is a rapid change in substream state, the peer may
// be canceled when the substream is asked to be closed.
//
@@ -1122,6 +1162,7 @@ impl Stream for Peerset {
self.peers.insert(*peer, PeerState::Canceled { direction });
None
},

// substream to the peer might have failed to open which caused
// the peer to be backed off
//
@@ -1138,6 +1179,7 @@ impl Stream for Peerset {
self.peers.insert(*peer, PeerState::Disconnected);
None
},

// if a node disconnects, it's put into `PeerState::Closing`
// which indicates that `Peerset` wants the substream closed and
// has asked litep2p to close it but it hasn't yet received a
@@ -1167,125 +1209,82 @@
// if there are enough slots, the peer is just converted to
// a regular peer and the used slot count is increased and if the
// peer cannot be accepted, litep2p is asked to close the substream.
PeerState::Connected { direction } => match direction {
Direction::Inbound(_) => match self.num_in < self.max_in {
true => {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular inbound peer (inbound open)",
self.protocol,
);

self.num_in += 1;
self.peers.insert(
*peer,
PeerState::Connected {
direction: Direction::Inbound(Reserved::No),
},
);

None
},
false => {
self.peers.insert(
*peer,
PeerState::Closing {
direction: Direction::Inbound(Reserved::Yes),
},
);

Some(*peer)
},
},
Direction::Outbound(_) => match self.num_out < self.max_out {
true => {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular outbound peer (outbound open)",
self.protocol,
);

self.num_out += 1;
self.peers.insert(
*peer,
PeerState::Connected {
direction: Direction::Outbound(Reserved::No),
},
);

None
},
false => {
self.peers.insert(
*peer,
PeerState::Closing {
direction: Direction::Outbound(Reserved::Yes),
},
);

Some(*peer)
},
},
PeerState::Connected { mut direction } => {
let disconnect = match direction {
Direction::Inbound(_) => self.num_in >= self.max_in,
Direction::Outbound(_) => self.num_out >= self.max_out,
};

if disconnect {
log::trace!(
target: LOG_TARGET,
"{}: close connection to removed reserved {peer:?}, direction {direction:?}",
self.protocol,
);

self.peers.insert(*peer, PeerState::Closing { direction });
Some(*peer)
} else {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}",
self.protocol,
);

// The peer is kept connected as non-reserved. This will
// further count towards the slot count.
direction.set_reserved(Reserved::No);
match direction {
Direction::Inbound(_) => self.num_in += 1,
Direction::Outbound(_) => self.num_out += 1,
}

self.peers
.insert(*peer, PeerState::Connected { direction });

None
}
},
PeerState::Opening { direction } => match direction {
Direction::Inbound(_) => match self.num_in < self.max_in {
true => {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular inbound peer (inbound opening)",
self.protocol,
);

self.num_in += 1;
self.peers.insert(
*peer,
PeerState::Opening {
direction: Direction::Inbound(Reserved::No),
},
);

None
},
false => {
self.peers.insert(
*peer,
PeerState::Canceled {
direction: Direction::Inbound(Reserved::Yes),
},
);

None
},
},
Direction::Outbound(_) => match self.num_out < self.max_out {
true => {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular outbound peer (outbound opening)",
self.protocol,
);

self.num_out += 1;
self.peers.insert(
*peer,
PeerState::Opening {
direction: Direction::Outbound(Reserved::No),
},
);

None
},
false => {
self.peers.insert(
*peer,
PeerState::Canceled {
direction: Direction::Outbound(Reserved::Yes),
},
);

None
},
},

PeerState::Opening { mut direction } => {
let disconnect = match direction {
Direction::Inbound(_) => self.num_in >= self.max_in,
Direction::Outbound(_) => self.num_out >= self.max_out,
};

if disconnect {
log::trace!(
target: LOG_TARGET,
"{}: cancel substream to disconnect removed reserved peer {peer:?}, direction {direction:?}",
self.protocol,
);

self.peers.insert(
*peer,
PeerState::Canceled {
direction
},
);
} else {
log::trace!(
target: LOG_TARGET,
"{}: {peer:?} converted to regular peer {peer:?} direction {direction:?}",
self.protocol,
);

// The peer is kept connected as non-reserved. This will
// further count towards the slot count.
direction.set_reserved(Reserved::No);
match direction {
Direction::Inbound(_) => self.num_in += 1,
Direction::Outbound(_) => self.num_out += 1,
}

self.peers
.insert(*peer, PeerState::Opening { direction });
}

None
},
}
})
@@ -1373,12 +1372,17 @@ impl Stream for Peerset {
// if the number of outbound peers is lower than the desired amount of outbound peers,
// query `PeerStore` and try to get a new outbound candidate.
if self.num_out < self.max_out && !self.reserved_only {
// From the candidates offered by the peerstore we need to ignore:
// - all peers that are not in the `PeerState::Disconnected` state (ie they are
// connected / closing)
// - reserved peers since we initiated a connection to them in the previous step
let ignore: HashSet<PeerId> = self
.peers
.iter()
.filter_map(|(peer, state)| {
(!std::matches!(state, PeerState::Disconnected)).then_some(*peer)
})
.chain(self.reserved_peers.iter().cloned())
.collect();

let peers: Vec<_> =