Skip to content

Commit

Permalink
Add close RPC connections
Browse files Browse the repository at this point in the history
Added the ability to close RPC connections for a given peer:
- The RPC server can request sessions to be dropped
- When disconnecting a peer connection, all RPC connections will be dropped.
- When dropping a peer connection, all RPC connections will be dropped.
- When dialing a peer (i.e. when establishing a new connection), a "drop old connections"
  switch can be included in the dial request from the client so that only one
  RPC client connection will ever be active.
  • Loading branch information
hansieodendaal committed Oct 23, 2024
1 parent 2897536 commit 22f13c5
Show file tree
Hide file tree
Showing 39 changed files with 807 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl CommandContext {
let start = Instant::now();
println!("☎️ Dialing peer...");

match connectivity.dial_peer(dest_node_id).await {
match connectivity.dial_peer(dest_node_id, false).await {
Ok(connection) => {
println!("⚡️ Peer connected in {}ms!", start.elapsed().as_millis());
println!("Connection: {}", connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
}

async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
let connection = self.connectivity.dial_peer(peer).await?;
let connection = self.connectivity.dial_peer(peer, false).await?;
Ok(connection)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
let timer = Instant::now();
debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
let conn = self.connectivity.dial_peer(node_id.clone()).await?;
let conn = self.connectivity.dial_peer(node_id.clone(), false).await?;
info!(
target: LOG_TARGET,
"Successfully dialed sync peer {} in {:.2?}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
let timer = Instant::now();
debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
let conn = self.connectivity.dial_peer(node_id.clone()).await?;
let conn = self.connectivity.dial_peer(node_id.clone(), false).await?;
info!(
target: LOG_TARGET,
"Successfully dialed sync peer {} in {:.2?}",
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/tests/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ async fn propagate_and_forward_invalid_block() {
alice_node
.comms
.connectivity()
.dial_peer(bob_node.node_identity.node_id().clone())
.dial_peer(bob_node.node_identity.node_id().clone(), false)
.await
.unwrap();
wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await;
Expand Down
10 changes: 7 additions & 3 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl WalletConnectivityService {
}

async fn try_setup_rpc_pool(&mut self, peer_node_id: NodeId) -> Result<bool, WalletConnectivityError> {
let conn = match self.try_dial_peer(peer_node_id.clone()).await? {
let conn = match self.try_dial_peer(peer_node_id.clone(), true).await? {
Some(c) => c,
None => {
warn!(target: LOG_TARGET, "Could not dial base node peer '{}'", peer_node_id);
Expand All @@ -413,14 +413,18 @@ impl WalletConnectivityService {
Ok(true)
}

async fn try_dial_peer(&mut self, peer: NodeId) -> Result<Option<PeerConnection>, WalletConnectivityError> {
async fn try_dial_peer(
&mut self,
peer: NodeId,
drop_old_connections: bool,
) -> Result<Option<PeerConnection>, WalletConnectivityError> {
tokio::select! {
biased;

_ = self.base_node_watch_receiver.changed() => {
Ok(None)
}
result = self.connectivity.dial_peer(peer) => {
result = self.connectivity.dial_peer(peer, drop_old_connections) => {
Ok(Some(result?))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ where
Ok(())
}

async fn connect_to_peer(&mut self, peer: NodeId) -> Result<PeerConnection, UtxoScannerError> {
async fn new_connection_to_peer(&mut self, peer: NodeId) -> Result<PeerConnection, UtxoScannerError> {
debug!(
target: LOG_TARGET,
"Attempting UTXO sync with seed peer {} ({})", self.peer_index, peer,
);
match self.resources.comms_connectivity.dial_peer(peer.clone()).await {
match self.resources.comms_connectivity.dial_peer(peer.clone(), true).await {
Ok(conn) => Ok(conn),
Err(e) => {
self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode {
Expand Down Expand Up @@ -333,7 +333,7 @@ where
&mut self,
peer: &NodeId,
) -> Result<RpcClientLease<BaseNodeWalletRpcClient>, UtxoScannerError> {
let mut connection = self.connect_to_peer(peer.clone()).await?;
let mut connection = self.new_connection_to_peer(peer.clone()).await?;
let client = connection
.connect_rpc_using_builder(BaseNodeWalletRpcClient::builder().with_deadline(Duration::from_secs(60)))
.await?;
Expand Down
8 changes: 4 additions & 4 deletions base_layer/wallet/tests/transaction_service_tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ async fn manage_single_transaction() {

let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -753,7 +753,7 @@ async fn large_interactive_transaction() {
// Verify that Alice and Bob are connected
let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -2172,15 +2172,15 @@ async fn manage_multiple_transactions() {

let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();
sleep(Duration::from_secs(3)).await;

// Connect alice to carol
let _peer_connection = alice_comms
.connectivity()
.dial_peer(carol_node_identity.node_id().clone())
.dial_peer(carol_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12730,7 +12730,7 @@ mod test {
.block_on(
alice_wallet_comms
.connectivity()
.dial_peer(bob_node_identity.node_id().clone()),
.dial_peer(bob_node_identity.node_id().clone(), false),
)
.is_ok();
}
Expand All @@ -12739,7 +12739,7 @@ mod test {
.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
.dial_peer(alice_node_identity.node_id().clone(), false),
)
.is_ok();
}
Expand Down Expand Up @@ -12810,7 +12810,7 @@ mod test {
let bob_comms_dial_peer = bob_wallet_runtime.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
.dial_peer(alice_node_identity.node_id().clone(), false),
);
if let Ok(mut connection_to_alice) = bob_comms_dial_peer {
if bob_wallet_runtime
Expand Down
2 changes: 1 addition & 1 deletion comms/core/examples/stress/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl StressTestService {
self.comms_node.peer_manager().add_peer(peer).await?;
println!("Dialing peer `{}`...", node_id.short_str());
let start = Instant::now();
let conn = self.comms_node.connectivity().dial_peer(node_id).await?;
let conn = self.comms_node.connectivity().dial_peer(node_id, false).await?;
println!("Dial completed successfully in {:.2?}", start.elapsed());
let outbound_tx = self.outbound_tx.clone();
let inbound_rx = self.inbound_rx.clone();
Expand Down
4 changes: 2 additions & 2 deletions comms/core/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn peer_to_peer_custom_protocols() {
let mut conn_man_events2 = comms_node2.subscribe_connection_manager_events();

let mut conn1 = conn_man_requester1
.dial_peer(node_identity2.node_id().clone())
.dial_peer(node_identity2.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -347,7 +347,7 @@ async fn peer_to_peer_messaging_simultaneous() {

comms_node1
.connectivity()
.dial_peer(comms_node2.node_identity().node_id().clone())
.dial_peer(comms_node2.node_identity().node_id().clone(), false)
.await
.unwrap();
// Simultaneously send messages between the two nodes
Expand Down
9 changes: 7 additions & 2 deletions comms/core/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub(crate) enum DialerRequest {
Dial(
Box<Peer>,
Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
bool,
),
CancelPendingDial(NodeId),
NotifyNewInboundConnection(Box<PeerConnection>),
Expand Down Expand Up @@ -176,8 +177,8 @@ where
debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request);

match request {
Dial(peer, reply_tx) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx);
Dial(peer, reply_tx, drop_old_connections) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx, drop_old_connections);
},
CancelPendingDial(peer_id) => {
self.cancel_dial(&peer_id);
Expand Down Expand Up @@ -318,6 +319,7 @@ where
pending_dials: &mut DialFuturesUnordered,
peer: Box<Peer>,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
drop_old_connections: bool,
) {
if self.is_pending_dial(&peer.node_id) {
debug!(
Expand Down Expand Up @@ -371,6 +373,7 @@ where
let result = Self::perform_socket_upgrade_procedure(
&peer_manager,
&node_identity,
drop_old_connections,
socket,
addr.clone(),
authenticated_public_key,
Expand Down Expand Up @@ -421,6 +424,7 @@ where
async fn perform_socket_upgrade_procedure(
peer_manager: &PeerManager,
node_identity: &NodeIdentity,
drop_old_connections: bool,
mut socket: NoiseSocket<TTransport::Output>,
dialed_addr: Multiaddr,
authenticated_public_key: CommsPublicKey,
Expand Down Expand Up @@ -474,6 +478,7 @@ where
muxer,
dialed_addr,
NodeId::from_public_key(&authenticated_public_key),
drop_old_connections,
peer_identity.claim.features,
CONNECTION_DIRECTION,
conn_man_notifier,
Expand Down
1 change: 1 addition & 0 deletions comms/core/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ where
muxer,
peer_addr,
peer.node_id.clone(),
false,
peer.features,
CONNECTION_DIRECTION,
conn_man_notifier,
Expand Down
13 changes: 10 additions & 3 deletions comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,17 @@ where
use ConnectionManagerRequest::{CancelDial, DialPeer, NotifyListening};
trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request);
match request {
DialPeer { node_id, reply_tx } => {
DialPeer {
node_id,
reply_tx,
drop_old_connections,
} => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "connection_manager::handle_request");
span.follows_from(tracing_id);
self.dial_peer(node_id, reply_tx).instrument(span).await
self.dial_peer(node_id, reply_tx, drop_old_connections)
.instrument(span)
.await
},
CancelDial(node_id) => {
if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await {
Expand Down Expand Up @@ -500,10 +506,11 @@ where
&mut self,
node_id: NodeId,
reply: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
drop_old_connections: bool,
) {
match self.peer_manager.find_by_node_id(&node_id).await {
Ok(Some(peer)) => {
self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply))
self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply, drop_old_connections))
.await;
},
Ok(None) => {
Expand Down
43 changes: 38 additions & 5 deletions comms/core/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::{
use futures::{future::BoxFuture, stream::FuturesUnordered};
use log::*;
use multiaddr::Multiaddr;
use tari_shutdown::oneshot_trigger::OneshotTrigger;
use tokio::{
sync::{mpsc, oneshot},
time,
Expand Down Expand Up @@ -71,6 +72,7 @@ pub fn create(
connection: Yamux,
peer_addr: Multiaddr,
peer_node_id: NodeId,
drop_old_connections: bool,
peer_features: PeerFeatures,
direction: ConnectionDirection,
event_notifier: mpsc::Sender<ConnectionManagerEvent>,
Expand All @@ -90,6 +92,7 @@ pub fn create(
id,
peer_tx,
peer_node_id.clone(),
drop_old_connections,
peer_features,
peer_addr,
direction,
Expand Down Expand Up @@ -130,20 +133,24 @@ pub type ConnectionId = usize;
pub struct PeerConnection {
    id: ConnectionId,
    peer_node_id: NodeId,
    /// Flag supplied at construction and forwarded to the RPC client builder
    /// (`with_drop_old_connections`) whenever an RPC client is established on this connection.
    drop_old_connections: bool,
    peer_features: PeerFeatures,
    request_tx: mpsc::Sender<PeerConnectionRequest>,
    /// Peer address; wrapped in an `Arc` by `new`.
    address: Arc<Multiaddr>,
    direction: ConnectionDirection,
    /// Set to `Instant::now()` when the connection object is constructed.
    started_at: Instant,
    substream_counter: AtomicRefCounter,
    // NOTE(review): presumably the `Arc` strong count is what `handle_count()` reports — confirm.
    handle_counter: Arc<()>,
    /// Trigger that is cloned into every RPC client builder (`with_drop_receiver`) and broadcast
    /// with this peer's `NodeId` from the `Drop` implementation.
    drop_notifier: OneshotTrigger<NodeId>,
    /// `None` until the first RPC client has been established through this handle; afterwards the
    /// number of RPC clients established so far. Only used to decide whether to broadcast on drop
    /// and for logging.
    number_of_rpc_clients: Option<usize>,
}

impl PeerConnection {
pub(crate) fn new(
id: ConnectionId,
request_tx: mpsc::Sender<PeerConnectionRequest>,
peer_node_id: NodeId,
drop_old_connections: bool,
peer_features: PeerFeatures,
address: Multiaddr,
direction: ConnectionDirection,
Expand All @@ -153,12 +160,15 @@ impl PeerConnection {
id,
request_tx,
peer_node_id,
drop_old_connections,
peer_features,
address: Arc::new(address),
direction,
started_at: Instant::now(),
substream_counter,
handle_counter: Arc::new(()),
drop_notifier: OneshotTrigger::<NodeId>::new(),
number_of_rpc_clients: None,
}
}

Expand Down Expand Up @@ -249,16 +259,21 @@ impl PeerConnection {
let protocol = ProtocolId::from_static(T::PROTOCOL_NAME);
debug!(
target: LOG_TARGET,
"Attempting to establish RPC protocol `{}` to peer `{}`",
String::from_utf8_lossy(&protocol),
self.peer_node_id
"Attempting to establish RPC protocol `{}` to peer `{}` (drop_old_connections {})",
String::from_utf8_lossy(&protocol), self.peer_node_id, self.drop_old_connections
);
let framed = self.open_framed_substream(&protocol, RPC_MAX_FRAME_SIZE).await?;
builder

let rpc_client = builder
.with_protocol_id(protocol)
.with_node_id(self.peer_node_id.clone())
.with_drop_old_connections(self.drop_old_connections)
.with_drop_receiver(self.drop_notifier.clone())
.connect(framed)
.await
.await?;
self.number_of_rpc_clients = Some(self.number_of_rpc_clients.unwrap_or(0) + 1);

Ok(rpc_client)
}

/// Creates a new RpcClientPool that can be shared between tasks. The client pool will lazily establish up to
Expand Down Expand Up @@ -298,6 +313,24 @@ impl PeerConnection {
}
}

// On drop, tell any RPC clients that were created through this handle that the peer connection
// is going away so that they can close their sessions.
impl Drop for PeerConnection {
    fn drop(&mut self) {
        trace!(
            target: LOG_TARGET,
            "PeerConnection `{}` drop called, still has {} sub-streams and {} handles open",
            self.peer_node_id, self.substream_count(), self.handle_count(),
        );
        // `number_of_rpc_clients` stays `None` until an RPC client has been established through
        // this handle, so handles that never opened an RPC session do not broadcast.
        // NOTE(review): this runs for every handle that is dropped, not only the last one —
        // confirm that broadcasting while other handles are still open is intended.
        if let Some(number_of_rpc_clients) = self.number_of_rpc_clients {
            // The trigger takes the node id by value, hence the clone here.
            self.drop_notifier.broadcast(self.peer_node_id.clone());
            trace!(
                target: LOG_TARGET,
                "PeerConnection `{}` on drop notified {} RPC clients to drop connection",
                self.peer_node_id, number_of_rpc_clients,
            );
        }
    }
}

impl fmt::Display for PeerConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(
Expand Down
Loading

0 comments on commit 22f13c5

Please sign in to comment.