Skip to content

Commit

Permalink
feat: routing agents can now receive commands via HTTP and also be qu…
Browse files Browse the repository at this point in the history
…eried for internal data.
  • Loading branch information
Lars Baumgaertner committed Feb 26, 2024
1 parent c2f4656 commit c2ddf4d
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 4 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Plus:
* [Minimal TCP Convergence Layer](https://tools.ietf.org/html/draft-ietf-dtn-mtcpcl-01)
* A simple [HTTP Convergence Layer](doc/http-cl.md)
* A [HTTP pull-based Convergence Layer](doc/http-pull-cl.md)
* An IP neighorhood discovery service
* An IP neighborhood discovery service
* Convenient command line tools to interact with the daemon
* A simple web interface for status information about `dtnd`
* A [web-socket interface](doc/http-client-api.md) for application agents
Expand All @@ -25,7 +25,7 @@ The actual BP7 implementation (encoding/decoding) is available as a separate [pr

Additional dtn extensions and a client library are also [available](https://crates.io/crates/dtn7-plus).

Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/sink-routing and restful/websocket command interfaces are implemented.
Currently, a service discovery based on IPND but adapted to CBOR and BPv7, TCP, MTCP & HTTP CLs, sprayandwait/flooding/epidemic/static/sink-routing and restful/websocket command interfaces are implemented.
Both addressing schemes, *dtn* as well as *ipn* are supported.
Furthermore, some CLI tools are provided to easily integrate *dtn7* into shell scripts.

Expand Down
39 changes: 39 additions & 0 deletions core/dtn7/src/dtnd/httpd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use crate::core::peer::PeerType;
use crate::core::store::BundleStore;
use crate::peers_add;
use crate::peers_remove;
use crate::routing_cmd;
use crate::routing_get_data;
use crate::store_remove;
use crate::CONFIG;
use crate::DTNCORE;
Expand Down Expand Up @@ -329,6 +331,41 @@ async fn debug_rnd_peer() -> String {
res
}

async fn http_routing_cmd(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
if let Some(cmd) = params.get("c") {
if routing_cmd(cmd.to_string()).await.is_ok() {
Ok("Sent command to routing agent.".into())
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error sending cmd to routing agent",
))
}
} else {
//anyhow::bail!("missing filter criteria");
Err((
StatusCode::BAD_REQUEST,
"missing routing command parameter cmd",
))
}
}

async fn http_routing_getdata(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
let param = params.get("p").map_or("".to_string(), |f| f.to_string());
if let Ok(res) = routing_get_data(param).await {
Ok(res)
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error getting data from routing agent",
))
}
}

async fn http_peers_add(
Query(params): Query<HashMap<String, String>>,
) -> Result<String, (StatusCode, &'static str)> {
Expand Down Expand Up @@ -699,6 +736,8 @@ pub async fn spawn_httpd() -> Result<()> {
let mut app_local_only = Router::new()
.route("/peers/add", get(http_peers_add))
.route("/peers/del", get(http_peers_delete))
.route("/routing/cmd", get(http_routing_cmd).post(http_routing_cmd))
.route("/routing/getdata", get(http_routing_getdata))
.route("/send", post(send_post))
.route("/delete", get(delete).delete(delete))
.route("/register", get(register))
Expand Down
20 changes: 20 additions & 0 deletions core/dtn7/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,26 @@ pub fn store_delete_expired() {
}
}
}
pub async fn routing_cmd(cmd: String) -> Result<()> {
let chan = DTNCORE.lock().routing_agent.channel();
if let Err(err) = chan.send(RoutingCmd::Command(cmd)).await {
bail!("Error while sending notification: {}", err);
}
Ok(())
}

pub async fn routing_get_data(param: String) -> Result<String> {
let (reply_tx, reply_rx) = oneshot::channel();

let cmd_channel = DTNCORE.lock().routing_agent.channel();
if let Err(err) = cmd_channel.send(RoutingCmd::GetData(param, reply_tx)).await {
bail!("Error while sending command to routing agent: {}", err);
}
// wait for reply or timeout
let res = tokio::time::timeout(std::time::Duration::from_secs(1), reply_rx).await??;

Ok(res)
}

pub async fn routing_notify(notification: RoutingNotifcation) -> Result<()> {
let chan = DTNCORE.lock().routing_agent.channel();
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/epidemic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send(format!("{:?}", core.history)).unwrap();
}
super::RoutingCmd::Notify(notification) => match notification {
RoutingNotifcation::SendingFailed(bid, cla_sender) => {
core.sending_failed(bid.as_str(), cla_sender.as_str());
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl ExternalRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(notification) => {
notify(notification);
}
Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/flooding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl FloodingRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/dtn7/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub enum RoutingAgentsEnum {
pub enum RoutingCmd {
SenderForBundle(BundlePack, oneshot::Sender<(Vec<ClaSenderTask>, bool)>),
Notify(RoutingNotifcation),
Command(String),
GetData(String, oneshot::Sender<String>),
Shutdown,
}

Expand Down
4 changes: 4 additions & 0 deletions core/dtn7/src/routing/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ impl SinkRoutingAgent {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send("unimplemented!".to_string()).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/dtn7/src/routing/sprayandwait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct SprayAndWaitRoutingAgent {
tx: mpsc::Sender<super::RoutingCmd>,
}

#[derive(Debug)]
pub struct SaWBundleData {
/// the number of copies we have left to spread
remaining_copies: usize,
Expand Down Expand Up @@ -203,6 +204,10 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(_cmd) => {}
super::RoutingCmd::GetData(_, tx) => {
tx.send(format!("{:?}", core.history)).unwrap();
}
super::RoutingCmd::Notify(notification) => {
handle_notification(&mut core, notification);
}
Expand Down
34 changes: 32 additions & 2 deletions core/dtn7/src/routing/static_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{CONFIG, PEERS};
use super::{RoutingAgent, RoutingCmd};
use async_trait::async_trait;
use glob_match::glob_match;
use log::debug;
use log::{debug, info};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;

Expand Down Expand Up @@ -91,6 +91,7 @@ fn parse_route_from_str(s: &str) -> Option<StaticRouteEntry> {
async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
let mut route_entries = vec![];
let settings = CONFIG.lock().routing_settings.clone();

if let Some(static_settings) = settings.get("static") {
if let Some(routes_file) = static_settings.get("routes") {
// open file and read routes line by line
Expand All @@ -104,7 +105,7 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
}
}

let core: StaticRoutingAgentCore = StaticRoutingAgentCore {
let mut core: StaticRoutingAgentCore = StaticRoutingAgentCore {
routes: route_entries,
};

Expand Down Expand Up @@ -142,6 +143,35 @@ async fn handle_routing_cmd(mut rx: mpsc::Receiver<RoutingCmd>) {
super::RoutingCmd::Shutdown => {
break;
}
super::RoutingCmd::Command(cmd) => {
if cmd == "reload" {
let settings = CONFIG.lock().routing_settings.clone();
if let Some(static_settings) = settings.get("static") {
if let Some(routes_file) = static_settings.get("routes") {
info!("Reloading static routes from {}", routes_file);
// open file and read routes line by line
let routes = std::fs::read_to_string(routes_file).unwrap();
let mut route_entries = vec![];
for line in routes.lines() {
if let Some(entry) = parse_route_from_str(line) {
debug!("Adding static route: {}", entry);
route_entries.push(entry);
}
}
core.routes = route_entries;
}
}
} else {
debug!("Unknown command: {}", cmd);
}
}
super::RoutingCmd::GetData(_, tx) => {
let routes_as_str = core
.routes
.iter()
.fold(String::new(), |acc, r| acc + &format!("{}\n", r));
tx.send(routes_as_str).unwrap();
}
super::RoutingCmd::Notify(_) => {}
}
}
Expand Down
17 changes: 17 additions & 0 deletions doc/http-client-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,23 @@ $ curl "http://127.0.0.1:3000/peers/del?p=tcp://127.0.0.1:4223/node2"
Removed peer
```

### **GET**, **POST** `/routing/cmd?c=<command to send to routing agent>`

Send a command to the routing daemon, e.g., *static* accepts to `reload` to reload the routing information from the configured file.
```
$ curl http://127.0.0.1:3000/routing/cmd?cmd='reload'
Sent command to routing agent.
```
### **GET** `/routing/getdata?p=<optional parameter>`

Get internal data from routing agent.
Not all routing agents respond to this.

```
$ curl http://127.0.0.1:3000/routing/getdata
#1: route from * to ipn:[2-3].* via ipn:2.0
```

### **GET**, **POST** `/insert`

Insert is used to send a newly constructed bundle from this node instance.
Expand Down

0 comments on commit c2ddf4d

Please sign in to comment.