Skip to content

Commit

Permalink
feat: minimal implementation of UDP CL without any of the extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
gh0st42 committed Mar 20, 2024
1 parent 3ce594d commit 9159f74
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Plus:
* [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01)
* A simple [HTTP Convergence Layer](doc/http-cl.md)
* A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md)
* A minimal [UDP Convergence Layer](https://www.ietf.org/archive/id/draft-sipos-dtn-udpcl-01.html) (currently, without the extensions)
* An IP neighborhood discovery service
* Convenient command line tools to interact with the daemon
* A simple web interface for status information about `dtnd`
Expand Down
2 changes: 2 additions & 0 deletions core/dtn7/src/cla/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod http;
pub mod httppull;
pub mod mtcp;
pub mod tcp;
pub mod udp;

use self::http::HttpConvergenceLayer;
use anyhow::Result;
Expand All @@ -24,6 +25,7 @@ use std::{
};
use tcp::TcpConvergenceLayer;
use tokio::sync::{mpsc, oneshot};
use udp::UdpConvergenceLayer;

// generate various helpers
// - enum CLAsAvailable for verification and loading from str
Expand Down
170 changes: 170 additions & 0 deletions core/dtn7/src/cla/udp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use crate::cla::{ConvergenceLayerAgent, TransferResult};
use async_trait::async_trait;
use bp7::{Bundle, ByteBuffer};
use core::convert::TryFrom;
use dtn7_codegen::cla;
use log::{debug, error, info};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::time::Instant;
use tokio::io;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;

use super::HelpStr;

async fn udp_listener(addr: String, port: u16) -> Result<(), io::Error> {
let addr: SocketAddrV4 = format!("{}:{}", addr, port).parse().unwrap();
let listener = UdpSocket::bind(&addr)
.await
.expect("failed to bind udp port");
debug!("spawning UDP listener on port {}", port);
loop {
let mut buf = [0; 65535];
let (amt, src) = listener.recv_from(&mut buf).await?;
let buf = &buf[..amt];
if let Ok(bndl) = Bundle::try_from(buf.to_vec()) {
info!("Received bundle: {} from {}", bndl.id(), src);
{
tokio::spawn(async move {
if let Err(err) = crate::core::processing::receive(bndl).await {
error!("Failed to process bundle: {}", err);
}
});
}
} else {
crate::STATS.lock().broken += 1;
info!("Error decoding bundle from {}", src);
}
}
}

pub async fn udp_send_bundles(addr: SocketAddr, bundles: Vec<ByteBuffer>) -> TransferResult {
let now = Instant::now();
let num_bundles = bundles.len();
let total_bytes: usize = bundles.iter().map(|b| b.len()).sum();

let sock = UdpSocket::bind("0.0.0.0:0").await;
if sock.is_err() {
error!("Error binding UDP socket for sending");
return TransferResult::Failure;
}
let sock = sock.unwrap();
if sock.connect(addr).await.is_err() {
error!("Error connecting UDP socket for sending");
return TransferResult::Failure;
}

for b in bundles {
if b.len() > 65535 {
error!("Bundle too large for UDP transmission");
return TransferResult::Failure;
}
// send b via udp socket to addr
if sock.send(&b).await.is_err() {
error!("Error sending bundle via UDP");
return TransferResult::Failure;
}
}

debug!(
"Transmission time: {:?} for {} bundles in {} bytes to {}",
now.elapsed(),
num_bundles,
total_bytes,
addr
);

TransferResult::Successful
}

#[cla(udp)]
#[derive(Debug, Clone)]
pub struct UdpConvergenceLayer {
local_addr: String,
local_port: u16,
tx: mpsc::Sender<super::ClaCmd>,
}

impl UdpConvergenceLayer {
pub fn new(local_settings: Option<&HashMap<String, String>>) -> UdpConvergenceLayer {
let addr: String = local_settings
.and_then(|settings| settings.get("bind"))
.map(|s| s.to_string())
.unwrap_or_else(|| "0.0.0.0".to_string());
let port = local_settings
.and_then(|settings| settings.get("port"))
.and_then(|port_str| port_str.parse::<u16>().ok())
.unwrap_or(4556);
let (tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
super::ClaCmd::Transfer(remote, data, reply) => {
debug!(
"UdpConvergenceLayer: received transfer command for {}",
remote
);
if !data.is_empty() {
let peeraddr: SocketAddr = remote.parse().unwrap();
debug!("forwarding to {:?}", peeraddr);
tokio::spawn(async move {
reply
.send(udp_send_bundles(peeraddr, vec![data]).await)
.unwrap();
});
} else {
debug!("Nothing to forward.");
reply.send(TransferResult::Successful).unwrap();
}
}
super::ClaCmd::Shutdown => {
debug!("UdpConvergenceLayer: received shutdown command");
break;
}
}
}
});
UdpConvergenceLayer {
local_addr: addr,
local_port: port,
tx,
}
}

pub async fn spawn_listener(&self) -> std::io::Result<()> {
// TODO: bubble up errors from run
tokio::spawn(udp_listener(self.local_addr.clone(), self.local_port)); /*.await.unwrap()*/
Ok(())
}
}

#[async_trait]
impl ConvergenceLayerAgent for UdpConvergenceLayer {
async fn setup(&mut self) {
self.spawn_listener()
.await
.expect("error setting up udp listener");
}
fn port(&self) -> u16 {
self.local_port
}
fn name(&self) -> &str {
"udp"
}
fn channel(&self) -> tokio::sync::mpsc::Sender<super::ClaCmd> {
self.tx.clone()
}
}

impl HelpStr for UdpConvergenceLayer {
fn local_help_str() -> &'static str {
"port=4556:bind=0.0.0.0"
}
}
impl std::fmt::Display for UdpConvergenceLayer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "udp:{}:{}", self.local_addr, self.local_port)
}
}
18 changes: 12 additions & 6 deletions tests/cla_chain_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PORT_NODE1=$(get_current_port)
PORT_NODE2=$((PORT_NODE1 + 1))
PORT_NODE3=$((PORT_NODE1 + 2))
PORT_NODE4=$((PORT_NODE1 + 3))
PORT_NODE5=$((PORT_NODE1 + 4))

#DB1="-W /tmp/node1 -D sled"
#DB1="-W /tmp/node1 -D sneakers"
Expand All @@ -30,15 +31,20 @@ start_dtnd -d -j5s -i0 -C http -C tcp:port=4224 -e incoming \
-s http://127.0.0.1:$PORT_NODE2/node2 \
-s tcp://127.0.0.1:4225/node4 $STATUS_REPORTS

start_dtnd -d -j5s -i0 -C tcp:port=4225 -e incoming -r epidemic \
-n node4 -s tcp://127.0.0.1:4224/node3 $DB4 $STATUS_REPORTS
start_dtnd -d -j5s -i0 -C tcp:port=4225 -C udp:port=4556 -e incoming -r epidemic \
-n node4 -s tcp://127.0.0.1:4224/node3 $DB4 $STATUS_REPORTS \
-s udp://127.0.0.1:4557/node5 $STATUS_REPORTS

start_dtnd -d -j5s -i0 -C udp:port=4557 -e incoming \
-r epidemic -n node5 \
-s udp://127.0.0.1:4556/node4 $STATUS_REPORTS

sleep 1

echo

echo "Sending 'test' to node 4"
echo test | $BINS/dtnsend -r dtn://node4/incoming -p $PORT_NODE1
echo "Sending 'test' to node 5"
echo test | $BINS/dtnsend -r dtn://node5/incoming -p $PORT_NODE1

sleep 5

Expand All @@ -59,8 +65,8 @@ else
echo "Incorrect number of bundles in store!"
fi
echo
echo -n "Receiving on node 4: "
$BINS/dtnrecv -v -e incoming -p $PORT_NODE4
echo -n "Receiving on node 5: "
$BINS/dtnrecv -v -e incoming -p $PORT_NODE5
RC=$?
echo "RET: $RC"
echo
Expand Down

0 comments on commit 9159f74

Please sign in to comment.