From 64284788ab4e90cf1b0be79067848a8a2c9ec327 Mon Sep 17 00:00:00 2001 From: a-wing <1@233.email> Date: Sat, 5 Oct 2024 23:52:55 +0800 Subject: [PATCH] refactor(net4mqtt): local socks port interface --- libs/net4mqtt/bin/main.rs | 22 +- libs/net4mqtt/src/proxy.rs | 94 ++++++-- libs/net4mqtt/src/tests.rs | 478 ++++++++++++++++++------------------- liveman/src/lib.rs | 3 +- 4 files changed, 327 insertions(+), 270 deletions(-) diff --git a/libs/net4mqtt/bin/main.rs b/libs/net4mqtt/bin/main.rs index a9ec043e..a0eb4363 100644 --- a/libs/net4mqtt/bin/main.rs +++ b/libs/net4mqtt/bin/main.rs @@ -1,6 +1,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use clap::{ArgAction, Parser, Subcommand}; +use tokio::net::{TcpListener, UdpSocket}; use tracing::{debug, info, trace, Level}; use net4mqtt::proxy; @@ -53,6 +54,9 @@ enum Commands { /// Set Current local id #[arg(short, long, default_value_t = format!("-"))] id: String, + /// use udp port + #[arg(short, long, default_value_t = false)] + udp: bool, /// enable kcp in mqtt #[arg(short, long, default_value_t = false)] kcp: bool, @@ -99,7 +103,8 @@ async fn main() { info!("Running as socks, {:?}", listen); debug!("use domain: {:?}", domain); - proxy::local_socks(&mqtt_url, listen, &agent_id, &id, None, None, kcp) + let listener = TcpListener::bind(listen).await.unwrap(); + proxy::local_socks(&mqtt_url, listener, &agent_id, &id, None, None, kcp) .await .unwrap(); } @@ -108,13 +113,22 @@ async fn main() { listen, agent_id, id, + udp, kcp, } => { info!("Running as local, {:?}", listen); - proxy::local(&mqtt_url, listen, &agent_id, &id, None, None, kcp) - .await - .unwrap(); + if udp { + let sock = UdpSocket::bind(listen).await.unwrap(); + proxy::local_ports_udp(&mqtt_url, sock, &agent_id, &id, None, None) + .await + .unwrap(); + } else { + let listener = TcpListener::bind(listen).await.unwrap(); + proxy::local_ports_tcp(&mqtt_url, listener, &agent_id, &id, None, None, kcp) + .await + .unwrap(); + } } Commands::Agent { mqtt_url, diff --git a/libs/net4mqtt/src/proxy.rs b/libs/net4mqtt/src/proxy.rs index 95a0ec20..38a5e3d3 100644 --- a/libs/net4mqtt/src/proxy.rs +++ b/libs/net4mqtt/src/proxy.rs @@ -347,9 +347,9 @@ pub async fn agent( } } -pub async fn local( +pub async fn local_ports_tcp( mqtt_url: &str, - address: SocketAddr, + listener: TcpListener, agent_id: &str, local_id: &str, xdata: Option<(Vec, Option>)>, @@ -376,14 +376,10 @@ pub async fn local( ) .await; - let mut buf = [0; MAX_BUFFER_SIZE]; - let listener = TcpListener::bind(address).await.unwrap(); - let sock = UdpSocket::bind(address).await.unwrap(); loop { let sender = sender.clone(); let on_xdata = on_xdata.clone(); select! { - // TCP Server Ok((socket, _)) = listener.accept() => { let (vnet_tx, vnet_rx) = unbounded_channel::<(String, Vec)>(); @@ -403,12 +399,6 @@ pub async fn local( }); } - // UDP Server - Ok((len, addr)) = sock.recv_from(&mut buf) => { - sender.send(( - topic::build(prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string()), - buf[..len].to_vec())).unwrap(); - } result = receiver.recv() => { match result { Some((key, data)) => { @@ -427,7 +417,7 @@ pub async fn local( Ok(notification) => { if let Some(p) = mqtt_receive(notification) { let topic = p.topic.clone(); - let (_prefix, agent_id, local_id, label, protocol, address) = topic::parse(&topic); + let (_prefix, agent_id, local_id, label, protocol, _address) = topic::parse(&topic); match (label, protocol) { (topic::label::X, _) => { @@ -444,6 +434,81 @@ pub async fn local( } } }, + (label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol) + } + } + }, + Err(e) => { + error!("local mqtt error: {:?}", e); + time::sleep(time::Duration::from_secs(1)).await; + } + } + + } + else => { error!("vlocal proxy error"); } + } + } +} + +pub async fn local_ports_udp( + mqtt_url: &str, + sock: UdpSocket, + agent_id: &str, + local_id: &str, + xdata: Option<(Vec, Option>)>, + on_xdata: Option)>>, +) -> Result<(), Error> { + let (sender, mut receiver) = unbounded_channel::<(String, Vec)>(); + + let (url, prefix) = crate::utils::pre_url(mqtt_url.parse::()?); + let prefix = prefix.as_str(); + + let (client, mut eventloop) = mqtt_client_init( + url, + topic::build_sub(prefix, topic::ANY, local_id, topic::label::O), + topic::build_sub(prefix, topic::ANY, topic::ANY, topic::label::X), + topic::build_pub_x(prefix, topic::NIL, local_id, topic::label::X), + xdata, + on_xdata.is_some(), + ) + .await; + + let mut buf = [0; MAX_BUFFER_SIZE]; + loop { + let sender = sender.clone(); + let on_xdata = on_xdata.clone(); + select! { + Ok((len, addr)) = sock.recv_from(&mut buf) => { + sender.send(( + topic::build(prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string()), + buf[..len].to_vec())).unwrap(); + } + result = receiver.recv() => { + match result { + Some((key, data)) => { + client.publish( + key, + QoS::AtMostOnce, + false, + data + ).await?; + } + None => return Err(anyhow!("recv error")) + } + } + result = eventloop.poll() => { + match result { + Ok(notification) => { + if let Some(p) = mqtt_receive(notification) { + let topic = p.topic.clone(); + let (_prefix, _agent_id, _local_id, label, protocol, address) = topic::parse(&topic); + + match (label, protocol) { + (topic::label::X, _) => { + if let Some(s) = on_xdata { + s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await.unwrap(); + } + }, (_, topic::protocol::UDP) => { let _ = sock.send_to(&p.payload, address).await.unwrap(); }, (label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol) } @@ -477,7 +542,7 @@ use std::sync::Arc; pub async fn local_socks( mqtt_url: &str, - address: SocketAddr, + listener: TcpListener, agent_id: &str, local_id: &str, xdata: Option<(Vec, Option>)>, @@ -504,7 +569,6 @@ pub async fn local_socks( ) .await; - let listener = TcpListener::bind(address).await.unwrap(); let server = Server::new(listener, Arc::new(NoAuth)); loop { diff --git a/libs/net4mqtt/src/tests.rs b/libs/net4mqtt/src/tests.rs index 2a37247f..8c6aa2c3 100644 --- a/libs/net4mqtt/src/tests.rs +++ b/libs/net4mqtt/src/tests.rs @@ -1,6 +1,7 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::thread; +use anyhow::{Error, Result}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::time; @@ -23,19 +24,17 @@ async fn wait_for_port_availabilty(addr: SocketAddr) -> bool { } } -async fn up_echo_udp_server(listen: SocketAddr) { - let sock = UdpSocket::bind(listen).await.unwrap(); +async fn up_echo_udp_server(sock: UdpSocket) -> Result<(), Error> { let mut buf = [0; MAX_BUFFER_SIZE]; loop { - let (n, addr) = sock.recv_from(&mut buf).await.unwrap(); - let _ = sock.send_to(&buf[..n], addr).await.unwrap(); + let (n, addr) = sock.recv_from(&mut buf).await?; + let _ = sock.send_to(&buf[..n], addr).await?; } } -async fn up_echo_tcp_server(listen: SocketAddr) { - let listener = TcpListener::bind(listen).await.unwrap(); +async fn up_echo_tcp_server(listener: TcpListener) -> Result<(), Error> { loop { - let (mut socket, _) = listener.accept().await.unwrap(); + let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0; MAX_BUFFER_SIZE]; loop { @@ -52,24 +51,22 @@ async fn up_echo_tcp_server(listen: SocketAddr) { } } -async fn up_add_udp_server(listen: SocketAddr) { - let sock = UdpSocket::bind(listen).await.unwrap(); +async fn up_add_udp_server(sock: UdpSocket) -> Result<(), Error> { let mut buf = [0; MAX_BUFFER_SIZE]; loop { - let (n, addr) = sock.recv_from(&mut buf).await.unwrap(); + let (n, addr) = sock.recv_from(&mut buf).await?; let raw = String::from_utf8_lossy(&buf[..n]); let v: Vec<&str> = raw.split('+').collect(); let num0 = v[0].parse::().unwrap_or(0); let num1 = v[1].parse::().unwrap_or(0); let r = num0 + num1; - let _ = sock.send_to(r.to_string().as_bytes(), addr).await.unwrap(); + let _ = sock.send_to(r.to_string().as_bytes(), addr).await?; } } -async fn up_add_tcp_server(listen: SocketAddr) { - let listener = TcpListener::bind(listen).await.unwrap(); +async fn up_add_tcp_server(listener: TcpListener) -> Result<(), Error> { loop { - let (mut socket, _) = listener.accept().await.unwrap(); + let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = [0; MAX_BUFFER_SIZE]; loop { @@ -92,20 +89,20 @@ async fn up_add_tcp_server(listen: SocketAddr) { } } -async fn handle_request(body: &str, mut socket: tokio::net::TcpStream) { +async fn handle_request(body: &str, mut socket: tokio::net::TcpStream) -> Result<(), Error> { let response = format!( "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", body.len(), body ); - socket.write_all(response.as_bytes()).await.unwrap(); + socket.write_all(response.as_bytes()).await?; + Ok(()) } -async fn up_http_server(listen: SocketAddr, body: &str) { - let listener = TcpListener::bind(listen).await.unwrap(); +async fn up_http_server(listener: TcpListener, body: &str) -> Result<(), Error> { loop { - let (socket, _) = listener.accept().await.unwrap(); - handle_request(body, socket).await; + let (socket, _) = listener.accept().await?; + handle_request(body, socket).await? } } @@ -116,46 +113,59 @@ use crate::proxy; const MQTT_TOPIC_PREFIX: &str = "test"; struct Config { - agent: Vec, - local: Vec, + agent: u16, + local_ports: u16, + local_socks: u16, ip: IpAddr, kcp: bool, - echo: bool, + tcp: bool, broker: u16, + target: Option, } impl Default for Config { fn default() -> Self { Self { - agent: Vec::new(), - local: Vec::new(), + agent: 0, + local_ports: 0, + local_socks: 0, ip: IpAddr::V4(Ipv4Addr::LOCALHOST), kcp: false, - echo: false, + tcp: true, broker: pick_unused_port().expect("No ports free"), + target: None, } } } -async fn helper_cluster_up(cfg: Config) { +async fn helper_cluster_up(cfg: Config) -> Vec { let mqtt_broker_host = cfg.ip; - if cfg.echo { - for port in cfg.agent.iter() { - let addr = SocketAddr::new(cfg.ip, *port); - thread::spawn(move || tokio_test::block_on(up_echo_tcp_server(addr))); - thread::spawn(move || tokio_test::block_on(up_echo_udp_server(addr))); + let addr = match cfg.target { + Some(target) => target, + None => { + let addr = SocketAddr::new(cfg.ip, 0); + if cfg.tcp { + let listener = TcpListener::bind(addr).await.unwrap(); + let target = listener.local_addr().unwrap(); + thread::spawn(move || tokio_test::block_on(up_echo_tcp_server(listener))); + target + } else { + let sock = UdpSocket::bind(addr).await.unwrap(); + let target = sock.local_addr().unwrap(); + thread::spawn(move || tokio_test::block_on(up_echo_udp_server(sock))); + target + } } - } + }; let broker_addr = SocketAddr::new(mqtt_broker_host, cfg.broker); thread::spawn(move || broker::up_mqtt_broker(broker_addr)); wait_for_port_availabilty(broker_addr).await; - for (id, port) in cfg.agent.into_iter().enumerate() { + for id in 0..cfg.agent { thread::spawn(move || { - let addr = SocketAddr::new(cfg.ip, port); tokio_test::block_on(proxy::agent( &format!( "mqtt://{}/{}?client_id=test-proxy-agent-{}", @@ -171,39 +181,86 @@ async fn helper_cluster_up(cfg: Config) { }); } - for (id, port) in cfg.local.into_iter().enumerate() { + let mut addrs = Vec::new(); + + for id in 0..cfg.local_ports { + if cfg.tcp { + let listener = TcpListener::bind(SocketAddr::new(cfg.ip, 0)).await.unwrap(); + addrs.push(listener.local_addr().unwrap()); + + thread::spawn(move || { + tokio_test::block_on(proxy::local_ports_tcp( + &format!( + "mqtt://{}/{}?client_id=test-proxy-local-{}", + SocketAddr::new(mqtt_broker_host, cfg.broker), + MQTT_TOPIC_PREFIX, + id + ), + listener, + &id.to_string(), + format!("local-{}", id).as_str(), + None, + None, + cfg.kcp, + )) + }); + } else { + let sock = UdpSocket::bind(SocketAddr::new(cfg.ip, 0)).await.unwrap(); + addrs.push(sock.local_addr().unwrap()); + + thread::spawn(move || { + tokio_test::block_on(proxy::local_ports_udp( + &format!( + "mqtt://{}/{}?client_id=test-proxy-local-{}", + SocketAddr::new(mqtt_broker_host, cfg.broker), + MQTT_TOPIC_PREFIX, + id + ), + sock, + &id.to_string(), + format!("local-{}", id).as_str(), + None, + None, + )) + }); + } + } + + for id in 0..cfg.local_socks { + let listener = TcpListener::bind(SocketAddr::new(cfg.ip, 0)).await.unwrap(); + addrs.push(listener.local_addr().unwrap()); + thread::spawn(move || { - let addr = SocketAddr::new(cfg.ip, port); - tokio_test::block_on(proxy::local( + tokio_test::block_on(proxy::local_socks( &format!( - "mqtt://{}/{}?client_id=test-proxy-local-{}", + "mqtt://{}/{}?client_id=test-proxy-socks-{}", SocketAddr::new(mqtt_broker_host, cfg.broker), MQTT_TOPIC_PREFIX, id ), - addr, - &id.to_string(), + listener, &id.to_string(), + format!("socks-{}", id).as_str(), None, None, cfg.kcp, )) }); } + + addrs } #[tokio::test] async fn test_udp_simple() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, - echo: true, + tcp: false, broker: mqtt_broker_port, ..Default::default() }) @@ -211,7 +268,7 @@ async fn test_udp_simple() { time::sleep(time::Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); - sock.connect(SocketAddr::new(ip, local_port)).await.unwrap(); + sock.connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; sock.send(test_msg).await.unwrap(); @@ -228,24 +285,26 @@ async fn test_udp_simple() { async fn test_udp_add() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - thread::spawn(move || tokio_test::block_on(up_add_udp_server(SocketAddr::new(ip, agent_port)))); + let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); + let target = sock.local_addr().unwrap(); + thread::spawn(move || tokio_test::block_on(up_add_udp_server(sock))); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, + tcp: false, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; time::sleep(time::Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); - sock.connect(SocketAddr::new(ip, local_port)).await.unwrap(); + sock.connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"1+2"; sock.send(test_msg).await.unwrap(); @@ -262,14 +321,13 @@ async fn test_udp_add() { async fn test_udp_ipv6() { let ip = IpAddr::V6(Ipv6Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, - echo: true, + tcp: false, broker: mqtt_broker_port, ..Default::default() }) @@ -277,7 +335,8 @@ async fn test_udp_ipv6() { time::sleep(time::Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); - sock.connect(SocketAddr::new(ip, local_port)).await.unwrap(); + sock.connect(addrs[0]).await.unwrap(); + let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; sock.send(test_msg).await.unwrap(); @@ -294,14 +353,12 @@ async fn test_udp_ipv6() { async fn test_udp_two_connect() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, - echo: true, + tcp: false, broker: mqtt_broker_port, ..Default::default() }) @@ -310,11 +367,8 @@ async fn test_udp_two_connect() { let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); let sock2 = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); - sock.connect(SocketAddr::new(ip, local_port)).await.unwrap(); - sock2 - .connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + sock.connect(addrs[0]).await.unwrap(); + sock2.connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; @@ -346,23 +400,17 @@ async fn test_udp_two_connect() { async fn test_tcp_echo() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, - echo: true, broker: mqtt_broker_port, ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; - let mut socket = TcpStream::connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; @@ -380,25 +428,24 @@ async fn test_tcp_echo() { async fn test_tcp_add() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - thread::spawn(move || tokio_test::block_on(up_add_tcp_server(SocketAddr::new(ip, agent_port)))); + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); + let target = listener.local_addr().unwrap(); + thread::spawn(move || tokio_test::block_on(up_add_tcp_server(listener))); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; time::sleep(time::Duration::from_millis(10)).await; - let mut socket = TcpStream::connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; @@ -417,26 +464,25 @@ async fn test_tcp_add() { async fn test_kcp_add() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - thread::spawn(move || tokio_test::block_on(up_add_tcp_server(SocketAddr::new(ip, agent_port)))); + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); + let target = listener.local_addr().unwrap(); + thread::spawn(move || tokio_test::block_on(up_add_tcp_server(listener))); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, kcp: true, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; time::sleep(time::Duration::from_millis(10)).await; - let mut socket = TcpStream::connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; @@ -456,28 +502,25 @@ async fn test_tcp_echo_restart() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let target = SocketAddr::new(ip, agent_port); + + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; - let agent_addr = SocketAddr::new(ip, agent_port); - - time::sleep(time::Duration::from_millis(10)).await; - for i in 0..10 { - let handle = tokio::spawn(up_echo_tcp_server(agent_addr)); - wait_for_port_availabilty(agent_addr).await; + time::sleep(time::Duration::from_millis(10)).await; + let listener = TcpListener::bind(target).await.unwrap(); + let handle = tokio::spawn(up_echo_tcp_server(listener)); - let mut socket = TcpStream::connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = format!("hello, world: {}", i); @@ -498,23 +541,19 @@ async fn test_tcp_echo_restart() { async fn test_kcp() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, kcp: true, - echo: true, broker: mqtt_broker_port, + ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; - let mut socket = TcpStream::connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; @@ -533,29 +572,26 @@ async fn test_kcp_echo_restart() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], - local: vec![local_port], + let target = SocketAddr::new(ip, agent_port); + + let addrs = helper_cluster_up(Config { + agent: 1, + local_ports: 1, ip, kcp: true, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; - let agent_addr = SocketAddr::new(ip, agent_port); - - time::sleep(time::Duration::from_millis(10)).await; - for i in 0..10 { - let handle = tokio::spawn(up_echo_tcp_server(agent_addr)); - wait_for_port_availabilty(agent_addr).await; + time::sleep(time::Duration::from_millis(10)).await; + let listener = TcpListener::bind(target).await.unwrap(); + let handle = tokio::spawn(up_echo_tcp_server(listener)); - let mut socket = TcpStream::connect(SocketAddr::new(ip, local_port)) - .await - .unwrap(); + let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = format!("hello, world: {}", i); @@ -577,49 +613,25 @@ async fn test_kcp_echo_restart() { async fn test_socks_simple() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], + + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); + let target = listener.local_addr().unwrap(); + let addrs = helper_cluster_up(Config { + agent: 1, + local_socks: 1, ip, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; - let tcp_over_kcp = false; - let mqtt_broker_host = ip; - - let agent_id = "0"; - let local_id = "0"; - - let agent_addr = SocketAddr::new(ip, agent_port); - let local_addr = SocketAddr::new(ip, local_port); + time::sleep(time::Duration::from_millis(10)).await; let message = "Hello World!"; - thread::spawn(move || { - tokio_test::block_on(up_http_server(agent_addr, message)); - }); - - thread::spawn(move || { - tokio_test::block_on(proxy::local_socks( - &format!( - "mqtt://{}/{}?client_id=test-proxy-socks-{}", - SocketAddr::new(mqtt_broker_host, mqtt_broker_port), - MQTT_TOPIC_PREFIX, - local_id - ), - local_addr, - agent_id, - local_id, - None, - None, - tcp_over_kcp, - )) - }); - - wait_for_port_availabilty(local_addr).await; + thread::spawn(move || tokio_test::block_on(up_http_server(listener, message))); + let local_addr = addrs[0]; let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) .timeout(time::Duration::from_secs(1)) @@ -661,46 +673,26 @@ async fn test_socks_restart() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); let agent_port: u16 = pick_unused_port().expect("No ports free"); - let local_port: u16 = pick_unused_port().expect("No ports free"); - helper_cluster_up(Config { - agent: vec![agent_port], + let target = SocketAddr::new(ip, agent_port); + + //let target = listener.local_addr().unwrap(); + let addrs = helper_cluster_up(Config { + agent: 1, + local_socks: 1, ip, broker: mqtt_broker_port, + target: Some(target), ..Default::default() }) .await; - let mqtt_broker_host = ip; - - let agent_id = "0"; - let local_id = "0"; - - let agent_addr = SocketAddr::new(ip, agent_port); - let local_addr = SocketAddr::new(ip, local_port); - - thread::spawn(move || { - tokio_test::block_on(proxy::local_socks( - &format!( - "mqtt://{}/{}?client_id=test-proxy-socks-{}", - SocketAddr::new(mqtt_broker_host, mqtt_broker_port), - MQTT_TOPIC_PREFIX, - local_id - ), - local_addr, - agent_id, - local_id, - None, - None, - false, - )) - }); - wait_for_port_availabilty(local_addr).await; - + let local_addr = addrs[0]; for _ in 0..10 { + time::sleep(time::Duration::from_millis(10)).await; let message = "Hello World!"; - let handle = tokio::spawn(up_http_server(agent_addr, message)); - wait_for_port_availabilty(agent_addr).await; + let listener = TcpListener::bind(target).await.unwrap(); + let handle = tokio::spawn(up_http_server(listener, message)); let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) @@ -744,57 +736,45 @@ async fn test_socks_restart() { #[tokio::test] async fn test_socks_multiple_server() { let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); - let mqtt_broker_port: u16 = pick_unused_port().expect("No ports free"); - - let agent_ports: Vec = (0..10) - .map(|_| pick_unused_port().expect("No ports free")) - .collect(); - let local_port: u16 = pick_unused_port().expect("No ports free"); - - for (id, port) in agent_ports.iter().enumerate() { - let agent_addr = SocketAddr::new(ip, *port); - let message = id.to_string(); - thread::spawn(move || { - tokio_test::block_on(up_http_server(agent_addr, &message)); - }); - } - time::sleep(time::Duration::from_millis(100)).await; + let mqtt_broker_port: u16 = 1883; + let n = 10; - helper_cluster_up(Config { - agent: agent_ports.clone(), + let addrs = helper_cluster_up(Config { + local_socks: 1, ip, broker: mqtt_broker_port, ..Default::default() }) .await; - let tcp_over_kcp = false; - let mqtt_broker_host = ip; - let agent_id = "0"; - let local_id = "0"; + for id in 0..n { + let message = id.to_string(); - let local_addr = SocketAddr::new(ip, local_port); - thread::spawn(move || { - tokio_test::block_on(proxy::local_socks( - &format!( - "mqtt://{}/{}?client_id=test-proxy-socks-{}", - SocketAddr::new(mqtt_broker_host, mqtt_broker_port), - MQTT_TOPIC_PREFIX, - local_id - ), - local_addr, - agent_id, - local_id, - None, - None, - tcp_over_kcp, - )) - }); + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); - wait_for_port_availabilty(local_addr).await; + thread::spawn(move || tokio_test::block_on(up_http_server(listener, &message))); + thread::spawn(move || { + tokio_test::block_on(proxy::agent( + &format!( + "mqtt://{}/{}?client_id=test-proxy-agent-{}", + SocketAddr::new(ip, mqtt_broker_port), + MQTT_TOPIC_PREFIX, + id + ), + addr, + &id.to_string(), + None, + None, + )) + }); + } + time::sleep(time::Duration::from_millis(100)).await; - for (id, _port) in agent_ports.iter().enumerate() { + let local_addr = addrs[0]; + + for id in 0..n { let client = reqwest::Client::builder() .connect_timeout(time::Duration::from_secs(1)) .timeout(time::Duration::from_secs(1)) @@ -824,8 +804,6 @@ async fn test_xdata() { let agent_port: u16 = pick_unused_port().expect("No ports free"); let agent_port_1: u16 = pick_unused_port().expect("No ports free"); let agent_port_2: u16 = pick_unused_port().expect("No ports free"); - let local_port_1: u16 = pick_unused_port().expect("No ports free"); - let local_port_2: u16 = pick_unused_port().expect("No ports free"); helper_cluster_up(Config { ip, kcp: true, @@ -902,17 +880,17 @@ async fn test_xdata() { time::sleep(time::Duration::from_millis(100)).await; + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); thread::spawn(move || { let id = "local-x"; - let addr = SocketAddr::new(ip, local_port_1); - tokio_test::block_on(proxy::local( + tokio_test::block_on(proxy::local_ports_tcp( &format!( "mqtt://{}/{}?client_id=test-proxy-local-{}", SocketAddr::new(ip, mqtt_broker_port), MQTT_TOPIC_PREFIX, id ), - addr, + listener, id, id, Some((msg_3_clone, None)), @@ -923,17 +901,17 @@ async fn test_xdata() { time::sleep(time::Duration::from_millis(100)).await; + let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); thread::spawn(move || { let id = "socks-x"; - let addr = SocketAddr::new(ip, local_port_2); - tokio_test::block_on(proxy::local( + tokio_test::block_on(proxy::local_ports_tcp( &format!( "mqtt://{}/{}?client_id=test-proxy-local-{}", SocketAddr::new(ip, mqtt_broker_port), MQTT_TOPIC_PREFIX, id ), - addr, + listener, id, id, Some((msg_4_clone, None)), diff --git a/liveman/src/lib.rs b/liveman/src/lib.rs index fe6a4457..971eb573 100644 --- a/liveman/src/lib.rs +++ b/liveman/src/lib.rs @@ -89,9 +89,10 @@ where tokio::runtime::Runtime::new() .unwrap() .block_on(async move { + let listener = TcpListener::bind(c.listen).await.unwrap(); net4mqtt::proxy::local_socks( &c.mqtt_url, - c.listen, + listener, "-", &c.alias.clone(), None,