diff --git a/core/dtn7/src/core/processing.rs b/core/dtn7/src/core/processing.rs index 6339055b..74a7f89b 100644 --- a/core/dtn7/src/core/processing.rs +++ b/core/dtn7/src/core/processing.rs @@ -403,6 +403,14 @@ pub async fn forward(mut bp: BundlePack) -> Result<()> { start_time.elapsed() ); bundle_sent.store(true, Ordering::Relaxed); + if let Err(err) = routing_notify(RoutingNotifcation::SendingSucceeded( + bpid, + n.next_hop.node().unwrap(), + )) + .await + { + error!("Error while sending succeeded notification: {}", err); + } } }); wg.push(task_handle); diff --git a/core/dtn7/src/lib.rs b/core/dtn7/src/lib.rs index 9e91a45f..ff1794d5 100644 --- a/core/dtn7/src/lib.rs +++ b/core/dtn7/src/lib.rs @@ -40,6 +40,8 @@ lazy_static! { pub static ref CLAS: Mutex> = Mutex::new(Vec::new()); } +pub type BundleID = String; + pub fn cla_add(cla: CLAEnum) { (*CLAS.lock()).push(cla); } diff --git a/core/dtn7/src/routing/erouting/mod.rs b/core/dtn7/src/routing/erouting/mod.rs index c3636b94..d9a4d6a7 100644 --- a/core/dtn7/src/routing/erouting/mod.rs +++ b/core/dtn7/src/routing/erouting/mod.rs @@ -1,4 +1,4 @@ -use crate::{peers_get_for_node, BundlePack, DtnPeer, PeerAddress, RoutingNotifcation}; +use crate::{peers_get_for_node, BundleID, BundlePack, DtnPeer, PeerAddress, RoutingNotifcation}; use bp7::{Bundle, EndpointID}; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; @@ -32,6 +32,8 @@ pub enum Packet { Timeout(Timeout), /// Packet that signals that the sending failed. SendingFailed(SendingFailed), + /// Packet that signals that the sending succeeded. + SendingSucceeded(SendingSucceeded), /// Packet that signals that a bundle is incoming. IncomingBundle(IncomingBundle), /// Packet that signals that a bundle is incoming without a previous node. @@ -54,6 +56,9 @@ impl From for Packet { RoutingNotifcation::SendingFailed(bid, cla_sender) => { Packet::SendingFailed(SendingFailed { bid, cla_sender }) } + RoutingNotifcation::SendingSucceeded(bid, cla_sender) => { + Packet::SendingSucceeded(SendingSucceeded { bid, cla_sender }) + } RoutingNotifcation::IncomingBundle(bndl) => { Packet::IncomingBundle(IncomingBundle { bndl }) } @@ -111,7 +116,13 @@ pub struct Timeout { #[derive(Serialize, Deserialize, Clone)] pub struct SendingFailed { - pub bid: String, + pub bid: BundleID, + pub cla_sender: String, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct SendingSucceeded { + pub bid: BundleID, pub cla_sender: String, } @@ -122,7 +133,7 @@ pub struct IncomingBundle { #[derive(Serialize, Deserialize, Clone)] pub struct IncomingBundleWithoutPreviousNode { - pub bid: String, + pub bid: BundleID, pub node_name: String, } diff --git a/core/dtn7/src/routing/mod.rs b/core/dtn7/src/routing/mod.rs index d0b8cb9e..bd49330c 100644 --- a/core/dtn7/src/routing/mod.rs +++ b/core/dtn7/src/routing/mod.rs @@ -8,6 +8,7 @@ pub mod static_routing; use crate::cla::ClaSenderTask; use crate::core::bundlepack::BundlePack; +use crate::BundleID; use async_trait::async_trait; use bp7::Bundle; use bp7::EndpointID; @@ -23,10 +24,12 @@ use std::fmt::Debug; use std::fmt::Display; use tokio::sync::{mpsc, oneshot}; +#[derive(Debug)] pub enum RoutingNotifcation { - SendingFailed(String, String), + SendingFailed(BundleID, String), + SendingSucceeded(BundleID, String), IncomingBundle(Bundle), - IncomingBundleWithoutPreviousNode(String, String), + IncomingBundleWithoutPreviousNode(BundleID, String), EncounteredPeer(EndpointID), DroppedPeer(EndpointID), } diff --git a/core/dtn7/src/routing/static_routing.rs b/core/dtn7/src/routing/static_routing.rs index e0f3ef65..fc07623a 100644 --- a/core/dtn7/src/routing/static_routing.rs +++ b/core/dtn7/src/routing/static_routing.rs @@ -1,6 +1,6 @@ use std::fmt::Display; -use crate::{CONFIG, PEERS}; +use crate::{RoutingNotifcation, CONFIG, PEERS}; use super::{RoutingAgent, RoutingCmd}; use async_trait::async_trait; @@ -88,6 +88,19 @@ fn parse_route_from_str(s: &str) -> Option { }) } +async fn handle_routing_notification(notification: RoutingNotifcation) { + debug!("Received notification: {:?}", notification); + match notification { + RoutingNotifcation::SendingFailed(bid, cla_sender) => { + debug!("Sending failed for bundle {} on CLA {}", bid, cla_sender); + } + RoutingNotifcation::SendingSucceeded(bid, cla_sender) => { + debug!("Sending succeeded for bundle {} on CLA {}", bid, cla_sender); + } + _ => { /* ignore */ } + } +} + async fn handle_routing_cmd(mut rx: mpsc::Receiver) { let mut route_entries = vec![]; let settings = CONFIG.lock().routing_settings.clone(); @@ -126,8 +139,7 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver) { if p.eid.to_string() == route.via { if let Some(cla) = p.first_cla() { clas.push(cla); - delete_afterwards = - p.node_name() == bp.destination.node().unwrap(); + delete_afterwards = !bp.destination.is_non_singleton(); break 'route_loop; } } @@ -172,7 +184,9 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver) { .fold(String::new(), |acc, r| acc + &format!("{}\n", r)); tx.send(routes_as_str).unwrap(); } - super::RoutingCmd::Notify(_) => {} + super::RoutingCmd::Notify(notification) => { + handle_routing_notification(notification).await; + } } } } diff --git a/tests/routing_static.sh b/tests/routing_static.sh index 177f8bea..f2511052 100755 --- a/tests/routing_static.sh +++ b/tests/routing_static.sh @@ -4,14 +4,12 @@ prepare_test -STATUS_REPORTS="-g" +#STATUS_REPORTS="-g" PORT_NODE1=$(get_current_port) start_dtnd -d -j0 -i0 -C mtcp:port=2342 -e 42 -r static -n 1 -s mtcp://127.0.0.1:4223/2 $STATUS_REPORTS -R static.routes=tests/routes_1.csv PORT_NODE2=$(get_current_port) -#DB2="-W /tmp/node2 -D sled" -#DB2="-W /tmp/node2 -D sneakers" start_dtnd -d -j0 -i0 -C mtcp:port=4223 -e 42 -r static \ -n 2 \ -s mtcp://127.0.0.1:2342/1 \