diff --git a/README.md b/README.md index fb66532e..c9c5b24b 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Plus: * [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01) * A simple [HTTP Convergence Layer](doc/http-cl.md) * A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md) +* A minimal [UDP Convergence Layer](https://www.ietf.org/archive/id/draft-sipos-dtn-udpcl-01.html) (currently, without the extensions) * An IP neighborhood discovery service * Convenient command line tools to interact with the daemon * A simple web interface for status information about `dtnd` diff --git a/core/dtn7/src/cla/mod.rs b/core/dtn7/src/cla/mod.rs index e9fbc8e3..d66287b0 100644 --- a/core/dtn7/src/cla/mod.rs +++ b/core/dtn7/src/cla/mod.rs @@ -5,6 +5,7 @@ pub mod http; pub mod httppull; pub mod mtcp; pub mod tcp; +pub mod udp; use self::http::HttpConvergenceLayer; use anyhow::Result; @@ -24,6 +25,7 @@ use std::{ }; use tcp::TcpConvergenceLayer; use tokio::sync::{mpsc, oneshot}; +use udp::UdpConvergenceLayer; // generate various helpers // - enum CLAsAvailable for verification and loading from str diff --git a/core/dtn7/src/cla/udp.rs b/core/dtn7/src/cla/udp.rs new file mode 100644 index 00000000..29f8aa07 --- /dev/null +++ b/core/dtn7/src/cla/udp.rs @@ -0,0 +1,170 @@ +use crate::cla::{ConvergenceLayerAgent, TransferResult}; +use async_trait::async_trait; +use bp7::{Bundle, ByteBuffer}; +use core::convert::TryFrom; +use dtn7_codegen::cla; +use log::{debug, error, info}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::net::SocketAddrV4; +use std::time::Instant; +use tokio::io; +use tokio::net::UdpSocket; +use tokio::sync::mpsc; + +use super::HelpStr; + +async fn udp_listener(addr: String, port: u16) -> Result<(), io::Error> { + let addr: SocketAddrV4 = format!("{}:{}", addr, port).parse().unwrap(); + let listener = UdpSocket::bind(&addr) + .await + .expect("failed to bind udp port"); + debug!("spawning UDP listener on port {}", port); + loop { + let mut buf = [0; 65535]; + let (amt, src) = listener.recv_from(&mut buf).await?; + let buf = &buf[..amt]; + if let Ok(bndl) = Bundle::try_from(buf.to_vec()) { + info!("Received bundle: {} from {}", bndl.id(), src); + { + tokio::spawn(async move { + if let Err(err) = crate::core::processing::receive(bndl).await { + error!("Failed to process bundle: {}", err); + } + }); + } + } else { + crate::STATS.lock().broken += 1; + info!("Error decoding bundle from {}", src); + } + } +} + +pub async fn udp_send_bundles(addr: SocketAddr, bundles: Vec) -> TransferResult { + let now = Instant::now(); + let num_bundles = bundles.len(); + let total_bytes: usize = bundles.iter().map(|b| b.len()).sum(); + + let sock = UdpSocket::bind("0.0.0.0:0").await; + if sock.is_err() { + error!("Error binding UDP socket for sending"); + return TransferResult::Failure; + } + let sock = sock.unwrap(); + if sock.connect(addr).await.is_err() { + error!("Error connecting UDP socket for sending"); + return TransferResult::Failure; + } + + for b in bundles { + if b.len() > 65535 { + error!("Bundle too large for UDP transmission"); + return TransferResult::Failure; + } + // send b via udp socket to addr + if sock.send(&b).await.is_err() { + error!("Error sending bundle via UDP"); + return TransferResult::Failure; + } + } + + debug!( + "Transmission time: {:?} for {} bundles in {} bytes to {}", + now.elapsed(), + num_bundles, + total_bytes, + addr + ); + + TransferResult::Successful +} + +#[cla(udp)] +#[derive(Debug, Clone)] +pub struct UdpConvergenceLayer { + local_addr: String, + local_port: u16, + tx: mpsc::Sender, +} + +impl UdpConvergenceLayer { + pub fn new(local_settings: Option<&HashMap>) -> UdpConvergenceLayer { + let addr: String = local_settings + .and_then(|settings| settings.get("bind")) + .map(|s| s.to_string()) + .unwrap_or_else(|| "0.0.0.0".to_string()); + let port = local_settings + .and_then(|settings| settings.get("port")) + .and_then(|port_str| port_str.parse::().ok()) + .unwrap_or(4556); + let (tx, mut rx) = mpsc::channel(100); + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + match cmd { + super::ClaCmd::Transfer(remote, data, reply) => { + debug!( + "UdpConvergenceLayer: received transfer command for {}", + remote + ); + if !data.is_empty() { + let peeraddr: SocketAddr = remote.parse().unwrap(); + debug!("forwarding to {:?}", peeraddr); + tokio::spawn(async move { + reply + .send(udp_send_bundles(peeraddr, vec![data]).await) + .unwrap(); + }); + } else { + debug!("Nothing to forward."); + reply.send(TransferResult::Successful).unwrap(); + } + } + super::ClaCmd::Shutdown => { + debug!("UdpConvergenceLayer: received shutdown command"); + break; + } + } + } + }); + UdpConvergenceLayer { + local_addr: addr, + local_port: port, + tx, + } + } + + pub async fn spawn_listener(&self) -> std::io::Result<()> { + // TODO: bubble up errors from run + tokio::spawn(udp_listener(self.local_addr.clone(), self.local_port)); /*.await.unwrap()*/ + Ok(()) + } +} + +#[async_trait] +impl ConvergenceLayerAgent for UdpConvergenceLayer { + async fn setup(&mut self) { + self.spawn_listener() + .await + .expect("error setting up udp listener"); + } + fn port(&self) -> u16 { + self.local_port + } + fn name(&self) -> &str { + "udp" + } + fn channel(&self) -> tokio::sync::mpsc::Sender { + self.tx.clone() + } +} + +impl HelpStr for UdpConvergenceLayer { + fn local_help_str() -> &'static str { + "port=4556:bind=0.0.0.0" + } +} +impl std::fmt::Display for UdpConvergenceLayer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "udp:{}:{}", self.local_addr, self.local_port) + } +} diff --git a/tests/cla_chain_test.sh b/tests/cla_chain_test.sh index a5f7653b..04f28151 100755 --- a/tests/cla_chain_test.sh +++ b/tests/cla_chain_test.sh @@ -10,6 +10,7 @@ PORT_NODE1=$(get_current_port) PORT_NODE2=$((PORT_NODE1 + 1)) PORT_NODE3=$((PORT_NODE1 + 2)) PORT_NODE4=$((PORT_NODE1 + 3)) +PORT_NODE5=$((PORT_NODE1 + 4)) #DB1="-W /tmp/node1 -D sled" #DB1="-W /tmp/node1 -D sneakers" @@ -30,15 +31,20 @@ start_dtnd -d -j5s -i0 -C http -C tcp:port=4224 -e incoming \ -s http://127.0.0.1:$PORT_NODE2/node2 \ -s tcp://127.0.0.1:4225/node4 $STATUS_REPORTS -start_dtnd -d -j5s -i0 -C tcp:port=4225 -e incoming -r epidemic \ - -n node4 -s tcp://127.0.0.1:4224/node3 $DB4 $STATUS_REPORTS +start_dtnd -d -j5s -i0 -C tcp:port=4225 -C udp:port=4556 -e incoming -r epidemic \ + -n node4 -s tcp://127.0.0.1:4224/node3 $DB4 $STATUS_REPORTS \ + -s udp://127.0.0.1:4557/node5 $STATUS_REPORTS + +start_dtnd -d -j5s -i0 -C udp:port=4557 -e incoming \ + -r epidemic -n node5 \ + -s udp://127.0.0.1:4556/node4 $STATUS_REPORTS sleep 1 echo -echo "Sending 'test' to node 4" -echo test | $BINS/dtnsend -r dtn://node4/incoming -p $PORT_NODE1 +echo "Sending 'test' to node 5" +echo test | $BINS/dtnsend -r dtn://node5/incoming -p $PORT_NODE1 sleep 5 @@ -59,8 +65,8 @@ else echo "Incorrect number of bundles in store!" fi echo -echo -n "Receiving on node 4: " -$BINS/dtnrecv -v -e incoming -p $PORT_NODE4 +echo -n "Receiving on node 5: " +$BINS/dtnrecv -v -e incoming -p $PORT_NODE5 RC=$? echo "RET: $RC" echo