Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat static routing #58

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
feat: routing agents can now receive commands via HTTP and also be qu…
…eried for internal data.
Lars Baumgaertner committed Feb 26, 2024
commit c2ddf4d74ebb10b0a057216559f0804f6e1501a9
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ Plus:
* [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01)
* A simple [HTTP Convergence Layer](doc/http-cl.md)
* A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md)
* An IP neighorhood discovery service
* An IP neighborhood discovery service
* Convenient command line tools to interact with the daemon
* A simple web interface for status information about `dtnd`
* A [web-socket interface](doc/http-client-api.md) for application agents
@@ -25,7 +25,7 @@ The actual BP7 implementation (encoding/decoding) is available as a separate [pr

Additional dtn extensions and a client library are also [available](https://crates.io/crates/dtn7-plus).

Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/sink-routing and restful/websocket command interfaces are implemented.
Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/static/sink-routing and restful/websocket command interfaces are implemented.
Both addressing schemes, *dtn* as well as *ipn* are supported.
Furthermore, some CLI tools are provided to easily integrate *dtn7* into shell scripts.

39 changes: 39 additions & 0 deletions core/dtn7/src/dtnd/httpd.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@ use crate::core::peer::PeerType;
use crate::core::store::BundleStore;
use crate::peers_add;
use crate::peers_remove;
use crate::routing_cmd;
use crate::routing_get_data;
use crate::store_remove;
use crate::CONFIG;
use crate::DTNCORE;
@@ -329,6 +331,41 @@ async fn debug_rnd_peer() -> String {
res
}

async fn http_routing_cmd(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
if let Some(cmd) = params.get("c") {
if routing_cmd(cmd.to_string()).await.is_ok() {
Ok("Sent command to routing agent.".into())
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error sending cmd to routing agent",
))
}
} else {
//anyhow::bail!("missing filter criteria");
Err((
StatusCode::BAD_REQUEST,
"missing routing command parameter cmd",
))
}
}

async fn http_routing_getdata(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
let param = params.get("p").map_or("".to_string(), |f| f.to_string());
if let Ok(res) = routing_get_data(param).await {
Ok(res)
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error getting data from routing agent",
))
}
}

async fn http_peers_add(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
@@ -699,6 +736,8 @@ pub async fn spawn_httpd() -> Result<()> {
let mut app_local_only = Router::new()
.route("/peers/add", get(http_peers_add))
.route("/peers/del", get(http_peers_delete))
.route("/routing/cmd", get(http_routing_cmd).post(http_routing_cmd))
.route("/routing/getdata", get(http_routing_getdata))
.route("/send", post(send_post))
.route("/delete", get(delete).delete(delete))
.route("/register", get(register))
20 changes: 20 additions & 0 deletions core/dtn7/src/lib.rs
Original file line number Diff line number Diff line change
@@ -218,6 +218,26 @@ pub fn store_delete_expired() {
}
}
}
pub async fn routing_cmd(cmd: String) -> Result<()> {
let chan = DTNCORE.lock().routing_agent.channel();
if let Err(err) = chan.send(RoutingCmd::Command(cmd)).await {
bail!("Error while sending notification: {}", err);
}
Ok(())
}

pub async fn routing_get_data(param: String) -> Result<String> {
let (reply_tx, reply_rx) = oneshot::channel();

let cmd_channel = DTNCORE.lock().routing_agent.channel();
if let Err(err) = cmd_channel.send(RoutingCmd::GetData(param, reply_tx)).await {
bail!("Error while sending command to routing agent: {}", err);
}
// wait for reply or timeout
let res = tokio::time::timeout(std::time::Duration::from_secs(1), reply_rx).await??;

Ok(res)
}

pub async fn routing_notify(notification: RoutingNotifcation) -> Result<()> {
let chan = DTNCORE.lock().routing_agent.channel();
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/epidemic.rs
Original file line number Diff line number Diff line change
@@ -112,6 +112,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send(format!("{:?}", core.history)).unwrap();
}
super::RoutingCmd::Notify(notification) => match notification {
RoutingNotifcation::SendingFailed(bid, cla_sender) => {
core.sending_failed(bid.as_str(), cla_sender.as_str());
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/external.rs
Original file line number Diff line number Diff line change
@@ -30,6 +30,10 @@ impl ExternalRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(notification) => {
notify(notification);
}
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/flooding.rs
Original file line number Diff line number Diff line change
@@ -39,6 +39,10 @@ impl FloodingRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
2 changes: 2 additions & 0 deletions core/dtn7/src/routing/mod.rs
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@ pub enum RoutingAgentsEnum {
pub enum RoutingCmd {
SenderForBundle(BundlePack, oneshot::Sender<(Vec<ClaSenderTask>, bool)>),
Notify(RoutingNotifcation),
Command(String),
GetData(String, oneshot::Sender<String>),
Shutdown,
}

4 changes: 4 additions & 0 deletions core/dtn7/src/routing/sink.rs
Original file line number Diff line number Diff line change
@@ -26,6 +26,10 @@ impl SinkRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
5 changes: 5 additions & 0 deletions core/dtn7/src/routing/sprayandwait.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ pub struct SprayAndWaitRoutingAgent {
tx: mpsc::Sender<super::RoutingCmd>,
}

#[derive(Debug)]
pub struct SaWBundleData {
/// the number of copies we have left to spread
remaining_copies: usize,
@@ -203,6 +204,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send(format!("{:?}", core.history)).unwrap();
}
super::RoutingCmd::Notify(notification) => {
handle_notification(&mut core, notification);
}
34 changes: 32 additions & 2 deletions core/dtn7/src/routing/static_routing.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use crate::{CONFIG, PEERS};
use super::{RoutingAgent, RoutingCmd};
use async_trait::async_trait;
use glob_match::glob_match;
use log::debug;
use log::{debug, info};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;

@@ -91,6 +91,7 @@ fn parse_route_from_str(s: &str) -> Option<StaticRouteEntry> {
async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
let mut route_entries = vec![];
let settings = CONFIG.lock().routing_settings.clone();

if let Some(static_settings) = settings.get("static") {
if let Some(routes_file) = static_settings.get("routes") {
// open file and read routes line by line
@@ -104,7 +105,7 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
}
}

let core: StaticRoutingAgentCore = StaticRoutingAgentCore {
let mut core: StaticRoutingAgentCore = StaticRoutingAgentCore {
routes: route_entries,
};

@@ -142,6 +143,35 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(cmd) => {
if cmd == "reload" {
let settings = CONFIG.lock().routing_settings.clone();
if let Some(static_settings) = settings.get("static") {
if let Some(routes_file) = static_settings.get("routes") {
info!("Reloading static routes from {}", routes_file);
// open file and read routes line by line
let routes = std::fs::read_to_string(routes_file).unwrap();
let mut route_entries = vec![];
for line in routes.lines() {
if let Some(entry) = parse_route_from_str(line) {
debug!("Adding static route: {}", entry);
route_entries.push(entry);
}
}
core.routes = route_entries;
}
}
} else {
debug!("Unknown command: {}", cmd);
}
}
super::RoutingCmd::GetData(_, tx) => {
let routes_as_str = core
.routes
.iter()
.fold(String::new(), |acc, r| acc + &format!("{}\n", r));
tx.send(routes_as_str).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
17 changes: 17 additions & 0 deletions doc/http-client-api.md
Original file line number Diff line number Diff line change
@@ -88,6 +88,23 @@ $ curl "http://127.0.0.1:3000/peers/del?p=tcp://127.0.0.1:4223/node2"
Removed peer
```

### **GET**, **POST** `/routing/cmd?c=<command to send to routing agent>`

Send a command to the routing daemon, e.g., *static* accepts to `reload` to reload the routing information from the configured file.
```
$ curl http://127.0.0.1:3000/routing/cmd?cmd='reload'
Sent command to routing agent.
```
### **GET** `/routing/getdata?p=<optional parameter>`

Get internal data from routing agent.
Not all routing agents respond to this.

```
$ curl http://127.0.0.1:3000/routing/getdata
#1: route from * to ipn:[2-3].* via ipn:2.0
```

### **GET**, **POST** `/insert`

Insert is used to send a newly constructed bundle from this node instance.