Skip to content

Commit

Permalink
add the processing for the messages from ws - not compling for now
Browse files Browse the repository at this point in the history
  • Loading branch information
besok committed Nov 8, 2023
1 parent 285f4d4 commit ccd986a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 15 deletions.
70 changes: 59 additions & 11 deletions src/runtime/ros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ pub mod client;

use std::fmt::format;
use std::sync::atomic::Ordering;
use serde::Deserialize;
use serde_json::Deserializer;
use tungstenite::{connect, Message};
use url::Url;
use blackboard::utils::push_to_arr;
use crate::runtime::action::{Impl, Tick};
use crate::runtime::args::{RtArgs, RtValue};
use crate::runtime::context::TreeContextRef;
use crate::runtime::{ros, RtResult, RuntimeError, TickResult};
use crate::runtime::{blackboard, ros, RtResult, RuntimeError, TickResult};
use crate::runtime::blackboard::BBKey;
use crate::runtime::env::daemon::context::DaemonContext;
use crate::runtime::env::daemon::{Daemon, DaemonFn, StopFlag};
use crate::runtime::ros::client::{SubscribeCfg, WS};
Expand Down Expand Up @@ -74,14 +78,24 @@ impl Impl for OneTimeSender {
}
}

pub struct SubscriberDaemon {
cfg: TargetCfg,
ws: WS,
pub enum SubscriberDaemon {
Last(WS, BBKey),
All(WS, BBKey),
}


impl SubscriberDaemon {
pub fn new(cfg: TargetCfg, ws: WS) -> Self {
Self { cfg, ws }
match cfg.tp.as_str() {
"last" => SubscriberDaemon::Last(ws, cfg.dst),
_ => SubscriberDaemon::All(ws, cfg.dst),
}
}
pub fn ws(&self) -> &WS {
match self {
SubscriberDaemon::Last(ws, _) => ws,
SubscriberDaemon::All(ws, _) => ws,
}
}
}

Expand All @@ -91,8 +105,30 @@ impl DaemonFn for SubscriberDaemon {
if signal.load(Ordering::Relaxed) {
break;
}
// let msg = self.ws.read()?;
// let msg = msg.into_text()?;
let msg = self.ws().read()?;

match self {
SubscriberDaemon::Last(_, key) => {
if msg.is_text() {
let string = msg.into_text().expect("text");
let value = RtValue::deserialize(string).unwrap();
ctx.bb.lock().unwrap().put(key.clone(), value).unwrap();
} else {
debug!(target: "ws-subscriber" ,"Subscriber Daemon: Received not text: {:#?}", msg);
}
}
SubscriberDaemon::All(_, key) => {
if msg.is_text() {
let string = msg.into_text().expect("text");
let value = RtValue::deserialize(string).unwrap();
push_to_arr(ctx.bb.clone(), key.clone(), value).unwrap();
} else {
debug!(target: "ws-subscriber" ,"Subscriber Daemon: Received not text: {:#?}", msg);
}
}
}


// let msg = serde_json::from_str::<ros::RosMessage>(&msg)?;
// let mut bb = ctx.bb.lock()?;
// bb.insert(topic.clone(), RtValue::from(msg));
Expand All @@ -108,8 +144,20 @@ pub struct TargetCfg {
}

impl TargetCfg {
pub fn from(v: RtValue) -> Self {
Self::default()
pub fn from(v: RtValue) -> RtResult<TargetCfg> {
let elems = v.as_map(|(k, v)| (k, v))
.ok_or(RuntimeError::fail(format!("the target_cfg should be an object")))?;

let tp = elems.get("tp").and_then(|v| v.clone().as_string())
.ok_or(RuntimeError::fail(format!("the tp is not found")))?;

let buf_size = elems.get("buf_size")
.and_then(|v| v.clone().as_int().map(|v| v as usize));

let dst = elems.get("dst").and_then(|v| v.clone().as_string())
.ok_or(RuntimeError::fail(format!("the dst should be a string")))?;

Ok(TargetCfg { tp, buf_size, dst })
}
}

Expand All @@ -129,10 +177,10 @@ fn param_as_str(key: &str, i: usize, args: &RtArgs, ctx: TreeContextRef) -> RtRe
}

fn param_as<T, V>(key: &str, i: usize, args: &RtArgs, m: T) -> RtResult<V>
where T: Fn(RtValue) -> V
where T: Fn(RtValue) -> RtResult<V>
{
args
.find_or_ith(key.to_string(), i)
.ok_or(RuntimeError::fail(format!("the {key} is not found")))
.map(|v| m(v))
.and_then(|v| m(v))
}
40 changes: 36 additions & 4 deletions src/runtime/ros/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use tungstenite::stream::MaybeTlsStream;
use url::Url;
use crate::runtime::action::Tick;
use crate::runtime::args::RtValue;
use crate::runtime::{RtResult, TickResult};
use crate::runtime::{RtResult, RuntimeError, TickResult};

type Topic = String;
type Type = String;

pub type WS = WebSocket<MaybeTlsStream<TcpStream>>;

#[derive(Debug, Default, Clone,Deserialize)]
#[derive(Debug, Default, Clone, Deserialize)]
pub struct SubscribeCfg {
tp: Option<String>,
throttle_rate: Option<i32>,
Expand All @@ -24,8 +24,40 @@ pub struct SubscribeCfg {


impl SubscribeCfg {
pub fn from(v: RtValue) -> Self {
Self::default()
pub fn from(v: RtValue) -> RtResult<SubscribeCfg> {
let mut cfg = SubscribeCfg::default();
if let RtValue::Object(map) = v {
for (k, v) in map {
match k.as_str() {
"tp" => {
cfg.tp = Some(v.as_string().ok_or(RuntimeError::fail("the type is not string".to_string()))?);
}
"throttle_rate" => {
cfg.throttle_rate = Some(v.as_int().map(|v| v as i32)
.ok_or(RuntimeError::fail("the throttle_rate is not int".to_string()))?);
}
"queue_length" => {
cfg.queue_length = Some(v.as_int().map(|v| v as i32)
.ok_or(RuntimeError::fail("the queue_length is not int".to_string()))?);
}
"fragment_size" => {
cfg.fragment_size = Some(v.as_int().map(|v| v as i32)
.ok_or(RuntimeError::fail("the fragment_size is not int".to_string()))?);
}
"compression" => {
cfg.compression = Some(v.as_string()
.ok_or(RuntimeError::fail("the compression is not string".to_string()))?);
}
_ => {
return Err(RuntimeError::fail((format!("the key {} is not supported", k))));
}
}
}
} else {
return Err(RuntimeError::fail("the source_cfg is not object".to_owned()));
}

Ok(cfg)
}
pub fn count(&self) -> usize {
let mut count = 0;
Expand Down

0 comments on commit ccd986a

Please sign in to comment.