Skip to content

Commit

Permalink
fix: correctly remove device from list when sending
Browse files Browse the repository at this point in the history
Signed-off-by: Martichou <[email protected]>
  • Loading branch information
Martichou committed Jul 8, 2024
1 parent 614bcbc commit 758ada8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
2 changes: 1 addition & 1 deletion app/common/vue_lib/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export async function sendInfo(vm: TauriVM, eid: string) {
if (vm.outboundPayload === undefined) return;

const ei = vm.endpointsInfo.find((el) => el.id === eid);
if (!ei) return;
if (!ei || !ei.ip || !ei.port) return;

const msg: SendInfo = {
id: ei.id,
Expand Down
37 changes: 29 additions & 8 deletions core_lib/src/hdl/mdns_discovery.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use mdns_sd::{ServiceDaemon, ServiceEvent};
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
Expand All @@ -11,6 +13,7 @@ use crate::DeviceType;
#[derive(Debug, Clone, Default, Deserialize, Serialize, TS)]
#[ts(export)]
pub struct EndpointInfo {
pub fullname: String,
pub id: String,
pub name: Option<String>,
pub ip: Option<String>,
Expand All @@ -37,6 +40,9 @@ impl MDnsDiscovery {
let service_type = "_FC9F5ED42C8A._tcp.local.";
let receiver = self.daemon.browse(service_type)?;

// Map with fullname as key and EndpointInfo as value
let mut cache: HashMap<String, EndpointInfo> = HashMap::new();

loop {
tokio::select! {
_ = ctk.cancelled() => {
Expand Down Expand Up @@ -77,25 +83,40 @@ impl MDnsDiscovery {
Err(_) => continue
};

if TcpStream::connect(format!("{ip}:{port}")).await.is_ok() {
let ip_port = format!("{ip}:{port}");
let fullname = info.get_fullname().to_string();
if TcpStream::connect(&ip_port).await.is_ok() {
let ei = EndpointInfo {
id: info.get_fullname().to_string(),
fullname: fullname.clone(),
id: ip_port,
name: Some(dn),
ip: Some(ip.to_string()),
port: Some(port.to_string()),
rtype: Some(dt),
present: Some(true),
};
info!("Resolved a new service: {:?}", ei);
info!("ServiceResolved: Resolved a new service: {:?}", ei);
cache.insert(fullname.clone(), ei.clone());
let _ = self.sender.send(ei);
}
}
ServiceEvent::ServiceRemoved(_, fullname) => {
info!("Remove a previous service: {}", fullname);
let _ = self.sender.send(EndpointInfo {
id: fullname,
..Default::default()
});
trace!("ServiceRemoved: checking if should remove {}", fullname);
// Only remove if it has not been seen in the last cleanup_threshold
let should_remove = if let Some(ei) = cache.get(&fullname) {
Some(ei.id.clone())
} else {
None
};

if let Some(id) = should_remove {
info!("ServiceRemoved: Remove a previous service: {}", fullname);
cache.remove(&fullname);
let _ = self.sender.send(EndpointInfo {
id,
..Default::default()
});
}
}
ServiceEvent::SearchStarted(_) | ServiceEvent::SearchStopped(_) => {}
_ => {}
Expand Down
11 changes: 2 additions & 9 deletions core_lib/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::Sender;
Expand Down Expand Up @@ -58,7 +57,7 @@ impl TcpServer {
Some(i) = self.connect_receiver.recv() => {
info!("{INNER_NAME}: connect_receiver: got {:?}", i);
if let Err(e) = self.connect(cctk, i).await {
error!("{INNER_NAME}: error sending: {:?}", e);
error!("{INNER_NAME}: error sending: {}", e.to_string());
}
}
r = self.tcp_listener.accept() => {
Expand Down Expand Up @@ -112,13 +111,7 @@ impl TcpServer {
/// To be called inside a separate task if we want to handle concurrency
pub async fn connect(&self, ctk: CancellationToken, si: SendInfo) -> Result<(), anyhow::Error> {
debug!("{INNER_NAME}: Connecting to: {}", si.addr);
let socket = match TcpStream::connect(si.addr.clone()).await {
Ok(r) => r,
Err(e) => {
warn!("Couldn't connect to {}: {}", si.addr, e);
return Err(anyhow!("failed to connect to {}", si.addr));
}
};
let socket = TcpStream::connect(si.addr.clone()).await?;

let mut or = OutboundRequest::new(
self.endpoint_id,
Expand Down

0 comments on commit 758ada8

Please sign in to comment.