Skip to content

Commit

Permalink
Merge pull request #255 from binbat/feat/net4mqtt-reconnect
Browse files Browse the repository at this point in the history
Feat/net4mqtt reconnect
  • Loading branch information
a-wing authored Nov 29, 2024
2 parents 101b63b + cfdfa88 commit c551278
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 55 deletions.
5 changes: 4 additions & 1 deletion conf/live777.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,8 @@ urls = [

# Default enabled `--features=net4mqtt`
# [net4mqtt]
# mqtt_url = "mqtt://localhost:1883/net4mqtt"
# Global unique alias
# alias = "liveion-0"
# `client_id={alias}` use alias as MQTT `client_id`
# mqtt_url = "mqtt://localhost:1883/net4mqtt?client_id={alias}"

4 changes: 3 additions & 1 deletion conf/liveman.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
# close_other_sub = false

# [net4mqtt]
# mqtt_url = "mqtt://localhost:1883/net4mqtt"
# Global unique alias
# alias = "liveman-0"
# `client_id={alias}` use alias as MQTT `client_id`
# mqtt_url = "mqtt://localhost:1883/net4mqtt?client_id={alias}"
# listen = "127.0.0.1:1077"
# domain = "net4mqtt.local"

Expand Down
20 changes: 4 additions & 16 deletions libs/net4mqtt/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,7 @@ pub async fn agent(
}
}
},
Err(e) => {
error!("agent mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("agent mqtt error: {:?}", e))
}
}
else => { error!("vagent proxy error"); }
Expand Down Expand Up @@ -508,10 +505,7 @@ pub async fn local_ports_tcp(
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("local mqtt error: {:?}", e))
}

}
Expand Down Expand Up @@ -580,10 +574,7 @@ pub async fn local_ports_udp(
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("local mqtt error: {:?}", e))
}

}
Expand Down Expand Up @@ -716,10 +707,7 @@ pub async fn local_socks(
}
}
},
Err(e) => {
error!("local mqtt error: {:?}", e);
time::sleep(time::Duration::from_secs(1)).await;
}
Err(e) => return Err(anyhow!("local mqtt error: {:?}", e))
}

}
Expand Down
7 changes: 7 additions & 0 deletions liveion/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ pub struct Net4mqtt {
pub alias: String,
}

#[cfg(feature = "net4mqtt")]
impl Net4mqtt {
pub fn validate(&mut self) {
self.mqtt_url = self.mqtt_url.replace("{alias}", &self.alias)
}
}

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct Webhook {
#[serde(default)]
Expand Down
49 changes: 28 additions & 21 deletions liveion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::TcpListener;
use tower_http::{
cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer,
};
use tracing::{error, info_span, Level};
use tracing::{error, info_span, warn, Level};

use auth::{access::access_middleware, ManyValidate};
use error::AppError;
Expand Down Expand Up @@ -84,30 +84,37 @@ where

#[cfg(feature = "net4mqtt")]
{
if let Some(c) = cfg.net4mqtt {
if let Some(mut c) = cfg.net4mqtt {
c.validate();
std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
net4mqtt::proxy::agent(
&c.mqtt_url,
&cfg.http.listen.to_string(),
&c.alias.clone(),
Some(net4mqtt::proxy::VDataConfig {
online: Some(
serde_json::json!({
"alias": c.alias,
})
.to_string()
.bytes()
.collect(),
),
offline: Some("{}".bytes().collect()),
..Default::default()
}),
)
.await
.unwrap()
loop {
match net4mqtt::proxy::agent(
&c.mqtt_url,
&cfg.http.listen.to_string(),
&c.alias.clone(),
Some(net4mqtt::proxy::VDataConfig {
online: Some(
serde_json::json!({
"alias": c.alias,
})
.to_string()
.bytes()
.collect(),
),
offline: Some("{}".bytes().collect()),
..Default::default()
}),
)
.await
{
Ok(_) => warn!("net4mqtt service is end, restart net4mqtt service"),
Err(e) => error!("mqtt4mqtt error: {:?}", e),
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
});
}
Expand Down
7 changes: 7 additions & 0 deletions liveman/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ pub struct Net4mqtt {
pub domain: String,
}

#[cfg(feature = "net4mqtt")]
impl Net4mqtt {
pub fn validate(&mut self) {
self.mqtt_url = self.mqtt_url.replace("{alias}", &self.alias)
}
}

#[cfg(feature = "net4mqtt")]
impl Default for Net4mqtt {
fn default() -> Self {
Expand Down
39 changes: 23 additions & 16 deletions liveman/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::TcpListener;
use tower_http::{
cors::CorsLayer, trace::TraceLayer, validate_request::ValidateRequestHeaderLayer,
};
use tracing::{error, info, info_span};
use tracing::{error, info, info_span, warn};

use crate::admin::{authorize, token};
use crate::config::Config;
Expand Down Expand Up @@ -71,7 +71,8 @@ where

#[cfg(feature = "net4mqtt")]
{
if let Some(c) = cfg.net4mqtt.clone() {
if let Some(mut c) = cfg.net4mqtt.clone() {
c.validate();
let (sender, mut receiver) =
tokio::sync::mpsc::channel::<(String, String, Vec<u8>)>(10);

Expand All @@ -81,20 +82,26 @@ where
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let listener = TcpListener::bind(c.listen).await.unwrap();
net4mqtt::proxy::local_socks(
&c.mqtt_url,
listener,
("-", &c.alias.clone()),
Some(c.domain),
Some(net4mqtt::proxy::VDataConfig {
receiver: Some(sender),
..Default::default()
}),
false,
)
.await
.unwrap()
loop {
let listener = TcpListener::bind(c.listen).await.unwrap();
match net4mqtt::proxy::local_socks(
&c.mqtt_url,
listener,
("-", &c.alias.clone()),
Some(c.domain.clone()),
Some(net4mqtt::proxy::VDataConfig {
receiver: Some(sender.clone()),
..Default::default()
}),
false,
)
.await
{
Ok(_) => warn!("net4mqtt service is end, restart net4mqtt service"),
Err(e) => error!("mqtt4mqtt error: {:?}", e),
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
});

Expand Down

0 comments on commit c551278

Please sign in to comment.