Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh-net): Keep connection name, remove connection count #2779

Merged
merged 7 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions iroh-net/src/magicsock/relay_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,7 @@ impl ActiveRelay {
Ok(())
}

async fn handle_relay_msg(
&mut self,
msg: Result<(ReceivedMessage, usize), ClientError>,
) -> ReadResult {
async fn handle_relay_msg(&mut self, msg: Result<ReceivedMessage, ClientError>) -> ReadResult {
match msg {
Err(err) => {
warn!("recv error {:?}", err);
Expand Down Expand Up @@ -200,7 +197,7 @@ impl ActiveRelay {
None => ReadResult::Break,
}
}
Ok((msg, _conn_gen)) => {
Ok(msg) => {
// reset
self.backoff.reset();
let now = Instant::now();
Expand Down
125 changes: 57 additions & 68 deletions iroh-net/src/relay/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@ use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, event, info_span, trace, warn, Instrument, Level};
use url::Url;

use conn::{
Conn as RelayClient, ConnBuilder as RelayClientBuilder, ConnReader,
ConnReceiver as RelayClientReceiver, ConnWriter, ReceivedMessage,
};
use conn::{Conn, ConnBuilder, ConnReader, ConnReceiver, ConnWriter, ReceivedMessage};
use streams::{downcast_upgrade, MaybeTlsStream, ProxyStream};

use crate::defaults::timeouts::relay::*;
use crate::dns::{DnsResolver, ResolverExt};
use crate::key::{PublicKey, SecretKey};
use crate::key::{NodeId, PublicKey, SecretKey};
use crate::relay::codec::DerpCodec;
use crate::relay::http::{Protocol, RELAY_PATH};
use crate::relay::RelayUrl;
Expand Down Expand Up @@ -140,7 +137,7 @@ pub struct Client {

#[derive(Debug)]
enum ActorMessage {
Connect(oneshot::Sender<Result<(RelayClient, usize), ClientError>>),
Connect(oneshot::Sender<Result<Conn, ClientError>>),
NotePreferred(bool),
LocalAddr(oneshot::Sender<Result<Option<SocketAddr>, ClientError>>),
Ping(oneshot::Sender<Result<Duration, ClientError>>),
Expand All @@ -154,19 +151,18 @@ enum ActorMessage {
/// Receiving end of a [`Client`].
#[derive(Debug)]
pub struct ClientReceiver {
msg_receiver: mpsc::Receiver<Result<(ReceivedMessage, usize), ClientError>>,
msg_receiver: mpsc::Receiver<Result<ReceivedMessage, ClientError>>,
}

#[derive(derive_more::Debug)]
struct Actor {
secret_key: SecretKey,
can_ack_pings: bool,
is_preferred: bool,
relay_client: Option<(RelayClient, RelayClientReceiver)>,
relay_conn: Option<(Conn, ConnReceiver)>,
is_closed: bool,
#[debug("address family selector callback")]
address_family_selector: Option<Box<dyn Fn() -> BoxFuture<bool> + Send + Sync + 'static>>,
conn_gen: usize,
url: RelayUrl,
protocol: Protocol,
#[debug("TlsConnector")]
Expand Down Expand Up @@ -334,10 +330,9 @@ impl ClientBuilder {
secret_key: key,
can_ack_pings: self.can_ack_pings,
is_preferred: self.is_preferred,
relay_client: None,
relay_conn: None,
is_closed: false,
address_family_selector: self.address_family_selector,
conn_gen: 0,
pings: PingTracker::default(),
ping_tasks: Default::default(),
url: self.url,
Expand Down Expand Up @@ -371,9 +366,8 @@ impl ClientBuilder {
}

impl ClientReceiver {
/// Reads a message from the server. Returns the message and the `conn_get`, or the number of
/// re-connections this Client has ever made
pub async fn recv(&mut self) -> Option<Result<(ReceivedMessage, usize), ClientError>> {
/// Reads a message from the server.
pub async fn recv(&mut self) -> Option<Result<ReceivedMessage, ClientError>> {
self.msg_receiver.recv().await
}
}
Expand All @@ -399,13 +393,13 @@ impl Client {
}
}

/// Connect to a relay Server and returns the underlying relay Client.
/// Connects to a relay Server and returns the underlying relay connection.
///
/// Returns [`ClientError::Closed`] if the [`Client`] is closed.
///
/// If there is already an active relay connection, returns the already
/// connected [`crate::relay::RelayConn`].
pub async fn connect(&self) -> Result<(RelayClient, usize), ClientError> {
pub async fn connect(&self) -> Result<Conn, ClientError> {
self.send_actor(ActorMessage::Connect).await
}

Expand Down Expand Up @@ -475,7 +469,7 @@ impl Actor {
async fn run(
mut self,
mut inbox: mpsc::Receiver<ActorMessage>,
msg_sender: mpsc::Sender<Result<(ReceivedMessage, usize), ClientError>>,
msg_sender: mpsc::Sender<Result<ReceivedMessage, ClientError>>,
) {
// Add an initial connection attempt.
if let Err(err) = self.connect("initial connect").await {
Expand All @@ -485,7 +479,7 @@ impl Actor {
loop {
tokio::select! {
res = self.recv_detail() => {
if let Ok((ReceivedMessage::Pong(ping), _)) = res {
if let Ok(ReceivedMessage::Pong(ping)) = res {
match self.pings.unregister(ping, "pong") {
Some(chan) => {
if chan.send(()).is_err() {
Expand All @@ -503,7 +497,7 @@ impl Actor {
Some(msg) = inbox.recv() => {
match msg {
ActorMessage::Connect(s) => {
let res = self.connect("actor msg").await.map(|(client, _, count)| (client, count));
let res = self.connect("actor msg").await.map(|(client, _)| (client));
s.send(res).ok();
},
ActorMessage::NotePreferred(is_preferred) => {
Expand Down Expand Up @@ -549,46 +543,51 @@ impl Actor {
}
}

/// Returns a connection to the relay.
///
/// If the client is currently connected, the existing connection is returned; otherwise,
/// a new connection is made.
///
/// Returns:
/// - A clonable connection object which can send DISCO messages to the relay.
/// - A reference to a channel receiving DISCO messages from the relay.
async fn connect(
&mut self,
why: &'static str,
) -> Result<(RelayClient, &'_ mut RelayClientReceiver, usize), ClientError> {
) -> Result<(Conn, &'_ mut ConnReceiver), ClientError> {
debug!(
"connect: {}, current client {}",
why,
self.relay_client.is_some()
self.relay_conn.is_some()
);

if self.is_closed {
return Err(ClientError::Closed);
}
async move {
if self.relay_client.is_none() {
if self.relay_conn.is_none() {
trace!("no connection, trying to connect");
let (relay_client, receiver) =
tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
.await
.map_err(|_| ClientError::ConnectTimeout)??;
let (conn, receiver) = tokio::time::timeout(CONNECT_TIMEOUT, self.connect_0())
.await
.map_err(|_| ClientError::ConnectTimeout)??;

self.relay_client = Some((relay_client.clone(), receiver));
self.next_conn();
self.relay_conn = Some((conn, receiver));
} else {
trace!("already had connection");
}
let count = self.current_conn();
let (relay_client, receiver) = self
.relay_client
let (conn, receiver) = self
.relay_conn
.as_mut()
.map(|(c, r)| (c.clone(), r))
.expect("just checked");

Ok((relay_client, receiver, count))
Ok((conn, receiver))
}
.instrument(info_span!("connect"))
.await
}

async fn connect_0(&self) -> Result<(RelayClient, RelayClientReceiver), ClientError> {
async fn connect_0(&self) -> Result<(Conn, ConnReceiver), ClientError> {
let (reader, writer, local_addr) = match self.protocol {
Protocol::Websocket => {
let (reader, writer) = self.connect_ws().await?;
Expand All @@ -601,14 +600,14 @@ impl Actor {
}
};

let (relay_client, receiver) =
RelayClientBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
let (conn, receiver) =
ConnBuilder::new(self.secret_key.clone(), local_addr, reader, writer)
.build()
.await
.map_err(|e| ClientError::Build(e.to_string()))?;

if self.is_preferred && relay_client.note_preferred(true).await.is_err() {
relay_client.close().await;
if self.is_preferred && conn.note_preferred(true).await.is_err() {
conn.close().await;
return Err(ClientError::Send);
}

Expand All @@ -620,7 +619,7 @@ impl Actor {
);

trace!("connect_0 done");
Ok((relay_client, receiver))
Ok((conn, receiver))
}

async fn connect_ws(&self) -> Result<(ConnReader, ConnWriter), ClientError> {
Expand Down Expand Up @@ -732,8 +731,8 @@ impl Actor {

// only send the preference if we already have a connection
let res = {
if let Some((ref client, _)) = self.relay_client {
client.note_preferred(is_preferred).await
if let Some((ref conn, _)) = self.relay_conn {
conn.note_preferred(is_preferred).await
} else {
return;
}
Expand All @@ -749,23 +748,23 @@ impl Actor {
if self.is_closed {
return None;
}
if let Some((ref client, _)) = self.relay_client {
client.local_addr()
if let Some((ref conn, _)) = self.relay_conn {
conn.local_addr()
} else {
None
}
}

async fn ping(&mut self, s: oneshot::Sender<Result<Duration, ClientError>>) {
let connect_res = self.connect("ping").await.map(|(c, _, _)| c);
let connect_res = self.connect("ping").await.map(|(c, _)| c);
let (ping, recv) = self.pings.register();
trace!("ping: {}", hex::encode(ping));

self.ping_tasks.spawn(async move {
let res = match connect_res {
Ok(client) => {
Ok(conn) => {
let start = Instant::now();
if let Err(err) = client.send_ping(ping).await {
if let Err(err) = conn.send_ping(ping).await {
warn!("failed to send ping: {:?}", err);
Err(ClientError::Send)
} else {
Expand All @@ -782,10 +781,10 @@ impl Actor {
});
}

async fn send(&mut self, dst_key: PublicKey, b: Bytes) -> Result<(), ClientError> {
trace!(dst = %dst_key.fmt_short(), len = b.len(), "send");
let (client, _, _) = self.connect("send").await?;
if client.send(dst_key, b).await.is_err() {
async fn send(&mut self, remote_node: NodeId, payload: Bytes) -> Result<(), ClientError> {
trace!(remote_node = %remote_node.fmt_short(), len = payload.len(), "send");
let (conn, _) = self.connect("send").await?;
if conn.send(remote_node, payload).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
}
Expand All @@ -795,8 +794,8 @@ impl Actor {
async fn send_pong(&mut self, data: [u8; 8]) -> Result<(), ClientError> {
debug!("send_pong");
if self.can_ack_pings {
let (client, _, _) = self.connect("send_pong").await?;
if client.send_pong(data).await.is_err() {
let (conn, _) = self.connect("send_pong").await?;
if conn.send_pong(data).await.is_err() {
self.close_for_reconnect().await;
return Err(ClientError::Send);
}
Expand All @@ -817,16 +816,7 @@ impl Actor {
if self.is_closed {
return false;
}
self.relay_client.is_some()
}

fn current_conn(&self) -> usize {
self.conn_gen
}

fn next_conn(&mut self) -> usize {
self.conn_gen = self.conn_gen.wrapping_add(1);
self.conn_gen
self.relay_conn.is_some()
}

fn tls_servername(&self) -> Option<rustls::pki_types::ServerName> {
Expand Down Expand Up @@ -987,13 +977,12 @@ impl Actor {
}
}

async fn recv_detail(&mut self) -> Result<(ReceivedMessage, usize), ClientError> {
if let Some((_client, client_receiver)) = self.relay_client.as_mut() {
async fn recv_detail(&mut self) -> Result<ReceivedMessage, ClientError> {
if let Some((_conn, conn_receiver)) = self.relay_conn.as_mut() {
trace!("recv_detail tick");
match client_receiver.recv().await {
match conn_receiver.recv().await {
Ok(msg) => {
let current_gen = self.current_conn();
return Ok((msg, current_gen));
return Ok(msg);
}
Err(e) => {
self.close_for_reconnect().await;
Expand All @@ -1012,8 +1001,8 @@ impl Actor {
/// requires a connection, it will call `connect`.
async fn close_for_reconnect(&mut self) {
debug!("close for reconnect");
if let Some((client, _)) = self.relay_client.take() {
client.close().await
if let Some((conn, _)) = self.relay_conn.take() {
conn.close().await
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion iroh-net/src/relay/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ pub struct Conn {
inner: Arc<ConnTasks>,
}

/// The channel on which a relay connection sends received messages.
///
/// The [`Conn`] to a relay is easily clonable but can only send DISCO messages to a relay
/// server. This is the counterpart which receives DISCO messages from the relay server for
/// a connection. It is not clonable.
#[derive(Debug)]
pub struct ConnReceiver {
/// The reader channel, receiving incoming messages.
Expand Down Expand Up @@ -376,7 +381,7 @@ impl ConnBuilder {
recv_msgs: writer_recv,
}
.run()
.instrument(info_span!("client.writer")),
.instrument(info_span!("conn.writer")),
);

let (reader_sender, reader_recv) = mpsc::channel(PER_CLIENT_READ_QUEUE_DEPTH);
Expand Down Expand Up @@ -412,6 +417,7 @@ impl ConnBuilder {
}
}
}
.instrument(info_span!("conn.reader"))
});

let conn = Conn {
Expand Down
Loading
Loading