diff --git a/libs/net4mqtt/bin/main.rs b/libs/net4mqtt/bin/main.rs index a0eb4363..c8861cfc 100644 --- a/libs/net4mqtt/bin/main.rs +++ b/libs/net4mqtt/bin/main.rs @@ -48,6 +48,9 @@ enum Commands { /// Listen local port mapping as agent's target address #[arg(short, long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6666))] listen: SocketAddr, + /// Agent's target address + #[arg(short, long)] + target: Option, /// agent id #[arg(short, long, default_value_t = format!("-"))] agent_id: String, @@ -67,9 +70,9 @@ enum Commands { /// Mqtt Broker Address (://:/?client_id=) #[arg(short, long, default_value_t = format!("mqtt://localhost:1883/net4mqtt"))] mqtt_url: String, - /// Agent's target address - #[arg(short, long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7777))] - target: SocketAddr, + /// Default Agent's target address + #[arg(short, long, default_value_t = format!("{}", SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7777)))] + target: String, /// Set Current agent id #[arg(short, long, default_value_t = format!("-"))] id: String, @@ -104,13 +107,22 @@ async fn main() { debug!("use domain: {:?}", domain); let listener = TcpListener::bind(listen).await.unwrap(); - proxy::local_socks(&mqtt_url, listener, &agent_id, &id, None, None, kcp) - .await - .unwrap(); + proxy::local_socks( + &mqtt_url, + listener, + (&agent_id, &id), + Some(domain), + None, + None, + kcp, + ) + .await + .unwrap(); } Commands::Local { mqtt_url, listen, + target, agent_id, id, udp, @@ -120,14 +132,22 @@ async fn main() { if udp { let sock = UdpSocket::bind(listen).await.unwrap(); - proxy::local_ports_udp(&mqtt_url, sock, &agent_id, &id, None, None) + proxy::local_ports_udp(&mqtt_url, sock, target, (&agent_id, &id), None, None) .await .unwrap(); } else { let listener = TcpListener::bind(listen).await.unwrap(); - proxy::local_ports_tcp(&mqtt_url, listener, &agent_id, &id, None, None, kcp) - .await - .unwrap(); + proxy::local_ports_tcp( + &mqtt_url, + listener, + target, + (&agent_id, &id), + None, + None, + kcp, + ) + .await + .unwrap(); } } Commands::Agent { @@ -137,7 +157,7 @@ async fn main() { } => { info!("Running as agent, {:?}", target); - proxy::agent(&mqtt_url, target, &id, None, None) + proxy::agent(&mqtt_url, &target, &id, None, None) .await .unwrap(); } diff --git a/libs/net4mqtt/src/proxy.rs b/libs/net4mqtt/src/proxy.rs index 38a5e3d3..bf73a140 100644 --- a/libs/net4mqtt/src/proxy.rs +++ b/libs/net4mqtt/src/proxy.rs @@ -1,4 +1,5 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::str::FromStr; use anyhow::{anyhow, Error, Result}; use kcp::Kcp; @@ -171,7 +172,7 @@ async fn up_udp_vnet( } async fn up_agent_vclient( - address: SocketAddr, + address: &str, protocol: &str, topic: String, sender: UnboundedSender<(String, Vec)>, @@ -179,26 +180,25 @@ async fn up_agent_vclient( ) -> Result<(), Error> { match protocol { topic::protocol::KCP => { - let socket = TcpStream::connect(address).await.unwrap(); + let socket = TcpStream::connect(address).await?; up_kcp_vnet(socket, topic, sender, receiver).await } topic::protocol::TCP => { - let socket = TcpStream::connect(address).await.unwrap(); + let socket = TcpStream::connect(address).await?; up_tcp_vnet(socket, topic, sender, receiver).await } topic::protocol::UDP => { let socket = UdpSocket::bind(SocketAddr::new( // "0.0.0.0:0" // "[::]:0" - match address { + match SocketAddr::from_str(address)? { SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED), SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED), }, 0, )) - .await - .unwrap(); - socket.connect(address).await.unwrap(); + .await?; + socket.connect(address).await?; up_udp_vnet(socket, topic, sender, receiver).await } e => Err(anyhow!("unknown protocol {}", e)), @@ -256,7 +256,7 @@ async fn mqtt_client_init( pub async fn agent( mqtt_url: &str, - address: SocketAddr, + address: &str, agent_id: &str, xdata: Option<(Vec, Option>)>, on_xdata: Option)>>, @@ -288,8 +288,8 @@ pub async fn agent( result = receiver.recv() => { match result { Some((key, data)) => { - let (prefix, agent_id, local_id, _label, protocol, address) = topic::parse(&key); - client.publish(topic::build(prefix, agent_id, local_id, topic::label::O, protocol, address), + let (prefix, agent_id, local_id, _label, protocol, src, dst) = topic::parse(&key); + client.publish(topic::build(prefix, agent_id, local_id, topic::label::O, protocol, src, dst), QoS::AtMostOnce, false, data @@ -303,7 +303,7 @@ pub async fn agent( Ok(notification) => { if let Some(p) = mqtt_receive(notification) { let topic = p.topic.clone(); - let (_prefix, agent_id, local_id, label, protocol, _address) = topic::parse(&topic); + let (_prefix, agent_id, local_id, label, protocol, _src, dst) = topic::parse(&topic); match label { topic::label::X => { @@ -318,8 +318,9 @@ pub async fn agent( let (vnet_tx, vnet_rx) = unbounded_channel::<(String, Vec)>(); let topic = p.topic.clone(); let protocol = protocol.to_string(); + let dst = if dst == topic::NIL { address } else { dst }.to_string(); task::spawn(async move { - if let Err(e) = up_agent_vclient(address, &protocol, topic, sender, vnet_rx).await { + if let Err(e) = up_agent_vclient(&dst, &protocol, topic, sender, vnet_rx).await { error!("agent vnet error: {:?}", e) } }); @@ -350,12 +351,14 @@ pub async fn agent( pub async fn local_ports_tcp( mqtt_url: &str, listener: TcpListener, - agent_id: &str, - local_id: &str, + target: Option, + id: (&str, &str), xdata: Option<(Vec, Option>)>, on_xdata: Option)>>, tcp_over_kcp: bool, ) -> Result<(), Error> { + let (agent_id, local_id) = id; + let target = target.unwrap_or(topic::NIL.to_string()); let mut senders = LruCache::)>>::with_expiry_duration_and_capacity( LRU_TIME_TO_LIVE, @@ -386,8 +389,8 @@ pub async fn local_ports_tcp( let protocol = if tcp_over_kcp { topic::protocol::KCP } else { topic::protocol::TCP }; let addr = socket.peer_addr().unwrap().to_string(); - let key_send = topic::build(prefix, agent_id, local_id, topic::label::I, protocol, &addr); - let key_recv = topic::build(prefix, agent_id, local_id, topic::label::O, protocol, &addr); + let key_send = topic::build(prefix, agent_id, local_id, topic::label::I, protocol, &addr, &target); + let key_recv = topic::build(prefix, agent_id, local_id, topic::label::O, protocol, &addr, &target); senders.insert(key_recv, vnet_tx); task::spawn(async move { @@ -417,12 +420,12 @@ pub async fn local_ports_tcp( Ok(notification) => { if let Some(p) = mqtt_receive(notification) { let topic = p.topic.clone(); - let (_prefix, agent_id, local_id, label, protocol, _address) = topic::parse(&topic); + let (_prefix, agent_id, local_id, label, protocol, _src, _dst) = topic::parse(&topic); match (label, protocol) { (topic::label::X, _) => { if let Some(s) = on_xdata { - s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await.unwrap(); + s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?; } }, (_, topic::protocol::KCP | topic::protocol::TCP) => { @@ -430,7 +433,7 @@ pub async fn local_ports_tcp( if sender.is_closed() { senders.remove(&p.topic); } else { - sender.send((p.topic, p.payload.to_vec())).unwrap(); + sender.send((p.topic, p.payload.to_vec()))?; } } }, @@ -453,11 +456,13 @@ pub async fn local_ports_tcp( pub async fn local_ports_udp( mqtt_url: &str, sock: UdpSocket, - agent_id: &str, - local_id: &str, + target: Option, + id: (&str, &str), xdata: Option<(Vec, Option>)>, on_xdata: Option)>>, ) -> Result<(), Error> { + let (agent_id, local_id) = id; + let target = target.unwrap_or(topic::NIL.to_string()); let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); let (url, prefix) = crate::utils::pre_url(mqtt_url.parse::()?); @@ -480,7 +485,7 @@ pub async fn local_ports_udp( select! { Ok((len, addr)) = sock.recv_from(&mut buf) => { sender.send(( - topic::build(prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string()), + topic::build(prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string(), &target), buf[..len].to_vec())).unwrap(); } result = receiver.recv() => { @@ -501,15 +506,15 @@ pub async fn local_ports_udp( Ok(notification) => { if let Some(p) = mqtt_receive(notification) { let topic = p.topic.clone(); - let (_prefix, _agent_id, _local_id, label, protocol, address) = topic::parse(&topic); + let (_prefix, _agent_id, _local_id, label, protocol, src, _dst) = topic::parse(&topic); match (label, protocol) { (topic::label::X, _) => { if let Some(s) = on_xdata { - s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await.unwrap(); + s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await?; } }, - (_, topic::protocol::UDP) => { let _ = sock.send_to(&p.payload, address).await.unwrap(); }, + (_, topic::protocol::UDP) => { let _ = sock.send_to(&p.payload, src).await?; }, (label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol) } } @@ -543,12 +548,13 @@ use std::sync::Arc; pub async fn local_socks( mqtt_url: &str, listener: TcpListener, - agent_id: &str, - local_id: &str, + id: (&str, &str), + domain: Option, xdata: Option<(Vec, Option>)>, on_xdata: Option)>>, tcp_over_kcp: bool, ) -> Result<(), Error> { + let (agent_id, local_id) = id; let mut senders = LruCache::)>>::with_expiry_duration_and_capacity( LRU_TIME_TO_LIVE, @@ -576,20 +582,24 @@ pub async fn local_socks( let on_xdata = on_xdata.clone(); select! { Ok((conn, _)) = server.accept() => { - match crate::socks::handle(conn).await { - Ok((target, socket)) => { - let agent_id = match target { + match crate::socks::handle(conn, domain.clone()).await { + Ok((id, target, socket)) => { + let agent_id = match id { Some(id) => id, None => agent_id.to_string(), }; + let target = match target { + Some(t) => t, + None => topic::NIL.to_string(), + }; let (vnet_tx, vnet_rx) = unbounded_channel::<(String, Vec)>(); let protocol = if tcp_over_kcp { topic::protocol::KCP } else { topic::protocol::TCP }; let addr = socket.peer_addr().unwrap().to_string(); - let key_send = topic::build(prefix, &agent_id, local_id, topic::label::I, protocol, &addr); - let key_recv = topic::build(prefix, &agent_id, local_id, topic::label::O, protocol, &addr); + let key_send = topic::build(prefix, &agent_id, local_id, topic::label::I, protocol, &addr, &target); + let key_recv = topic::build(prefix, &agent_id, local_id, topic::label::O, protocol, &addr, &target); senders.insert(key_recv, vnet_tx); task::spawn(async move { @@ -623,7 +633,7 @@ pub async fn local_socks( Ok(notification) => { if let Some(p) = mqtt_receive(notification) { let topic = p.topic.clone(); - let (_prefix, agent_id, local_id, label, protocol, _address) = topic::parse(&topic); + let (_prefix, agent_id, local_id, label, protocol, _src, _dst) = topic::parse(&topic); match (label, protocol) { (topic::label::X, _) => { diff --git a/libs/net4mqtt/src/socks.rs b/libs/net4mqtt/src/socks.rs index e3a8b73a..368fe170 100644 --- a/libs/net4mqtt/src/socks.rs +++ b/libs/net4mqtt/src/socks.rs @@ -10,7 +10,8 @@ use tokio::io::AsyncWriteExt; pub(crate) async fn handle( conn: IncomingConnection<(), NeedAuthenticate>, -) -> Result<(Option, Connect), Error> { + domain: Option, +) -> Result<(Option, Option, Connect), Error> { let conn = match conn.authenticate().await { Ok((conn, _)) => conn, Err((err, mut conn)) => { @@ -51,12 +52,24 @@ pub(crate) async fn handle( let _ = conn.close().await; } Ok(Command::Connect(connect, addr)) => { - let target = match addr { - Address::DomainAddress(domain, _port) => match std::str::from_utf8(&domain) { - Ok(raw) => Some(crate::kxdns::Kxdns::resolver(raw).to_string()), - Err(_) => None, - }, - Address::SocketAddress(_) => None, + let (id, target) = match addr { + Address::DomainAddress(domain_address, _port) => { + match std::str::from_utf8(&domain_address) { + Ok(raw) => { + if let Some(d) = domain { + if raw.ends_with(&d) { + (Some(crate::kxdns::Kxdns::resolver(raw).to_string()), None) + } else { + (None, Some(raw.to_string())) + } + } else { + (None, Some(raw.to_string())) + } + } + Err(_) => (None, None), + } + } + Address::SocketAddress(ip) => (None, Some(ip.to_string())), }; let replied = connect @@ -70,7 +83,7 @@ pub(crate) async fn handle( return Err(anyhow!(err)); } }; - return Ok((target, conn)); + return Ok((id, target, conn)); } Err((err, mut conn)) => { let _ = conn.shutdown().await; diff --git a/libs/net4mqtt/src/tests.rs b/libs/net4mqtt/src/tests.rs index 8c6aa2c3..4f835df8 100644 --- a/libs/net4mqtt/src/tests.rs +++ b/libs/net4mqtt/src/tests.rs @@ -7,6 +7,7 @@ use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::time; use crate::broker; +use crate::kxdns; const MAX_BUFFER_SIZE: usize = 4096; @@ -111,6 +112,7 @@ use portpicker::pick_unused_port; use crate::proxy; const MQTT_TOPIC_PREFIX: &str = "test"; +const DOMAIN_SUFFIX: &str = "test.local"; struct Config { agent: u16, @@ -173,7 +175,7 @@ async fn helper_cluster_up(cfg: Config) -> Vec { MQTT_TOPIC_PREFIX, id ), - addr, + &addr.to_string(), &id.to_string(), None, None, @@ -197,8 +199,8 @@ async fn helper_cluster_up(cfg: Config) -> Vec { id ), listener, - &id.to_string(), - format!("local-{}", id).as_str(), + None, + (&id.to_string(), &format!("local-{}", id)), None, None, cfg.kcp, @@ -217,8 +219,8 @@ async fn helper_cluster_up(cfg: Config) -> Vec { id ), sock, - &id.to_string(), - format!("local-{}", id).as_str(), + None, + (&id.to_string(), &format!("local-{}", id)), None, None, )) @@ -239,8 +241,8 @@ async fn helper_cluster_up(cfg: Config) -> Vec { id ), listener, - &id.to_string(), - format!("socks-{}", id).as_str(), + (&id.to_string(), &format!("socks-{}", id)), + Some(DOMAIN_SUFFIX.to_string()), None, None, cfg.kcp, @@ -635,12 +637,15 @@ async fn test_socks_simple() { let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) .timeout(time::Duration::from_secs(1)) - .proxy(reqwest::Proxy::http(format!("socks5://{}", local_addr)).unwrap()) + .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); let res = client - .get(format!("http://{}/", local_addr)) + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0") + )) .send() .await .unwrap(); @@ -652,12 +657,15 @@ async fn test_socks_simple() { let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) .timeout(time::Duration::from_secs(1)) - .proxy(reqwest::Proxy::http(format!("socks5://{}", local_addr)).unwrap()) + .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); let res = client - .get(format!("http://{}/", local_addr)) + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0") + )) .send() .await .unwrap(); @@ -674,8 +682,6 @@ async fn test_socks_restart() { let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); let agent_port: u16 = pick_unused_port().expect("No ports free"); let target = SocketAddr::new(ip, agent_port); - - //let target = listener.local_addr().unwrap(); let addrs = helper_cluster_up(Config { agent: 1, local_socks: 1, @@ -697,12 +703,15 @@ async fn test_socks_restart() { let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) .timeout(time::Duration::from_secs(1)) - .proxy(reqwest::Proxy::http(format!("socks5://{}", local_addr)).unwrap()) + .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); let res = client - .get(format!("http://{}/", local_addr)) + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0") + )) .send() .await .unwrap(); @@ -714,12 +723,15 @@ async fn test_socks_restart() { let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) .timeout(time::Duration::from_secs(1)) - .proxy(reqwest::Proxy::http(format!("socks5://{}", local_addr)).unwrap()) + .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); let res = client - .get(format!("http://{}/", local_addr)) + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX).registry("0") + )) .send() .await .unwrap(); @@ -763,7 +775,7 @@ async fn test_socks_multiple_server() { MQTT_TOPIC_PREFIX, id ), - addr, + &addr.to_string(), &id.to_string(), None, None, @@ -786,7 +798,10 @@ async fn test_socks_multiple_server() { .unwrap(); let res = client - .get(format!("http://{}.test.local/", id)) + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX).registry(&id.to_string()) + )) .send() .await .unwrap(); @@ -834,7 +849,7 @@ async fn test_xdata() { MQTT_TOPIC_PREFIX, id ), - addr, + &addr.to_string(), &id.to_string(), None, Some(sender), @@ -853,7 +868,7 @@ async fn test_xdata() { MQTT_TOPIC_PREFIX, id ), - addr, + &addr.to_string(), &id.to_string(), Some((msg_1_clone, None)), None, @@ -871,7 +886,7 @@ async fn test_xdata() { MQTT_TOPIC_PREFIX, id ), - addr, + &addr.to_string(), &id.to_string(), Some((msg_2_clone, None)), None, @@ -891,8 +906,8 @@ async fn test_xdata() { id ), listener, - id, - id, + None, + (id, id), Some((msg_3_clone, None)), None, false, @@ -912,8 +927,8 @@ async fn test_xdata() { id ), listener, - id, - id, + None, + (id, id), Some((msg_4_clone, None)), None, false, diff --git a/libs/net4mqtt/src/topic.rs b/libs/net4mqtt/src/topic.rs index 63b59488..a2646300 100644 --- a/libs/net4mqtt/src/topic.rs +++ b/libs/net4mqtt/src/topic.rs @@ -1,8 +1,8 @@ /// Publish: -/// TOPIC: ///