diff --git a/Cargo.toml b/Cargo.toml index bebab88..1314b32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ async-trait = { version = "0.1", default-features = false } futures = { version = "0.3", features = ["std"] } -toml = { version = "0.7", default-features = false } +toml = { version = "0.8", default-features = false } chrono = { version = "0.4", default-features = false } tokio = { version = "1", default-features = false } diff --git a/backends/drmem-db-redis/src/lib.rs b/backends/drmem-db-redis/src/lib.rs index 638280f..89ada4e 100644 --- a/backends/drmem-db-redis/src/lib.rs +++ b/backends/drmem-db-redis/src/lib.rs @@ -409,8 +409,8 @@ pub struct RedisStore { impl RedisStore { fn make_client( cfg: &config::Config, - name: &Option, - pword: &Option, + name: Option<&String>, + pword: Option<&String>, ) -> Result { use redis::{ConnectionAddr, ConnectionInfo, RedisConnectionInfo}; @@ -420,8 +420,8 @@ impl RedisStore { addr: ConnectionAddr::Tcp(addr.ip().to_string(), addr.port()), redis: RedisConnectionInfo { db: cfg.get_dbn(), - username: name.clone(), - password: pword.clone(), + username: name.cloned(), + password: pword.cloned(), }, }; @@ -435,7 +435,7 @@ impl RedisStore { name: Option, pword: Option, ) -> Result { - let client = Self::make_client(cfg, &name, &pword)?; + let client = Self::make_client(cfg, name.as_ref(), pword.as_ref())?; debug!("creating new redis connection"); @@ -452,7 +452,7 @@ impl RedisStore { name: Option, pword: Option, ) -> Result { - let client = Self::make_client(cfg, &name, &pword)?; + let client = Self::make_client(cfg, name.as_ref(), pword.as_ref())?; debug!("creating new, shared redis connection"); @@ -500,7 +500,7 @@ impl RedisStore { fn init_device_cmd( name: &str, driver: &str, - units: &Option, + units: Option<&String>, ) -> redis::Pipeline { let hist_key = Self::hist_key(name); let info_key = Self::info_key(name); @@ -543,12 +543,11 @@ impl RedisStore { redis::Cmd::xrevrange_count(name, "+", "-", 1usize) } - fn match_pattern_cmd(pattern: &Option) -> redis::Cmd { + fn match_pattern_cmd(pattern: Option<&str>) -> redis::Cmd { // Take the pattern from the caller and append "#info" since // we only want to look at device information keys. let pattern = pattern - .as_ref() .map(|v| Self::info_key(v)) .unwrap_or_else(|| String::from("*#info")); @@ -835,7 +834,7 @@ impl RedisStore { &mut self, name: &str, driver: &str, - units: &Option, + units: Option<&String>, ) -> Result<()> { debug!("initializing {}", name); Self::init_device_cmd(name, driver, units) @@ -850,12 +849,12 @@ impl RedisStore { fn mk_report_func( &self, name: &str, - max_history: &Option, + max_history: Option, ) -> ReportReading { let db_con = self.db_con.clone(); let name = String::from(name); - if let Some(mh) = *max_history { + if let Some(mh) = max_history { Box::new(move |v| { let mut db_con = db_con.clone(); let hist_key = Self::hist_key(&name); @@ -898,8 +897,8 @@ impl Store for RedisStore { &mut self, driver_name: &str, name: &device::Name, - units: &Option, - max_history: &Option, + units: Option<&String>, + max_history: Option, ) -> Result<(ReportReading, Option)> { let name = name.to_string(); @@ -920,8 +919,8 @@ impl Store for RedisStore { &mut self, driver_name: &str, name: &device::Name, - units: &Option, - max_history: &Option, + units: Option<&String>, + max_history: Option, ) -> Result<( ReportReading, RxDeviceSetting, @@ -956,7 +955,7 @@ impl Store for RedisStore { async fn get_device_info( &mut self, - pattern: &Option, + pattern: Option<&str>, ) -> Result> { // Get a list of all the keys that match the pattern. For // Redis, these keys will have "#info" appended at the end. @@ -1339,20 +1338,19 @@ mod tests { #[test] fn test_pattern_cmd() { assert_eq!( - &RedisStore::match_pattern_cmd(&None).get_packed_command(), + &RedisStore::match_pattern_cmd(None).get_packed_command(), b"*2\r $4\r\nKEYS\r $6\r\n*#info\r\n" ); assert_eq!( - &RedisStore::match_pattern_cmd(&Some(String::from("device"))) - .get_packed_command(), + &RedisStore::match_pattern_cmd(Some("device")).get_packed_command(), b"*2\r $4\r\nKEYS\r $11\r\ndevice#info\r\n" ); assert_eq!( - &RedisStore::match_pattern_cmd(&Some(String::from("*weather*"))) + &RedisStore::match_pattern_cmd(Some("*weather*")) .get_packed_command(), b"*2\r $4\r\nKEYS\r @@ -1639,7 +1637,7 @@ $10\r\nS\x00\x00\x00\x05hello\r\n" fn test_init_dev() { assert_eq!( String::from_utf8_lossy( - &RedisStore::init_device_cmd("device", "mem", &None) + &RedisStore::init_device_cmd("device", "mem", None) .get_packed_pipeline() ), "*1\r @@ -1673,7 +1671,7 @@ $4\r\nEXEC\r\n" &RedisStore::init_device_cmd( "device", "pump", - &Some(String::from("gpm")) + Some(&String::from("gpm")) ) .get_packed_pipeline() ), diff --git a/backends/drmem-db-simple/src/lib.rs b/backends/drmem-db-simple/src/lib.rs index afb0baa..2bf75a6 100644 --- a/backends/drmem-db-simple/src/lib.rs +++ b/backends/drmem-db-simple/src/lib.rs @@ -49,7 +49,7 @@ struct DeviceInfo { impl DeviceInfo { pub fn create( owner: String, - units: Option, + units: Option<&String>, tx_setting: Option, ) -> DeviceInfo { let (tx, _) = broadcast::channel(CHAN_SIZE); @@ -58,7 +58,7 @@ impl DeviceInfo { DeviceInfo { owner, - units, + units: units.cloned(), tx_setting, reading: Arc::new(Mutex::new((tx, None, time::UNIX_EPOCH))), } @@ -145,8 +145,8 @@ impl Store for SimpleStore { &mut self, driver: &str, name: &device::Name, - units: &Option, - _max_history: &Option, + units: Option<&String>, + _max_history: Option, ) -> Result<(ReportReading, Option)> { // Check to see if the device name already exists. @@ -203,8 +203,8 @@ impl Store for SimpleStore { &mut self, driver: &str, name: &device::Name, - units: &Option, - _max_history: &Option, + units: Option<&String>, + _max_history: Option, ) -> Result<( ReportReading, RxDeviceSetting, @@ -224,7 +224,7 @@ impl Store for SimpleStore { let di = e.insert(DeviceInfo::create( String::from(driver), - units.clone(), + units, Some(tx_sets), )); @@ -270,7 +270,7 @@ impl Store for SimpleStore { async fn get_device_info( &mut self, - pattern: &Option, + pattern: Option<&str>, ) -> Result> { let pred: Box bool> = if let Some(pattern) = pattern { @@ -489,7 +489,7 @@ mod tests { let name = "test:device".parse::().unwrap(); if let Ok((f, None)) = db - .register_read_only_device("test", &name, &None, &None) + .register_read_only_device("test", &name, None, None) .await { // Test that priming the history with one value returns @@ -573,7 +573,7 @@ mod tests { let name = "test:device".parse::().unwrap(); if let Ok((f, None)) = db - .register_read_only_device("test", &name, &None, &None) + .register_read_only_device("test", &name, None, None) .await { // Verify that monitoring device, starting now, picks up @@ -660,7 +660,7 @@ mod tests { let name = "test:device".parse::().unwrap(); if let Ok((f, None)) = db - .register_read_only_device("test", &name, &None, &None) + .register_read_only_device("test", &name, None, None) .await { // Verify that, if the latest point is before the starting @@ -704,7 +704,7 @@ mod tests { let name = "test:device".parse::().unwrap(); if let Ok((f, None)) = db - .register_read_only_device("test", &name, &None, &None) + .register_read_only_device("test", &name, None, None) .await { // Verify that, if both times are before the data, nothing @@ -893,7 +893,7 @@ mod tests { // driver named "test". We don't define units for this device. if let Ok((f, None)) = db - .register_read_only_device("test", &name, &None, &None) + .register_read_only_device("test", &name, None, None) .await { // Make sure the device was defined and the setting @@ -920,7 +920,7 @@ mod tests { // driver name results in an error. assert!(db - .register_read_only_device("test2", &name, &None, &None) + .register_read_only_device("test2", &name, None, None) .await .is_err()); @@ -928,7 +928,7 @@ mod tests { // driver name is successful. if let Ok((f, Some(device::Value::Int(1)))) = db - .register_read_only_device("test", &name, &None, &None) + .register_read_only_device("test", &name, None, None) .await { // Also, verify that the device update channel wasn't @@ -954,7 +954,7 @@ mod tests { // driver named "test". We don't define units for this device. if let Ok((f, mut set_chan, None)) = db - .register_read_write_device("test", &name, &None, &None) + .register_read_write_device("test", &name, None, None) .await { // Make sure the device was defined and a setting channel @@ -1002,7 +1002,7 @@ mod tests { // didn't affect the setting channel. assert!(db - .register_read_only_device("test2", &name, &None, &None) + .register_read_only_device("test2", &name, None, None) .await .is_err()); assert_eq!( @@ -1014,7 +1014,7 @@ mod tests { // driver name is successful. if let Ok((f, _, Some(device::Value::Int(1)))) = db - .register_read_write_device("test", &name, &None, &None) + .register_read_write_device("test", &name, None, None) .await { assert_eq!( diff --git a/drivers/drmem-drv-sump/src/lib.rs b/drivers/drmem-drv-sump/src/lib.rs index 7a505e6..d64ed58 100644 --- a/drivers/drmem-drv-sump/src/lib.rs +++ b/drivers/drmem-drv-sump/src/lib.rs @@ -166,19 +166,22 @@ impl Instance { pub const DESCRIPTION: &'static str = include_str!("../README.md"); - fn elapsed(dur: u64) -> String { - match (dur + 500) / 1000 { - dur if dur >= 3600 * 24 => { + fn elapsed(millis: u64) -> String { + match (millis + 500) / 1000 { + dur if dur >= 3600 * 24 - 30 => { + let dur = dur + 30; + format!( - "{}d{}h{}m{}s", + "{}d{}h{}m", dur / (3600 * 24), (dur / 3600) % 24, - (dur / 60) % 60, - dur % 60 + (dur / 60) % 60 ) } - dur if dur >= 3600 => { - format!("{}h{}m{}s", dur / 3600, (dur / 60) % 60, dur % 60) + dur if dur >= 3570 => { + let dur = dur + 30; + + format!("{}h{}m", dur / 3600, (dur / 60) % 60) } dur if dur >= 60 => { format!("{}m{}s", dur / 60, dur % 60) @@ -497,4 +500,22 @@ mod tests { assert_eq!(state.off_event(60000, 60.0), Some((60000, 10.0, 6.0))); assert_eq!(state, State::Off { off_time: 60000 }); } + + #[test] + fn test_elapsed() { + assert_eq!(Instance::elapsed(0), "0s"); + assert_eq!(Instance::elapsed(1000), "1s"); + assert_eq!(Instance::elapsed(59000), "59s"); + assert_eq!(Instance::elapsed(60000), "1m0s"); + + assert_eq!(Instance::elapsed(3569000), "59m29s"); + assert_eq!(Instance::elapsed(3570000), "1h0m"); + assert_eq!(Instance::elapsed(3599000), "1h0m"); + assert_eq!(Instance::elapsed(3600000), "1h0m"); + + assert_eq!(Instance::elapsed(3600000 * 24 - 31000), "23h59m"); + assert_eq!(Instance::elapsed(3600000 * 24 - 30000), "1d0h0m"); + assert_eq!(Instance::elapsed(3600000 * 24 - 1000), "1d0h0m"); + assert_eq!(Instance::elapsed(3600000 * 24), "1d0h0m"); + } } diff --git a/drivers/drmem-drv-tplink/src/lib.rs b/drivers/drmem-drv-tplink/src/lib.rs index e00eed9..0dace64 100644 --- a/drivers/drmem-drv-tplink/src/lib.rs +++ b/drivers/drmem-drv-tplink/src/lib.rs @@ -34,7 +34,7 @@ use drmem_api::{ driver::{self, DriverConfig}, Error, Result, }; -use futures::{Future, StreamExt}; +use futures::{Future, FutureExt, StreamExt}; use std::net::SocketAddrV4; use std::sync::Arc; use std::{convert::Infallible, pin::Pin}; @@ -138,8 +138,17 @@ impl Instance { |e| Error::MissingPeer(e.to_string()); let out_buf = cmd.encode(); - s.write_all(&out_buf[..]).await.map_err(ERR_F)?; - s.flush().await.map_err(ERR_F) + #[rustfmt::skip] + tokio::select! { + result = s.write_all(&out_buf[..]) => { + match result { + Ok(_) => s.flush().await.map_err(ERR_F), + Err(e) => Err(ERR_F(e)) + } + } + _ = time::sleep(time::Duration::from_millis(500)) => + Err(Error::TimeoutError) + } } // Performs an "RPC" call to the device; it sends the command and @@ -155,23 +164,21 @@ impl Instance { R: AsyncReadExt + std::marker::Unpin, S: AsyncWriteExt + std::marker::Unpin, { - // Wrap the transaction in an async block and wrap the block - // in a future that expects it to complete in 1/2 s. - - let fut = time::timeout(time::Duration::from_millis(500), async { - let res = tokio::try_join!( - Instance::send_cmd(tx, cmd), - self.read_reply(rx) - ); - - res.map(|(_, reply)| reply) - }); - - if let Ok(v) = fut.await { - v - } else { - Err(Error::TimeoutError) - } + Instance::send_cmd(tx, cmd) + .then(|res| async { + match res { + Ok(()) => { + #[rustfmt::skip] + tokio::select! { + result = self.read_reply(rx) => result, + _ = time::sleep(time::Duration::from_millis(500)) => + Err(Error::TimeoutError) + } + } + Err(e) => Err(e), + } + }) + .await } // Sets the relay state on or off, depending on the argument. @@ -370,6 +377,10 @@ impl Instance { v => v, }; + // Send an OK reply to the client with the updated value. + + reply(Ok(v)); + // Always log incoming settings. Let the client know there // was a successful setting, and include the value that // was used. @@ -377,12 +388,10 @@ impl Instance { match self.set_brightness(s, v).await { Ok(()) => { report(v).await; - reply(Ok(v)); Ok(Some(v)) } Err(e) => { error!("setting brightness : {}", &e); - reply(Err(e.clone())); Err(e) } } @@ -401,15 +410,14 @@ impl Instance { reply: driver::SettingReply, report: &'a driver::ReportReading, ) -> Result<()> { + reply(Ok(v)); match self.led_state_rpc(s, v).await { Ok(()) => { report(v).await; - reply(Ok(v)); Ok(()) } Err(e) => { error!("setting LED : {}", &e); - reply(Err(e.clone())); Err(e) } } @@ -618,8 +626,8 @@ impl driver::API for Instance { let fut = async move { // Lock the mutex for the life of the driver. There is no // other task that wants access to these device handles. - // An Arc> is the other way I know of passing a - // mutable value to async tasks. + // An Arc> is the only way I know of sharing a + // mutable value with async tasks. let mut devices = devices.lock().await; diff --git a/drmem-api/src/lib.rs b/drmem-api/src/lib.rs index 052a122..7a75328 100644 --- a/drmem-api/src/lib.rs +++ b/drmem-api/src/lib.rs @@ -52,8 +52,8 @@ pub trait Store { &mut self, driver: &str, name: &types::device::Name, - units: &Option, - max_history: &Option, + units: Option<&String>, + max_history: Option, ) -> Result<( driver::ReportReading, Option, @@ -86,8 +86,8 @@ pub trait Store { &mut self, driver: &str, name: &types::device::Name, - units: &Option, - max_history: &Option, + units: Option<&String>, + max_history: Option, ) -> Result<( driver::ReportReading, driver::RxDeviceSetting, @@ -104,7 +104,7 @@ pub trait Store { async fn get_device_info( &mut self, - pattern: &Option, + pattern: Option<&str>, ) -> Result>; /// Sends a request to a driver to set its device to the specified diff --git a/drmemd/src/config.rs b/drmemd/src/config.rs index eb2b883..6f7c7ed 100644 --- a/drmemd/src/config.rs +++ b/drmemd/src/config.rs @@ -267,7 +267,6 @@ pub async fn get() -> Option { #[cfg(test)] mod tests { use super::*; - use std::net::Ipv4Addr; #[test] fn test_config() { @@ -333,6 +332,8 @@ log_level = "warn" #[cfg(feature = "graphql")] #[test] fn test_graphql_config() { + use std::net::Ipv4Addr; + match toml::from_str::( r#" latitude = -45.0 diff --git a/drmemd/src/core/mod.rs b/drmemd/src/core/mod.rs index 9eef70f..4517066 100644 --- a/drmemd/src/core/mod.rs +++ b/drmemd/src/core/mod.rs @@ -29,7 +29,7 @@ impl State { ref driver_name, ref dev_name, ref dev_units, - ref max_history, + max_history, rpy_chan, } => { let result = self @@ -37,7 +37,7 @@ impl State { .register_read_only_device( driver_name, dev_name, - dev_units, + dev_units.as_ref(), max_history, ) .await @@ -52,7 +52,7 @@ impl State { ref driver_name, ref dev_name, ref dev_units, - ref max_history, + max_history, rpy_chan, } => { let result = self @@ -60,7 +60,7 @@ impl State { .register_read_write_device( driver_name, dev_name, - dev_units, + dev_units.as_ref(), max_history, ) .await @@ -75,11 +75,11 @@ impl State { async fn handle_client_request(&mut self, req: client::Request) { match req { - client::Request::QueryDeviceInfo { - ref pattern, - rpy_chan, - } => { - let result = self.backend.get_device_info(pattern).await; + client::Request::QueryDeviceInfo { pattern, rpy_chan } => { + let result = self + .backend + .get_device_info(pattern.as_ref().map(|x| x.as_str())) + .await; if let Err(ref e) = result { info!("get_device_info() returned '{}'", e); diff --git a/drmemd/src/driver/drv_cycle.rs b/drmemd/src/driver/drv_cycle.rs index eeae516..0e02245 100644 --- a/drmemd/src/driver/drv_cycle.rs +++ b/drmemd/src/driver/drv_cycle.rs @@ -287,7 +287,8 @@ impl driver::API for Instance { // set the output to the inactive value. if let Some(v) = self.time_expired() { - debug!("state {:?} : timeout occurred -- output {}", &self.state, v); + debug!("state {:?} : timeout occurred -- output {}", + &self.state, v); (devices.d_output)(v).await; } } @@ -308,7 +309,8 @@ impl driver::API for Instance { reply(Ok(b)); - debug!("state {:?} : new input -> {}", &self.state, b); + debug!("state {:?} : new input -> {}", + &self.state, b); (devices.d_enable)(b).await; diff --git a/drmemd/src/graphql/mod.rs b/drmemd/src/graphql/mod.rs index 8109b2c..df399ff 100644 --- a/drmemd/src/graphql/mod.rs +++ b/drmemd/src/graphql/mod.rs @@ -116,8 +116,8 @@ impl DeviceInfo { } #[graphql(description = "The engineering units of the device's value.")] - fn units(&self) -> &Option { - &self.units + fn units(&self) -> Option<&String> { + self.units.as_ref() } #[graphql( diff --git a/drmemd/src/logic/mod.rs b/drmemd/src/logic/mod.rs index fcb6223..d90ac9d 100644 --- a/drmemd/src/logic/mod.rs +++ b/drmemd/src/logic/mod.rs @@ -192,7 +192,7 @@ impl Node { // Runs the node logic. This method should never return. - async fn run(&mut self) -> Result { + async fn run(mut self) -> Result { info!("starting"); // Wait for the next reading to arrive. All the incoming @@ -240,7 +240,7 @@ impl Node { // Create a new instance and let it initialize itself. If an // error occurs, return it. - let mut node = Node::init(c_req, cfg) + let node = Node::init(c_req, cfg) .instrument(info_span!("logic-init", name = &name)) .await?;