diff --git a/libs/net4mqtt/src/proxy.rs b/libs/net4mqtt/src/proxy.rs index 662145d..efe9fcb 100644 --- a/libs/net4mqtt/src/proxy.rs +++ b/libs/net4mqtt/src/proxy.rs @@ -54,11 +54,13 @@ pub fn now_millis() -> u32 { since_the_epoch.as_millis() as u32 } -async fn up_kcp_vnet( +async fn vnet_kcp( mut socket: T, key: String, sender: UnboundedSender<(String, Vec)>, mut receiver: UnboundedReceiver<(String, Vec)>, + peer_addr: SocketAddr, + local_addr: SocketAddr, ) -> Result<(), Error> where T: AsyncReadExt + AsyncWriteExt + Unpin, @@ -75,7 +77,10 @@ where Some((_key, mut raw)) => { kcp.input(raw.as_mut_slice())?; match kcp.recv(buf.as_mut_slice()) { - Ok(n) => socket.write_all(&buf[..n]).await?, + Ok(n) => { + socket.write_all(&buf[..n]).await?; + trace!("KCP vnet: {}, send to: {}, length {} bytes", key, peer_addr, n); + }, Err(kcp::Error::RecvQueueEmpty) => continue, Err(err) => return Err(anyhow!("kcp.recv error: {:?}", err)), }; @@ -86,7 +91,7 @@ where result = socket.read(&mut buf) => { match result { Ok(n) => { - trace!("read {} bytes: {:?}", n, buf[..n].to_vec()); + trace!("KCP vnet: {}, received from: {}, length {} bytes", key, local_addr, n); if n == 0 { break }; kcp.send(&buf[..n])?; } @@ -100,11 +105,13 @@ where Ok(()) } -async fn up_tcp_vnet( +async fn vnet_tcp( mut socket: T, key: String, sender: UnboundedSender<(String, Vec)>, mut receiver: UnboundedReceiver<(String, Vec)>, + peer_addr: SocketAddr, + local_addr: SocketAddr, ) -> Result<(), Error> where T: AsyncReadExt + AsyncWriteExt + Unpin, @@ -114,14 +121,17 @@ where select! { result = receiver.recv() => { match result { - Some((_key, data)) => { socket.write_all(data.as_slice()).await?; } + Some((_key, data)) => { + socket.write_all(data.as_slice()).await?; + trace!("TCP vnet: {}, send to: {}, length {} bytes", key, peer_addr, data.len()); + } None => return Err(anyhow!("receiver is None")), } } result = socket.read(&mut buf) => { match result { Ok(n) => { - trace!("read {} bytes: {:?}", n, buf[..n].to_vec()); + trace!("TCP vnet: {}, received from: {}, length {} bytes", key, local_addr, n); if n == 0 { break }; sender.send((key.clone(), buf[..n].to_vec() @@ -137,25 +147,30 @@ where Ok(()) } -async fn up_udp_vnet( +async fn vnet_udp( socket: UdpSocket, key: String, sender: UnboundedSender<(String, Vec)>, mut receiver: UnboundedReceiver<(String, Vec)>, + peer_addr: SocketAddr, + local_addr: SocketAddr, ) -> Result<(), Error> { let mut buf = [0; MAX_BUFFER_SIZE]; loop { select! { result = receiver.recv() => { match result { - Some((_key, data)) => { socket.send(data.as_slice()).await?; } + Some((_key, data)) => { + socket.send(data.as_slice()).await?; + trace!("UDP vnet: {}, send to: {}, length {} bytes", key, peer_addr, data.len()); + } None => return Err(anyhow!("receiver is None")), } } result = socket.recv(&mut buf) => { match result { Ok(n) => { - trace!("read {} bytes: {:?}", n, buf[..n].to_vec()); + trace!("UDP vnet: {}, received from: {}, length {} bytes", key, local_addr, n); if n == 0 { continue }; sender.send((key.clone(), buf[..n].to_vec() @@ -181,11 +196,15 @@ async fn up_agent_vclient( match protocol { topic::protocol::KCP => { let socket = TcpStream::connect(address).await?; - up_kcp_vnet(socket, topic, sender, receiver).await + let peer_addr = socket.peer_addr()?; + let local_addr = socket.local_addr()?; + vnet_kcp(socket, topic, sender, receiver, peer_addr, local_addr).await } topic::protocol::TCP => { let socket = TcpStream::connect(address).await?; - up_tcp_vnet(socket, topic, sender, receiver).await + let peer_addr = socket.peer_addr()?; + let local_addr = socket.local_addr()?; + vnet_tcp(socket, topic, sender, receiver, peer_addr, local_addr).await } topic::protocol::UDP => { let socket = UdpSocket::bind(SocketAddr::new( @@ -199,7 +218,9 @@ async fn up_agent_vclient( )) .await?; socket.connect(address).await?; - up_udp_vnet(socket, topic, sender, receiver).await + let peer_addr = socket.peer_addr()?; + let local_addr = socket.local_addr()?; + vnet_udp(socket, topic, sender, receiver, peer_addr, local_addr).await } e => Err(anyhow!("unknown protocol {}", e)), } @@ -410,7 +431,8 @@ pub async fn local_ports_tcp( let sender = sender.clone(); let on_vdata = on_vdata.clone(); select! { - Ok((socket, _)) = listener.accept() => { + Ok((socket, addr)) = listener.accept() => { + debug!("accept {:?}", addr); let (vnet_tx, vnet_rx) = unbounded_channel::<(String, Vec)>(); let protocol = if tcp_over_kcp { topic::protocol::KCP } else { topic::protocol::TCP }; @@ -419,12 +441,15 @@ pub async fn local_ports_tcp( let key_send = topic::build(&prefix, agent_id, local_id, topic::label::I, protocol, &addr, &target); let key_recv = topic::build(&prefix, agent_id, local_id, topic::label::O, protocol, &addr, &target); + let peer_addr = socket.peer_addr()?; + let local_addr = socket.local_addr()?; + senders.insert(key_recv, vnet_tx); task::spawn(async move { if let Err(e) = if tcp_over_kcp { - up_kcp_vnet(socket, key_send, sender, vnet_rx).await + vnet_kcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await } else { - up_tcp_vnet(socket, key_send, sender, vnet_rx).await + vnet_tcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await } { error!("local vnet error: {}", e) }; }); } @@ -592,10 +617,11 @@ pub async fn local_socks( ); let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); loop { - let sender_clone = sender.clone(); + let sender = sender.clone(); let on_vdata = on_vdata.clone(); select! { - Ok((conn, _)) = server.accept() => { + Ok((conn, addr)) = server.accept() => { + debug!("accept {:?}", addr); match crate::socks::handle(conn, domain.clone()).await { Ok((id, target, socket)) => { let agent_id = match id { @@ -615,12 +641,15 @@ pub async fn local_socks( let key_send = topic::build(&prefix, &agent_id, local_id, topic::label::I, protocol, &addr, &target); let key_recv = topic::build(&prefix, &agent_id, local_id, topic::label::O, protocol, &addr, &target); + let peer_addr = socket.peer_addr()?; + let local_addr = socket.local_addr()?; + senders.insert(key_recv, vnet_tx); task::spawn(async move { if let Err(e) = if tcp_over_kcp { - up_kcp_vnet(socket, key_send, sender_clone, vnet_rx).await + vnet_kcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await } else { - up_tcp_vnet(socket, key_send, sender_clone, vnet_rx).await + vnet_tcp(socket, key_send, sender, vnet_rx, peer_addr, local_addr).await } { error!("local vnet error: {}", e) }; });