
Merge pull request #123 from rneswold/pull-request
Pull request
rneswold authored Aug 14, 2024
2 parents db3ef53 + a6acbf2 commit 3d05a0d
Showing 5 changed files with 150 additions and 47 deletions.
2 changes: 1 addition & 1 deletion drmemd/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "drmemd"
version = "0.4.0"
version = "0.4.1"
authors = ["Rich Neswold <[email protected]>"]
edition = "2021"
description = "Main process of the DrMem control system"
36 changes: 36 additions & 0 deletions drmemd/src/logic/compile.rs
@@ -2934,4 +2934,40 @@ mod tests {
);
}
}

#[test]
fn test_solar_usage() {
const DATA: &[(&str, bool)] = &[
            // Make sure literals, variables, and time values don't
            // use a solar field.
("{a}", false),
("1", false),
("1.0", false),
("true", false),
("#green", false),
("\"test\"", false),
("{utc:second}", false),
// Make sure the solar values return true.
("{solar:alt}", true),
("{solar:dec}", true),
("{solar:ra}", true),
("{solar:az}", true),
// Now test more complicated expressions to make sure each
// subtree is correctly compared.
("not (2 > 3)", false),
("2 + 2", false),
("{solar:alt} + 2", true),
("2 + {solar:az}", true),
("{solar:dec} + {solar:az}", true),
];

for (expr, result) in DATA {
assert_eq!(
&to_expr(expr).uses_solar(),
result,
"error using {}",
expr
);
}
}
}
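
An aside on what `uses_solar()` is exercising above: the table walks expressions of increasing depth, so the check has to recurse through every subtree. A minimal sketch of that kind of recursive walk, assuming a simplified `Expr` enum (the type and variant names here are illustrative, not DrMem's actual compiler types):

    // Hypothetical, simplified expression tree -- not DrMem's actual `Expr`.
    enum Expr {
        Lit(f64),
        Var(String),
        TimeField(String),
        SolarField(String),
        Not(Box<Expr>),
        Add(Box<Expr>, Box<Expr>),
        Gt(Box<Expr>, Box<Expr>),
    }

    impl Expr {
        // An expression "uses solar" if any node in its subtree refers
        // to a solar field.
        fn uses_solar(&self) -> bool {
            match self {
                Expr::SolarField(_) => true,
                Expr::Lit(_) | Expr::Var(_) | Expr::TimeField(_) => false,
                Expr::Not(e) => e.uses_solar(),
                Expr::Add(a, b) | Expr::Gt(a, b) => {
                    a.uses_solar() || b.uses_solar()
                }
            }
        }
    }

    fn main() {
        // Mirrors the "{solar:alt} + 2" case from the table above.
        let e = Expr::Add(
            Box::new(Expr::SolarField("alt".into())),
            Box::new(Expr::Lit(2.0)),
        );
        assert!(e.uses_solar());
    }
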
110 changes: 90 additions & 20 deletions drmemd/src/logic/mod.rs
@@ -1,5 +1,5 @@
use drmem_api::{client, device, driver, Result};
use futures::future::join_all;
use futures::future::{join_all, pending};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
@@ -395,7 +395,7 @@ impl Node {

let wait_for_time = async {
match self.time_ch.as_mut() {
None => None,
None => pending().await,
Some(s) => s.next().await,
}
};
@@ -407,23 +407,32 @@

let wait_for_solar = async {
match self.solar_ch.as_mut() {
None => None,
Some(ch) => ch.recv().await.ok(),
None => pending().await,
Some(ch) => ch.recv().await,
}
};

#[rustfmt::skip]
tokio::select! {
// Wait for the next reading to arrive. All the
// incoming streams have been combined into one and
// the returned value is a pair consisting of an index
// and the actual reading.
biased;

Some((idx, reading)) = self.in_stream.next() => {
// Save the reading in our array for future
// recalculations.
// If we need the solar channel, wait for the next
// update.

self.inputs[idx] = Some(reading.value);
v = wait_for_solar => {
match v {
Ok(v) => solar = Some(v),
Err(broadcast::error::RecvError::Lagged(_)) => {
warn!("not handling solar info fast enough");
continue
}
Err(broadcast::error::RecvError::Closed) => {
error!("solar info channel is closed");
return Err(drmem_api::Error::OperationError(
"solar channel closed".into()
));
}
}
}

// If we need the time channel, wait for the next
@@ -433,11 +442,16 @@
time = v;
}

// If we need the solar channel, wait for the next
// update.
// Wait for the next reading to arrive. All the
// incoming streams have been combined into one and
// the returned value is a pair consisting of an index
// and the actual reading.

Some((idx, reading)) = self.in_stream.next() => {
// Save the reading in our array for future
// recalculations.

Some(v) = wait_for_solar => {
solar = Some(v);
self.inputs[idx] = Some(reading.value);
}
}
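
Two ideas carry these hunks: an absent channel is modeled as a future that never resolves, so its arm simply never fires (rather than completing immediately with a `None` the arm's pattern must reject), and `biased;` plus explicit `RecvError` handling gives solar updates priority while tolerating a lagging broadcast receiver. A condensed, self-contained sketch of the same pattern; the function and channel names are illustrative, not DrMem's:

    use std::{future::pending, time::Duration};
    use tokio::sync::broadcast;

    // One iteration of a simplified event loop: returns the latest solar
    // value, `None` if nothing solar arrived this round, or an error if
    // the channel is gone. `std::future::pending` behaves like the
    // `futures::future::pending` imported in the real code.
    async fn step(
        solar_ch: &mut Option<broadcast::Receiver<f64>>,
    ) -> Result<Option<f64>, String> {
        let wait_for_solar = async {
            match solar_ch.as_mut() {
                None => pending().await, // never resolves; arm is disabled
                Some(ch) => ch.recv().await,
            }
        };

        tokio::select! {
            biased;

            v = wait_for_solar => match v {
                Ok(v) => Ok(Some(v)),
                // We fell behind; skip this round rather than abort.
                Err(broadcast::error::RecvError::Lagged(_)) => Ok(None),
                // The producer is gone; report a hard error.
                Err(broadcast::error::RecvError::Closed) =>
                    Err("solar channel closed".into()),
            },

            // Stand-in for the other arms (time-of-day, device readings).
            _ = tokio::time::sleep(Duration::from_secs(60)) => Ok(None),
        }
    }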

@@ -557,16 +571,16 @@ mod test {
// Create the common channels used by DrMem.

let (tx_req, mut c_recv) = mpsc::channel(100);
let (tx_tod, rx_tod) = broadcast::channel(100);
let (tx_solar, rx_solar) = broadcast::channel(100);
let (tx_tod, _) = broadcast::channel(100);
let (tx_solar, _) = broadcast::channel(100);

        // Start the logic block with the proper communication
// channels and configuration.

let node = Node::start(
client::RequestChan::new(tx_req),
rx_tod,
rx_solar,
tx_tod.subscribe(),
tx_solar.subscribe(),
cfg,
);
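
The test setup now holds only the `Sender` halves and mints receivers with `subscribe()` at the point of use, mirroring what `main.rs` does below. A tiny standalone sketch of that `tokio::sync::broadcast` pattern (the values are made up):

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // Keep the Sender; the initial Receiver is dropped immediately.
        let (tx, _) = broadcast::channel::<u32>(100);

        // A subscriber only sees values sent *after* it subscribes.
        let mut rx = tx.subscribe();

        // send() succeeds because at least one receiver now exists.
        tx.send(42).unwrap();
        assert_eq!(rx.recv().await.unwrap(), 42);
    }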

@@ -954,6 +968,62 @@ mod test {
assert_eq!(emu.await.unwrap(), Ok(true));
}

    // Test a basic logic block which forwards a solar parameter to
// a memory device.

#[tokio::test]
async fn test_basic_solar_node() {
const OUT1: &str = "device:out1";
const OUT2: &str = "device:out2";
let cfg = build_config(
&[],
&[("alt", OUT1), ("dec", OUT2)],
&[],
&["{solar:alt} -> {alt}", "{solar:dec} -> {dec}"],
);
let (tx_out1, mut rx_out1) = mpsc::channel(100);
let (tx_out2, mut rx_out2) = mpsc::channel(100);

let (_, tx_solar, emu, tx_stop) = Emulator::start(
vec![],
vec![(OUT1.into(), tx_out1), (OUT2.into(), tx_out2)],
cfg,
)
.await
.unwrap();

// Send a value and see if it was forwarded.

assert!(tx_solar
.send(Arc::new(solar::SolarInfo {
elevation: 1.0,
azimuth: 2.0,
right_ascension: 3.0,
declination: 4.0
}))
.is_ok());

{
let (value, rpy) = rx_out1.recv().await.unwrap();
let _ = rpy.send(Ok(value.clone()));

assert_eq!(value, device::Value::Flt(1.0));
}

{
let (value, rpy) = rx_out2.recv().await.unwrap();
let _ = rpy.send(Ok(value.clone()));

assert_eq!(value, device::Value::Flt(4.0));
}

// Stop the emulator and see that its return status is good.

let _ = tx_stop.send(());

assert_eq!(emu.await.unwrap(), Ok(true));
}

// Test a logic block with two outputs. Make sure they are sent
// "in parallel".

2 changes: 1 addition & 1 deletion drmemd/src/logic/solar.rs
@@ -135,7 +135,7 @@ pub fn create_task(
lat: f64,
long: f64,
) -> (broadcast::Sender<Info>, broadcast::Receiver<Info>) {
let (tx, rx) = broadcast::channel(1);
let (tx, rx) = broadcast::channel(10);
let tx_copy = tx.clone();

tokio::spawn(
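
Bumping the capacity from 1 to 10 gives a slow subscriber some headroom before it is marked as lagging. A small sketch of how an undersized `tokio::sync::broadcast` buffer surfaces as `RecvError::Lagged` (illustrative only, not DrMem code):

    use tokio::sync::broadcast::{self, error::RecvError};

    #[tokio::main]
    async fn main() {
        // Capacity 1: only the most recent value is retained for a
        // subscriber that has fallen behind.
        let (tx, mut rx) = broadcast::channel::<u32>(1);

        tx.send(1).unwrap();
        tx.send(2).unwrap(); // value 1 is pushed out of the buffer

        // The slow receiver is told it lagged (and by how many values)...
        assert!(matches!(rx.recv().await, Err(RecvError::Lagged(_))));

        // ...and then resumes with the oldest value still retained.
        assert_eq!(rx.recv().await.unwrap(), 2);
    }
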
47 changes: 22 additions & 25 deletions drmemd/src/main.rs
@@ -156,37 +156,34 @@ async fn run() -> Result<()> {
}
}

// Start the time-of-day task. This needs to be done *before*
// any logic blocks are started because logic blocks *may*
// have an expression that uses the time-of-day.
// Create a nested scope so that the tod and solar handles are
// freed up.

let (tx_tod, rx_tod) = logic::tod::create_task();

// Start the solar task. This, too, needs to be done before
// any logic blocks are started.
{
// Start the time-of-day task. This needs to be done
// *before* any logic blocks are started because logic
// blocks *may* have an expression that uses the
// time-of-day.

let (tx_solar, rx_solar) =
logic::solar::create_task(cfg.latitude, cfg.longitude);
let (tx_tod, _) = logic::tod::create_task();

// Iterate through the [[logic]] sections of the config.
// Start the solar task. This, too, needs to be done
// before any logic blocks are started.

for logic in cfg.logic {
tasks.push(wrap_task(logic::Node::start(
tx_clnt_req.clone(),
tx_tod.subscribe(),
tx_solar.subscribe(),
logic,
)));
}
let (tx_solar, _) =
logic::solar::create_task(cfg.latitude, cfg.longitude);

// Now that we've given all the logic blocks receive handles
// for the time-of-day and solar tasks, we can free up our
// copy. If we freed up our copy *before* creating new
// subscriptions, the tod or solar task may have briefly seen
// no clients and would exit.
// Iterate through the [[logic]] sections of the config.

std::mem::drop(rx_tod);
std::mem::drop(rx_solar);
for logic in cfg.logic {
tasks.push(wrap_task(logic::Node::start(
tx_clnt_req.clone(),
tx_tod.subscribe(),
tx_solar.subscribe(),
logic,
)));
}
}

// Now run all the tasks.

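The nested scope replaces the explicit `std::mem::drop` calls: all subscriptions are made while the block's `Sender` handles are still alive, and only then do those handles go away. A reduced illustration of that ordering concern with `tokio::sync::broadcast` (the task and values here are hypothetical, not DrMem's tod/solar tasks):

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        let mut subscriber = {
            let (tx, _) = broadcast::channel::<u32>(8);

            // Subscribe while we still hold a Sender...
            let rx = tx.subscribe();

            // ...then hand the Sender off to a background task, which
            // keeps the channel alive after this scope ends.
            tokio::spawn(async move {
                let _ = tx.send(7);
            });

            rx
        }; // the scope's own handles are gone; the subscription survives

        assert_eq!(subscriber.recv().await.unwrap(), 7);
    }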
