Skip to content

Commit

Permalink
refactor: reduce spawn task
Browse files Browse the repository at this point in the history
  • Loading branch information
Itsusinn committed Mar 24, 2024
1 parent 270ae0f commit 18677c0
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 55 deletions.
88 changes: 56 additions & 32 deletions clash_lib/src/proxy/tuic/handle_task.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::time::Duration;

use bytes::Bytes;
use quinn::ZeroRttAccepted;

use anyhow::Result;
use std::time::Duration;
use tokio::time;
use tuic::Address;
use tuic_quinn::{Connect, Packet};

Expand Down Expand Up @@ -82,36 +82,6 @@ impl TuicConnection {
}
}

pub async fn dissociate(&self, assoc_id: u16) -> anyhow::Result<()> {
tracing::info!("[udp] [dissociate] [{assoc_id:#06x}]");
match self.inner.dissociate(assoc_id).await {
Ok(()) => Ok(()),
Err(err) => {
tracing::warn!("[udp] [dissociate] [{assoc_id:#06x}] {err}");
Err(anyhow!(err))
}
}
}

pub async fn heartbeat(self, heartbeat: Duration) {
loop {
time::sleep(heartbeat).await;

if self.is_closed() {
break;
}

if self.inner.task_connect_count() + self.inner.task_associate_count() == 0 {
continue;
}

match self.inner.heartbeat().await {
Ok(()) => tracing::trace!("[heartbeat]"),
Err(err) => tracing::error!("[heartbeat] {err}"),
}
}
}

pub async fn incoming_udp(&self, pkt: Packet) {
let assoc_id = pkt.assoc_id();
let pkt_id = pkt.pkt_id();
Expand Down Expand Up @@ -152,4 +122,58 @@ impl TuicConnection {
Err(err) => tracing::error!("[udp] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] packet receiving error: {err}"),
}
}

pub async fn dissociate(&self, assoc_id: u16) -> Result<()> {
tracing::info!("[udp] [dissociate] [{assoc_id:#06x}]");
match self.inner.dissociate(assoc_id).await {
Ok(()) => Ok(()),
Err(err) => {
tracing::warn!("[udp] [dissociate] [{assoc_id:#06x}] {err}");
Err(err)?
}
}
}

async fn heartbeat(&self) -> Result<()> {
self.check_open()?;
if self.inner.task_connect_count() + self.inner.task_associate_count() == 0 {
return Ok(());
}

match self.inner.heartbeat().await {
Ok(()) => tracing::trace!("[heartbeat]"),
Err(err) => tracing::error!("[heartbeat] {err}"),
}
Ok(())
}

fn collect_garbage(&self, gc_lifetime: Duration) -> Result<()> {
self.check_open()?;
tracing::trace!("[gc]");
self.inner.collect_garbage(gc_lifetime);
Ok(())
}
/// Tasks triggered by timer
/// Won't return unless occurs error
pub async fn cyclical_tasks(
self,
heartbeat_interval: Duration,
gc_interval: Duration,
gc_lifetime: Duration,
) -> anyhow::Error {
let mut heartbeat_interval = tokio::time::interval(heartbeat_interval);
let mut gc_interval = tokio::time::interval(gc_interval);
loop {
tokio::select! {
_ = heartbeat_interval.tick() => match self.heartbeat().await {
Ok(_) => { },
Err(err) => break err,
},
_ = gc_interval.tick() => match self.collect_garbage(gc_lifetime) {
Ok(_) => { },
Err(err) => break err,
},
}
}
}
}
8 changes: 2 additions & 6 deletions clash_lib/src/proxy/tuic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Handler {
.unwrap()
.with_root_certificates(GLOBAL_ROOT_STORE.clone())
.with_no_client_auth();
// aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol
// TODO(error-handling) if alpn not match the following error will be throw: aborted by peer: the cryptographic handshake failed: error 120: peer doesn't support any known protocol
crypto.alpn_protocols = opts.alpn.clone();
crypto.enable_early_data = true;
crypto.enable_sni = !opts.disable_sni;
Expand Down Expand Up @@ -196,7 +196,7 @@ impl Handler {
*guard = Some(self.ep.connect().await?);
}
let conn = guard.take().unwrap();
let conn = if conn.is_closed() {
let conn = if conn.check_open().is_err() {
// reconnect
self.ep.connect().await?
} else {
Expand Down Expand Up @@ -240,8 +240,6 @@ impl Handler {

struct TuicDatagramOutbound {
assoc_id: u16,
// conn: TuicConnection,
// dest: tuic::Address,
handle: tokio::task::JoinHandle<Result<()>>,
send_tx: tokio_util::sync::PollSender<UdpPacket>,
recv_rx: tokio::sync::mpsc::Receiver<UdpPacket>,
Expand Down Expand Up @@ -285,8 +283,6 @@ impl TuicDatagramOutbound {
});
let s = Self {
assoc_id,
// conn,
// dest,
handle,
send_tx: tokio_util::sync::PollSender::new(send_tx),
recv_rx,
Expand Down
27 changes: 10 additions & 17 deletions clash_lib/src/proxy/tuic/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ pub struct UdpSession {
}

impl TuicConnection {
pub fn is_closed(&self) -> bool {
self.conn.close_reason().is_some()
pub fn check_open(&self) -> Result<()> {
match self.conn.close_reason() {
Some(err) => Err(err)?,
None => Ok(()),
}
}
fn new(
conn: QuinnConnection,
Expand Down Expand Up @@ -149,10 +152,12 @@ impl TuicConnection {
) {
tracing::info!("connection established");

// TODO reduct spawn
// TODO check the cancellation safety of tuic_auth
tokio::spawn(self.clone().tuic_auth(zero_rtt_accepted));
tokio::spawn(self.clone().heartbeat(heartbeat));
tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime));
tokio::spawn(
self.clone()
.cyclical_tasks(heartbeat, gc_interval, gc_lifetime),
);

let err = loop {
tokio::select! {
Expand All @@ -173,18 +178,6 @@ impl TuicConnection {

tracing::warn!("connection error: {err}");
}

async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) {
let mut interval = tokio::time::interval(gc_interval);
loop {
interval.tick().await;
if self.is_closed() {
break;
}
tracing::trace!("[gc]");
self.inner.collect_garbage(gc_lifetime);
}
}
}

pub struct ServerAddr {
Expand Down

0 comments on commit 18677c0

Please sign in to comment.