Skip to content

Commit

Permalink
refactor(net4mqtt): proxy vnet log
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed Oct 16, 2024
1 parent e863669 commit 91d2594
Showing 1 changed file with 48 additions and 19 deletions.
67 changes: 48 additions & 19 deletions libs/net4mqtt/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ pub fn now_millis() -> u32 {
since_the_epoch.as_millis() as u32
}

async fn up_kcp_vnet<T>(
async fn vnet_kcp<T>(
mut socket: T,
key: String,
sender: UnboundedSender<(String, Vec<u8>)>,
mut receiver: UnboundedReceiver<(String, Vec<u8>)>,
peer_addr: SocketAddr,
local_addr: SocketAddr,
) -> Result<(), Error>
where
T: AsyncReadExt + AsyncWriteExt + Unpin,
Expand All @@ -75,7 +77,10 @@ where
Some((_key, mut raw)) => {
kcp.input(raw.as_mut_slice())?;
match kcp.recv(buf.as_mut_slice()) {
Ok(n) => socket.write_all(&buf[..n]).await?,
Ok(n) => {
socket.write_all(&buf[..n]).await?;
trace!("KCP vnet: {}, send to: {}, length {} bytes", key, peer_addr, n);
},
Err(kcp::Error::RecvQueueEmpty) => continue,
Err(err) => return Err(anyhow!("kcp.recv error: {:?}", err)),
};
Expand All @@ -86,7 +91,7 @@ where
result = socket.read(&mut buf) => {
match result {
Ok(n) => {
trace!("read {} bytes: {:?}", n, buf[..n].to_vec());
trace!("KCP vnet: {}, received from: {}, length {} bytes", key, local_addr, n);
if n == 0 { break };
kcp.send(&buf[..n])?;
}
Expand All @@ -100,11 +105,13 @@ where
Ok(())
}

async fn up_tcp_vnet<T>(
async fn vnet_tcp<T>(
mut socket: T,
key: String,
sender: UnboundedSender<(String, Vec<u8>)>,
mut receiver: UnboundedReceiver<(String, Vec<u8>)>,
peer_addr: SocketAddr,
local_addr: SocketAddr,
) -> Result<(), Error>
where
T: AsyncReadExt + AsyncWriteExt + Unpin,
Expand All @@ -114,14 +121,17 @@ where
select! {
result = receiver.recv() => {
match result {
Some((_key, data)) => { socket.write_all(data.as_slice()).await?; }
Some((_key, data)) => {
socket.write_all(data.as_slice()).await?;
trace!("TCP vnet: {}, send to: {}, length {} bytes", key, peer_addr, data.len());
}
None => return Err(anyhow!("receiver is None")),
}
}
result = socket.read(&mut buf) => {
match result {
Ok(n) => {
trace!("read {} bytes: {:?}", n, buf[..n].to_vec());
trace!("TCP vnet: {}, received from: {}, length {} bytes", key, local_addr, n);
if n == 0 { break };
sender.send((key.clone(),
buf[..n].to_vec()
Expand All @@ -137,25 +147,30 @@ where
Ok(())
}

async fn up_udp_vnet(
async fn vnet_udp(
socket: UdpSocket,
key: String,
sender: UnboundedSender<(String, Vec<u8>)>,
mut receiver: UnboundedReceiver<(String, Vec<u8>)>,
peer_addr: SocketAddr,
local_addr: SocketAddr,
) -> Result<(), Error> {
let mut buf = [0; MAX_BUFFER_SIZE];
loop {
select! {
result = receiver.recv() => {
match result {
Some((_key, data)) => { socket.send(data.as_slice()).await?; }
Some((_key, data)) => {
socket.send(data.as_slice()).await?;
trace!("UDP vnet: {}, send to: {}, length {} bytes", key, peer_addr, data.len());
}
None => return Err(anyhow!("receiver is None")),
}
}
result = socket.recv(&mut buf) => {
match result {
Ok(n) => {
trace!("read {} bytes: {:?}", n, buf[..n].to_vec());
trace!("UDP vnet: {}, received from: {}, length {} bytes", key, local_addr, n);
if n == 0 { continue };
sender.send((key.clone(),
buf[..n].to_vec()
Expand All @@ -181,11 +196,15 @@ async fn up_agent_vclient(
match protocol {
topic::protocol::KCP => {
let socket = TcpStream::connect(address).await?;
up_kcp_vnet(socket, topic, sender, receiver).await
let peer_addr = socket.peer_addr()?;
let local_addr = socket.local_addr()?;
vnet_kcp(socket, topic, sender, receiver, peer_addr, local_addr).await
}
topic::protocol::TCP => {
let socket = TcpStream::connect(address).await?;
up_tcp_vnet(socket, topic, sender, receiver).await
let peer_addr = socket.peer_addr()?;
let local_addr = socket.local_addr()?;
vnet_tcp(socket, topic, sender, receiver, peer_addr, local_addr).await
}
topic::protocol::UDP => {
let socket = UdpSocket::bind(SocketAddr::new(
Expand All @@ -199,7 +218,9 @@ async fn up_agent_vclient(
))
.await?;
socket.connect(address).await?;
up_udp_vnet(socket, topic, sender, receiver).await
let peer_addr = socket.peer_addr()?;
let local_addr = socket.local_addr()?;
vnet_udp(socket, topic, sender, receiver, peer_addr, local_addr).await
}
e => Err(anyhow!("unknown protocol {}", e)),
}
Expand Down Expand Up @@ -410,7 +431,8 @@ pub async fn local_ports_tcp(
let sender = sender.clone();
let on_vdata = on_vdata.clone();
select! {
Ok((socket, _)) = listener.accept() => {
Ok((socket, addr)) = listener.accept() => {
debug!("accept {:?}", addr);
let (vnet_tx, vnet_rx) = unbounded_channel::<(String, Vec<u8>)>();

let protocol = if tcp_over_kcp { topic::protocol::KCP } else { topic::protocol::TCP };
Expand All @@ -419,12 +441,15 @@ pub async fn local_ports_tcp(
let key_send = topic::build(&prefix, agent_id, local_id, topic::label::I, protocol, &addr, &target);
let key_recv = topic::build(&prefix, agent_id, local_id, topic::label::O, protocol, &addr, &target);

let peer_addr = socket.peer_addr()?;
let local_addr = socket.local_addr()?;

senders.insert(key_recv, vnet_tx);
task::spawn(async move {
if let Err(e) = if tcp_over_kcp {
up_kcp_vnet(socket, key_send, sender, vnet_rx).await
vnet_kcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await
} else {
up_tcp_vnet(socket, key_send, sender, vnet_rx).await
vnet_tcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await
} { error!("local vnet error: {}", e) };
});
}
Expand Down Expand Up @@ -592,10 +617,11 @@ pub async fn local_socks(
);
let (sender, mut receiver) = unbounded_channel::<(String, Vec<u8>)>();
loop {
let sender_clone = sender.clone();
let sender = sender.clone();
let on_vdata = on_vdata.clone();
select! {
Ok((conn, _)) = server.accept() => {
Ok((conn, addr)) = server.accept() => {
debug!("accept {:?}", addr);
match crate::socks::handle(conn, domain.clone()).await {
Ok((id, target, socket)) => {
let agent_id = match id {
Expand All @@ -615,12 +641,15 @@ pub async fn local_socks(
let key_send = topic::build(&prefix, &agent_id, local_id, topic::label::I, protocol, &addr, &target);
let key_recv = topic::build(&prefix, &agent_id, local_id, topic::label::O, protocol, &addr, &target);

let peer_addr = socket.peer_addr()?;
let local_addr = socket.local_addr()?;

senders.insert(key_recv, vnet_tx);
task::spawn(async move {
if let Err(e) = if tcp_over_kcp {
up_kcp_vnet(socket, key_send, sender_clone, vnet_rx).await
vnet_kcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await
} else {
up_tcp_vnet(socket, key_send, sender_clone, vnet_rx).await
vnet_tcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await
} { error!("local vnet error: {}", e) };
});

Expand Down

0 comments on commit 91d2594

Please sign in to comment.