diff --git a/libs/net4mqtt/src/tests.rs b/libs/net4mqtt/src/tests.rs index 2d2185a..ab569bb 100644 --- a/libs/net4mqtt/src/tests.rs +++ b/libs/net4mqtt/src/tests.rs @@ -4,7 +4,21 @@ use std::thread; use anyhow::{Error, Result}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream, UdpSocket}; -use tokio::time; +use tokio::time::{interval, sleep, timeout_at, Duration, Instant}; + +#[macro_export] +macro_rules! timeout_await { + ($future:expr) => { + timeout_at(Instant::now() + Duration::from_secs(1), $future) + .await + .unwrap() + }; + ($future:expr, $duration:expr) => { + timeout_at(Instant::now() + $duration, $future) + .await + .unwrap() + }; +} use crate::broker; use crate::kxdns; @@ -16,7 +30,7 @@ async fn check_port_availability(addr: SocketAddr) -> bool { } async fn wait_for_port_availabilty(addr: SocketAddr) -> bool { - let mut interval = time::interval(time::Duration::from_millis(1)); + let mut interval = interval(Duration::from_millis(1)); loop { if check_port_availability(addr).await { return true; @@ -263,19 +277,19 @@ async fn test_udp_simple() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); sock.connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; sock.send(test_msg).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg); let test_msg2 = b"hello, world2"; sock.send(test_msg2).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2); } @@ -299,19 +313,19 @@ async fn test_udp_add() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); sock.connect(addrs[0]).await.unwrap(); let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"1+2"; sock.send(test_msg).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(std::str::from_utf8(&buf[..len]), Ok("3")); let test_msg2 = b"123456+543210"; sock.send(test_msg2).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(std::str::from_utf8(&buf[..len]), Ok("666666")); } @@ -330,7 +344,7 @@ async fn test_udp_ipv6() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); sock.connect(addrs[0]).await.unwrap(); @@ -338,12 +352,12 @@ async fn test_udp_ipv6() { let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; sock.send(test_msg).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg); let test_msg2 = b"hello, world2"; sock.send(test_msg2).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2); } @@ -361,7 +375,7 @@ async fn test_udp_two_connect() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let sock = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); let sock2 = UdpSocket::bind(SocketAddr::new(ip, 0)).await.unwrap(); @@ -373,24 +387,24 @@ async fn test_udp_two_connect() { let test_2_msg = b"hello, world 22222222222222222222"; sock.send(test_msg).await.unwrap(); sock2.send(test_2_msg).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg); - let len = sock2.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock2.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_2_msg); let test_2_msg2 = b"hello, world yyyyyyyy"; sock2.send(test_2_msg2).await.unwrap(); - let len = sock2.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock2.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_2_msg2); let test_msg2 = b"hello, world2"; sock.send(test_msg2).await.unwrap(); - let len = sock.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2); let test_2_msg3 = b"hello, world 333333"; sock2.send(test_2_msg3).await.unwrap(); - let len = sock2.recv(&mut buf).await.unwrap(); + let len = timeout_await!(sock2.recv(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_2_msg3); } @@ -413,12 +427,12 @@ async fn test_tcp_echo() { let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; socket.write_all(test_msg).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg); let test_msg2 = b"hello, world2"; socket.write_all(test_msg2).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2); } @@ -441,7 +455,7 @@ async fn test_tcp_add() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); @@ -449,12 +463,12 @@ async fn test_tcp_add() { let test_msg = b"1+2"; socket.write_all(test_msg).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(std::str::from_utf8(&buf[..len]), Ok("3")); let test_msg2 = b"123456+543210"; socket.write_all(test_msg2).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(std::str::from_utf8(&buf[..len]), Ok("666666")); } @@ -478,7 +492,7 @@ async fn test_kcp_add() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let mut socket = TcpStream::connect(addrs[0]).await.unwrap(); @@ -486,12 +500,12 @@ async fn test_kcp_add() { let test_msg = b"1+2"; socket.write_all(test_msg).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(std::str::from_utf8(&buf[..len]), Ok("3")); let test_msg2 = b"123456+543210"; socket.write_all(test_msg2).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(std::str::from_utf8(&buf[..len]), Ok("666666")); } @@ -514,7 +528,7 @@ async fn test_tcp_echo_restart() { .await; for i in 0..10 { - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let listener = TcpListener::bind(target).await.unwrap(); let handle = tokio::spawn(up_echo_tcp_server(listener)); @@ -523,12 +537,12 @@ async fn test_tcp_echo_restart() { let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = format!("hello, world: {}", i); socket.write_all(test_msg.as_bytes()).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg.as_bytes()); let test_msg2 = format!("the end: {}", i); socket.write_all(test_msg2.as_bytes()).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2.as_bytes()); handle.abort(); @@ -556,12 +570,12 @@ async fn test_kcp() { let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = b"hello, world"; socket.write_all(test_msg).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg); let test_msg2 = b"hello, world2"; socket.write_all(test_msg2).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2); } @@ -585,7 +599,7 @@ async fn test_kcp_echo_restart() { .await; for i in 0..10 { - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let listener = TcpListener::bind(target).await.unwrap(); let handle = tokio::spawn(up_echo_tcp_server(listener)); @@ -594,12 +608,12 @@ async fn test_kcp_echo_restart() { let mut buf = [0; MAX_BUFFER_SIZE]; let test_msg = format!("hello, world: {}", i); socket.write_all(test_msg.as_bytes()).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg.as_bytes()); let test_msg2 = format!("the end: {}", i); socket.write_all(test_msg2.as_bytes()).await.unwrap(); - let len = socket.read(&mut buf).await.unwrap(); + let len = timeout_await!(socket.read(&mut buf)).unwrap(); assert_eq!(&buf[..len], test_msg2.as_bytes()); handle.abort(); @@ -624,15 +638,15 @@ async fn test_socks_simple() { ..Default::default() }) .await; - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let message = "Hello World!"; thread::spawn(move || tokio_test::block_on(up_http_server(listener, message))); let local_addr = addrs[0]; let client = reqwest::Client::builder() - .connect_timeout(time::Duration::from_secs(1)) - .timeout(time::Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); @@ -647,27 +661,29 @@ async fn test_socks_simple() { .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); - let body = res.text().await.unwrap(); + let body = timeout_await!(res.text()).unwrap(); assert_eq!(body, message); let client = reqwest::Client::builder() - .connect_timeout(time::Duration::from_secs(1)) - .timeout(time::Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); - let res = client - .get(format!( - "http://{}/", - kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0") - )) - .send() - .await - .unwrap(); + let res = timeout_await!( + client + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0") + )) + .send(), + Duration::from_secs(5) + ) + .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); - let body = res.text().await.unwrap(); + let body = timeout_await!(res.text()).unwrap(); assert_eq!(body, message); } @@ -691,49 +707,53 @@ async fn test_socks_restart() { let local_addr = addrs[0]; for _ in 0..10 { - time::sleep(time::Duration::from_millis(10)).await; + sleep(Duration::from_millis(10)).await; let message = "Hello World!"; let listener = TcpListener::bind(target).await.unwrap(); let handle = tokio::spawn(up_http_server(listener, message)); let client = reqwest::Client::builder() - .connect_timeout(time::Duration::from_secs(1)) - .timeout(time::Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); - let res = client - .get(format!( - "http://{}/", - kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0") - )) - .send() - .await - .unwrap(); + let res = timeout_await!( + client + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0") + )) + .send(), + Duration::from_secs(5) + ) + .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); - let body = res.text().await.unwrap(); + let body = timeout_await!(res.text()).unwrap(); assert_eq!(body, message); let client = reqwest::Client::builder() - .connect_timeout(time::Duration::from_secs(1)) - .timeout(time::Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) .proxy(reqwest::Proxy::http(format!("socks5h://{}", local_addr)).unwrap()) .build() .unwrap(); - let res = client - .get(format!( - "http://{}/", - kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0") - )) - .send() - .await - .unwrap(); + let res = timeout_await!( + client + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry("0") + )) + .send(), + Duration::from_secs(5) + ) + .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); - let body = res.text().await.unwrap(); + let body = timeout_await!(res.text()).unwrap(); assert_eq!(body, message); handle.abort(); @@ -777,14 +797,14 @@ async fn test_socks_multiple_server() { )) }); } - time::sleep(time::Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; let local_addr = addrs[0]; for id in 0..n { let client = reqwest::Client::builder() - .connect_timeout(time::Duration::from_secs(1)) - .timeout(time::Duration::from_secs(1)) + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) .proxy( // References: https://github.com/seanmonstar/reqwest/issues/899 reqwest::Proxy::all(format!("socks5h://{}", local_addr)).unwrap(), @@ -792,17 +812,19 @@ async fn test_socks_multiple_server() { .build() .unwrap(); - let res = client - .get(format!( - "http://{}/", - kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry(&id.to_string()) - )) - .send() - .await - .unwrap(); + let res = timeout_await!( + client + .get(format!( + "http://{}/", + kxdns::Kxdns::new(DOMAIN_SUFFIX.to_string()).registry(&id.to_string()) + )) + .send(), + Duration::from_secs(5) + ) + .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); - let body = res.text().await.unwrap(); + let body = timeout_await!(res.text()).unwrap(); assert_eq!(body, id.to_string()); } } @@ -853,7 +875,7 @@ async fn test_vdata() { )) }); - time::sleep(time::Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; thread::spawn(move || { let id = 1; @@ -873,7 +895,7 @@ async fn test_vdata() { }), )) }); - time::sleep(time::Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; thread::spawn(move || { let id = 2; @@ -894,7 +916,7 @@ async fn test_vdata() { )) }); - time::sleep(time::Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); thread::spawn(move || { @@ -917,7 +939,7 @@ async fn test_vdata() { )) }); - time::sleep(time::Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; let listener = TcpListener::bind(SocketAddr::new(ip, 0)).await.unwrap(); thread::spawn(move || { @@ -940,18 +962,18 @@ async fn test_vdata() { )) }); - let (agent_id, _local_id, r1) = receiver.recv().await.unwrap(); + let (agent_id, _local_id, r1) = timeout_await!(receiver.recv()).unwrap(); assert_eq!(msg_1, r1); assert_eq!("1", agent_id); - let (agent_id, _local_id, r2) = receiver.recv().await.unwrap(); + let (agent_id, _local_id, r2) = timeout_await!(receiver.recv()).unwrap(); assert_eq!("2", agent_id); assert_eq!(msg_2, r2); - let (agent_id, local_id, data) = receiver.recv().await.unwrap(); + let (agent_id, local_id, data) = timeout_await!(receiver.recv()).unwrap(); assert_eq!(msg_3, data); assert_eq!("-", agent_id); assert_eq!("local-x", local_id); - let (agent_id, local_id, data) = receiver.recv().await.unwrap(); + let (agent_id, local_id, data) = timeout_await!(receiver.recv()).unwrap(); assert_eq!("-", agent_id); assert_eq!(msg_4, data); assert_eq!("socks-x", local_id);