Skip to content

Commit

Permalink
refactor(net4mqtt): local socks port interface
Browse files Browse the repository at this point in the history
  • Loading branch information
a-wing committed Oct 5, 2024
1 parent 0150c8a commit 6428478
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 270 deletions.
22 changes: 18 additions & 4 deletions libs/net4mqtt/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use clap::{ArgAction, Parser, Subcommand};
use tokio::net::{TcpListener, UdpSocket};
use tracing::{debug, info, trace, Level};

use net4mqtt::proxy;
Expand Down Expand Up @@ -53,6 +54,9 @@ enum Commands {
/// Set Current local id
#[arg(short, long, default_value_t = format!("-"))]
id: String,
/// use udp port
#[arg(short, long, default_value_t = false)]
udp: bool,
/// enable kcp in mqtt
#[arg(short, long, default_value_t = false)]
kcp: bool,
Expand Down Expand Up @@ -99,7 +103,8 @@ async fn main() {
info!("Running as socks, {:?}", listen);
debug!("use domain: {:?}", domain);

proxy::local_socks(&mqtt_url, listen, &agent_id, &id, None, None, kcp)
let listener = TcpListener::bind(listen).await.unwrap();
proxy::local_socks(&mqtt_url, listener, &agent_id, &id, None, None, kcp)
.await
.unwrap();
}
Expand All @@ -108,13 +113,22 @@ async fn main() {
listen,
agent_id,
id,
udp,
kcp,
} => {
info!("Running as local, {:?}", listen);

proxy::local(&mqtt_url, listen, &agent_id, &id, None, None, kcp)
.await
.unwrap();
if udp {
let sock = UdpSocket::bind(listen).await.unwrap();
proxy::local_ports_udp(&mqtt_url, sock, &agent_id, &id, None, None)
.await
.unwrap();
} else {
let listener = TcpListener::bind(listen).await.unwrap();
proxy::local_ports_tcp(&mqtt_url, listener, &agent_id, &id, None, None, kcp)
.await
.unwrap();
}
}
Commands::Agent {
mqtt_url,
Expand Down
94 changes: 79 additions & 15 deletions libs/net4mqtt/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,9 @@ pub async fn agent(
}
}

pub async fn local(
pub async fn local_ports_tcp(
mqtt_url: &str,
address: SocketAddr,
listener: TcpListener,
agent_id: &str,
local_id: &str,
xdata: Option<(Vec<u8>, Option<Vec<u8>>)>,
Expand All @@ -376,14 +376,10 @@ pub async fn local(
)
.await;

let mut buf = [0; MAX_BUFFER_SIZE];
let listener = TcpListener::bind(address).await.unwrap();
let sock = UdpSocket::bind(address).await.unwrap();
loop {
let sender = sender.clone();
let on_xdata = on_xdata.clone();
select! {
// TCP Server
Ok((socket, _)) = listener.accept() => {
let (vnet_tx, vnet_rx) = unbounded_channel::<(String, Vec<u8>)>();

Expand All @@ -403,12 +399,6 @@ pub async fn local(
});
}

// UDP Server
Ok((len, addr)) = sock.recv_from(&mut buf) => {
sender.send((
topic::build(prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string()),
buf[..len].to_vec())).unwrap();
}
result = receiver.recv() => {
match result {
Some((key, data)) => {
Expand All @@ -427,7 +417,7 @@ pub async fn local(
Ok(notification) => {
if let Some(p) = mqtt_receive(notification) {
let topic = p.topic.clone();
let (_prefix, agent_id, local_id, label, protocol, address) = topic::parse(&topic);
let (_prefix, agent_id, local_id, label, protocol, _address) = topic::parse(&topic);

match (label, protocol) {
(topic::label::X, _) => {
Expand All @@ -444,6 +434,81 @@ pub async fn local(
}
}
},
(label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol)
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
}

}
else => { error!("vlocal proxy error"); }
}
}
}

pub async fn local_ports_udp(
mqtt_url: &str,
sock: UdpSocket,
agent_id: &str,
local_id: &str,
xdata: Option<(Vec<u8>, Option<Vec<u8>>)>,
on_xdata: Option<Sender<(String, String, Vec<u8>)>>,
) -> Result<(), Error> {
let (sender, mut receiver) = unbounded_channel::<(String, Vec<u8>)>();

let (url, prefix) = crate::utils::pre_url(mqtt_url.parse::<Url>()?);
let prefix = prefix.as_str();

let (client, mut eventloop) = mqtt_client_init(
url,
topic::build_sub(prefix, topic::ANY, local_id, topic::label::O),
topic::build_sub(prefix, topic::ANY, topic::ANY, topic::label::X),
topic::build_pub_x(prefix, topic::NIL, local_id, topic::label::X),
xdata,
on_xdata.is_some(),
)
.await;

let mut buf = [0; MAX_BUFFER_SIZE];
loop {
let sender = sender.clone();
let on_xdata = on_xdata.clone();
select! {
Ok((len, addr)) = sock.recv_from(&mut buf) => {
sender.send((
topic::build(prefix, agent_id, local_id, topic::label::I, topic::protocol::UDP, &addr.to_string()),
buf[..len].to_vec())).unwrap();
}
result = receiver.recv() => {
match result {
Some((key, data)) => {
client.publish(
key,
QoS::AtMostOnce,
false,
data
).await?;
}
None => return Err(anyhow!("recv error"))
}
}
result = eventloop.poll() => {
match result {
Ok(notification) => {
if let Some(p) = mqtt_receive(notification) {
let topic = p.topic.clone();
let (_prefix, _agent_id, _local_id, label, protocol, address) = topic::parse(&topic);

match (label, protocol) {
(topic::label::X, _) => {
if let Some(s) = on_xdata {
s.send((agent_id.to_string(), local_id.to_string(), p.payload.to_vec())).await.unwrap();
}
},
(_, topic::protocol::UDP) => { let _ = sock.send_to(&p.payload, address).await.unwrap(); },
(label, protocol) => info!("unknown label: {} and protocol: {}", label, protocol)
}
Expand Down Expand Up @@ -477,7 +542,7 @@ use std::sync::Arc;

pub async fn local_socks(
mqtt_url: &str,
address: SocketAddr,
listener: TcpListener,
agent_id: &str,
local_id: &str,
xdata: Option<(Vec<u8>, Option<Vec<u8>>)>,
Expand All @@ -504,7 +569,6 @@ pub async fn local_socks(
)
.await;

let listener = TcpListener::bind(address).await.unwrap();
let server = Server::new(listener, Arc::new(NoAuth));

loop {
Expand Down
Loading

0 comments on commit 6428478

Please sign in to comment.