diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index c7d8847a..dbf9f102 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -26,10 +26,10 @@ use std::convert::TryFrom; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::Arc; use std::time::SystemTime; -use crate::query::{CompiledQuery, ExecutionInput}; -use tracing::{debug, info, warn}; +use crate::query::{CompiledQuery, ExecutionInput}; use crate::types::ExecutionInputImplData; +use tracing::{debug, info, warn}; use crate::glob; @@ -611,11 +611,11 @@ impl Subscriptions { let mut lag_updates: HashMap = HashMap::new(); for sub in &self.query_subscriptions { match sub.notify(changed, db).await { - Ok(None) => {}, + Ok(None) => {} Ok(Some(input)) => { for x in input.get_fields() { if x.1.lag_value != x.1.value { - if ! lag_updates.contains_key(x.0) { + if !lag_updates.contains_key(x.0) { lag_updates.insert(x.0.clone(), ()); } } @@ -640,7 +640,7 @@ impl Subscriptions { } else { Ok(None) } - }, + } } } @@ -785,21 +785,27 @@ impl QuerySubscription { &self, name: &String, db: &DatabaseReadAccess, - input: &mut query::ExecutionInputImpl) { + input: &mut query::ExecutionInputImpl, + ) { match db.get_entry_by_path(name) { Ok(entry) => { - input.add(name.to_owned(), ExecutionInputImplData{ - value: entry.datapoint.value.to_owned(), - lag_value: entry.lag_datapoint.value.to_owned() - }); - - }, + input.add( + name.to_owned(), + ExecutionInputImplData { + value: entry.datapoint.value.to_owned(), + lag_value: entry.lag_datapoint.value.to_owned(), + }, + ); + } Err(_) => { // TODO: This should probably generate an error - input.add(name.to_owned(), ExecutionInputImplData { - value: DataValue::NotAvailable, - lag_value: DataValue::NotAvailable - }) + input.add( + name.to_owned(), + ExecutionInputImplData { + value: DataValue::NotAvailable, + lag_value: DataValue::NotAvailable, + }, + ) } } } @@ -830,7 +836,7 @@ impl QuerySubscription { } None => { // Always generate input if `changed` is None - return true + return true; } } false @@ -839,7 +845,7 @@ impl QuerySubscription { &self, query: &CompiledQuery, db: &DatabaseReadAccess, - input: &mut query::ExecutionInputImpl + input: &mut query::ExecutionInputImpl, ) { for name in query.input_spec.iter() { self.find_in_db_and_add(name, db, input); @@ -1030,14 +1036,12 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> { match self.db.path_to_id.get(path) { - Some(id) => { - match self.db.entries.get_mut(&id) { - Some(entry) => { - entry.apply_lag_after_execute(); - Ok(()) - } - None => Err(UpdateError::NotFound), + Some(id) => match self.db.entries.get_mut(&id) { + Some(entry) => { + entry.apply_lag_after_execute(); + Ok(()) } + None => Err(UpdateError::NotFound), }, None => Err(UpdateError::NotFound), } @@ -1389,7 +1393,6 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { // notifying subscribers (no writes in between) let db = db.downgrade(); - // Notify match self .broker @@ -1403,7 +1406,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { Ok(Some(lag_updates_)) => { lag_updates = lag_updates_.clone(); false - }, + } Err(_) => true, // Cleanup needed } }; @@ -1413,8 +1416,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let mut db_write = db.authorized_write_access(self.permissions); for x in lag_updates { match db_write.update_entry_lag_to_be_equal(x.0.as_str()) { - Ok(_) => {}, - Err(_) => {}, + Ok(_) => {} + Err(_) => {} }; } } diff --git a/kuksa_databroker/databroker/src/query/compiler.rs b/kuksa_databroker/databroker/src/query/compiler.rs index 50134c78..f6136191 100644 --- a/kuksa_databroker/databroker/src/query/compiler.rs +++ b/kuksa_databroker/databroker/src/query/compiler.rs @@ -153,7 +153,7 @@ pub fn compile_expr( data_type, lag: false, }) - }, + } ast::Expr::Function(f) => { let name = &f.name.to_string(); return if name == "LAG" { @@ -164,31 +164,33 @@ pub fn compile_expr( ast::FunctionArgExpr::Expr(e) => { let function_expr = compile_expr(e, input, output)?; return match function_expr { - Expr::Datapoint { name, data_type, .. } => Ok(Expr::Datapoint { + Expr::Datapoint { + name, data_type, .. + } => Ok(Expr::Datapoint { name, data_type, - lag: true + lag: true, }), _ => Err(CompilationError::ParseError(format!( "Unable to create lag datapoint" - ))) + ))), }; } _ => Err(CompilationError::UnsupportedOperator(format!( "Unsupported function argument expression" - ))) + ))), } - }, + } _ => Err(CompilationError::UnsupportedOperator(format!( "Unsupported function argument" - ))) + ))), } } else { Err(CompilationError::UnsupportedOperator(format!( "Unsupported operator \"{name}\"" ))) }; - }, + } ast::Expr::BinaryOp { ref left, @@ -352,20 +354,20 @@ pub fn compile_expr( Ok(compiled_query) => { output.subquery.push(compiled_query); Ok(Expr::Subquery { - index: (output.subquery.len() - 1) as u32 + index: (output.subquery.len() - 1) as u32, }) - }, + } _ => Err(CompilationError::UnsupportedOperator(format!( "Subquery failed to compile query" - ))) + ))), } } else { Err(CompilationError::UnsupportedOperation( "Subquery to parse".to_string(), )) } - }, + } ast::Expr::UnaryOp { ref op, ref expr } => match op { ast::UnaryOperator::Not => Ok(Expr::UnaryOperation { @@ -475,7 +477,7 @@ fn resolve_literal( fn compile_select_statement( select: &ast::Select, - input: &impl CompilationInput + input: &impl CompilationInput, ) -> Result { let mut query = CompiledQuery::new(); @@ -486,8 +488,7 @@ fn compile_select_statement( if let Ok(data_type) = condition.get_type() { if data_type != DataType::Bool { return Err(CompilationError::TypeError( - "WHERE statement doesn't evaluate to a boolean expression" - .to_string(), + "WHERE statement doesn't evaluate to a boolean expression".to_string(), )); } } diff --git a/kuksa_databroker/databroker/src/query/executor.rs b/kuksa_databroker/databroker/src/query/executor.rs index e8f45b26..6c5e6b44 100644 --- a/kuksa_databroker/databroker/src/query/executor.rs +++ b/kuksa_databroker/databroker/src/query/executor.rs @@ -40,7 +40,8 @@ impl CompiledQuery { fn execute_internal( &self, query: &CompiledQuery, - input: &impl ExecutionInput) -> Result>, ExecutionError> { + input: &impl ExecutionInput, + ) -> Result>, ExecutionError> { // Check condition let condition_fulfilled = match &query.selection { Some(condition) => match condition.execute(input) { @@ -65,15 +66,18 @@ impl CompiledQuery { let mut is_subquery = false; for (index, e) in query.projection.iter().enumerate() { let expr_info = match e { - Expr::Datapoint { name, data_type: _, .. } => NameAndData { + Expr::Datapoint { + name, data_type: _, .. + } => NameAndData { name: name.clone(), - data: None + data: None, }, Expr::Alias { alias, expr } => { match expr.as_ref() { Expr::Subquery { index } => { is_subquery = true; - match self.execute_internal(&query.subquery[*index as usize], input) { + match self.execute_internal(&query.subquery[*index as usize], input) + { Ok(f) => match f { None => NameAndData { name: alias.clone(), @@ -81,8 +85,8 @@ impl CompiledQuery { }, Some(vec) => NameAndData { name: alias.clone(), - data: Some(vec) - } + data: Some(vec), + }, }, Err(_) => { // Don't be rude and just return None @@ -92,10 +96,13 @@ impl CompiledQuery { } } } + } + _ => NameAndData { + name: alias.clone(), + data: None, }, - _ => NameAndData { name: alias.clone(), data: None } } - }, + } Expr::Subquery { index } => { is_subquery = true; match self.execute_internal(&query.subquery[*index as usize], input) { @@ -106,8 +113,8 @@ impl CompiledQuery { }, Some(vec) => NameAndData { name: format!("subquery_{index}"), - data: Some(vec) - } + data: Some(vec), + }, }, Err(_) => { // Don't be rude and just return None @@ -117,23 +124,23 @@ impl CompiledQuery { } } } - }, + } _ => NameAndData { name: format!("field_{index}"), - data: None + data: None, }, }; match expr_info.data { None => match e.execute(input) { Ok(value) => { - if ! is_subquery { + if !is_subquery { fields.push((expr_info.name, value)) } - }, + } Err(e) => return Err(e), - } - Some(mut vec) => fields.append(&mut vec) + }, + Some(mut vec) => fields.append(&mut vec), } } if fields.len() > 0 { @@ -157,14 +164,18 @@ impl CompiledQuery { impl Expr { pub fn execute(&self, input: &impl ExecutionInput) -> Result { match &self { - Expr::Datapoint { name, data_type: _, lag } => { + Expr::Datapoint { + name, + data_type: _, + lag, + } => { let field = input.lookup(name); if *lag { Ok(field.lag_value.clone()) } else { Ok(field.value.clone()) } - }, + } Expr::Alias { expr, .. } => expr.execute(input), Expr::BinaryOperation { left, @@ -196,8 +207,8 @@ impl Expr { Err(ExecutionError::TypeError( "Unresolved literal found while executing query".to_string(), )) - }, - Expr::Subquery { index } => Ok(DataValue::Uint32(index.clone())) + } + Expr::Subquery { index } => Ok(DataValue::Uint32(index.clone())), } } } @@ -376,7 +387,7 @@ impl ExecutionInput for ExecutionInputImpl { Some(value) => value, None => &ExecutionInputImplData { value: DataValue::NotAvailable, - lag_value: DataValue::NotAvailable + lag_value: DataValue::NotAvailable, }, } } @@ -449,18 +460,27 @@ fn executor_test() { println!("EXECUTE"); let mut execution_input1 = ExecutionInputImpl::new(); - execution_input1.add("Vehicle.Cabin.Seat.Row1.Pos1.Position".parse().unwrap(), ExecutionInputImplData{ - value: DataValue::Int32(230), - lag_value: DataValue::NotAvailable - }); - execution_input1.add("Vehicle.Datapoint1".parse().unwrap(), ExecutionInputImplData { - value: DataValue::Int32(61), - lag_value: DataValue::NotAvailable - }); - execution_input1.add("Vehicle.Datapoint2".parse().unwrap(), ExecutionInputImplData { - value: DataValue::Bool(true), - lag_value: DataValue::NotAvailable - }); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint1".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(61), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint2".to_string(), + ExecutionInputImplData { + value: DataValue::Bool(true), + lag_value: DataValue::NotAvailable, + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); println!("RESULT: "); @@ -474,22 +494,31 @@ fn executor_test() { DataValue::NotAvailable, ), ]; - assert_expected(res,&expected); + assert_expected(res, &expected); println!("EXECUTE"); let mut execution_input1 = ExecutionInputImpl::new(); - execution_input1.add("Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), ExecutionInputImplData{ - value: DataValue::Int32(230), - lag_value: DataValue::NotAvailable - }); - execution_input1.add("Vehicle.Datapoint1".parse().unwrap(), ExecutionInputImplData { - value: DataValue::Int32(40), - lag_value: DataValue::NotAvailable - }); - execution_input1.add("Vehicle.Datapoint2".parse().unwrap(), ExecutionInputImplData { - value: DataValue::Bool(true), - lag_value: DataValue::NotAvailable - }); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint1".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(40), + lag_value: DataValue::NotAvailable, + }, + ); + execution_input1.add( + "Vehicle.Datapoint2".to_string(), + ExecutionInputImplData { + value: DataValue::Bool(true), + lag_value: DataValue::NotAvailable, + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); assert!(matches!(res, None)); @@ -519,10 +548,13 @@ fn executor_lag_test() { println!("EXECUTE"); let mut execution_input1 = ExecutionInputImpl::new(); - execution_input1.add("Vehicle.Cabin.Seat.Row1.Pos1.Position".parse().unwrap(), ExecutionInputImplData{ - value: DataValue::Int32(230), - lag_value: DataValue::Int32(231) - }); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::Int32(231), + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); assert!(matches!(res, Some(_))); let expected = vec![ @@ -535,7 +567,7 @@ fn executor_lag_test() { DataValue::Int32(231), ), ]; - assert_expected(res,&expected); + assert_expected(res, &expected); } #[test] @@ -549,13 +581,18 @@ fn executor_lag_subquery_test() { let compiled_query = compiler::compile(sql, &test_compilation_input).unwrap(); assert_eq!(compiled_query.subquery.len(), 2); if let Some(subquery) = compiled_query.subquery.get(0) { - assert!(subquery.input_spec.contains("Vehicle.Cabin.Seat.Row1.Pos1.Position")); + assert!(subquery + .input_spec + .contains("Vehicle.Cabin.Seat.Row1.Pos1.Position")); } let mut execution_input1 = ExecutionInputImpl::new(); - execution_input1.add("Vehicle.Cabin.Seat.Row1.Pos1.Position".parse().unwrap(), ExecutionInputImplData{ - value: DataValue::Int32(230), - lag_value: DataValue::Int32(231) - }); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::Int32(231), + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); assert!(matches!(res, Some(_))); let expected = vec![ @@ -568,7 +605,7 @@ fn executor_lag_subquery_test() { DataValue::Int32(231), ), ]; - assert_expected(res,&expected); + assert_expected(res, &expected); } #[test] @@ -583,25 +620,29 @@ fn executor_where_lag_subquery_test() { let test_compilation_input = TestCompilationInput {}; let compiled_query = compiler::compile(sql, &test_compilation_input).unwrap(); let mut execution_input1 = ExecutionInputImpl::new(); - execution_input1.add("Vehicle.Cabin.Seat.Row1.Pos1.Position".parse().unwrap(), ExecutionInputImplData{ - value: DataValue::Int32(230), - lag_value: DataValue::NotAvailable - }); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::NotAvailable, + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); assert!(matches!(res, Some(_))); - let expected = vec![ - ( - "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_owned(), - DataValue::Int32(230), - ), - ]; - assert_expected(res,&expected); + let expected = vec![( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_owned(), + DataValue::Int32(230), + )]; + assert_expected(res, &expected); let mut execution_input1 = ExecutionInputImpl::new(); - execution_input1.add("Vehicle.Cabin.Seat.Row1.Pos1.Position".parse().unwrap(), ExecutionInputImplData{ - value: DataValue::Int32(230), - lag_value: DataValue::Int32(230) - }); + execution_input1.add( + "Vehicle.Cabin.Seat.Row1.Pos1.Position".to_string(), + ExecutionInputImplData { + value: DataValue::Int32(230), + lag_value: DataValue::Int32(230), + }, + ); let res = compiled_query.execute(&execution_input1).unwrap(); assert!(matches!(res, None)); } diff --git a/kuksa_databroker/databroker/src/query/expr.rs b/kuksa_databroker/databroker/src/query/expr.rs index 92d28765..1e97028e 100644 --- a/kuksa_databroker/databroker/src/query/expr.rs +++ b/kuksa_databroker/databroker/src/query/expr.rs @@ -18,7 +18,7 @@ pub enum Expr { Datapoint { name: String, data_type: DataType, - lag: bool + lag: bool, }, Alias { expr: Box, @@ -45,7 +45,7 @@ pub enum Expr { operator: UnaryOperator, }, Subquery { - index: u32 + index: u32, }, Between { expr: Box, @@ -80,7 +80,9 @@ pub enum UnresolvedLiteral { impl Expr { pub fn get_type(&self) -> Result { match self { - Expr::Datapoint { name: _, data_type, .. } => Ok(data_type.clone()), + Expr::Datapoint { + name: _, data_type, .. + } => Ok(data_type.clone()), Expr::Alias { expr, alias: _ } => expr.get_type(), Expr::Cast { expr: _, data_type } => Ok(data_type.clone()), Expr::UnresolvedLiteral { raw } => Err(UnresolvedLiteral::Number(raw.clone())), diff --git a/kuksa_databroker/databroker/src/types.rs b/kuksa_databroker/databroker/src/types.rs index 106b5147..6d9241fd 100644 --- a/kuksa_databroker/databroker/src/types.rs +++ b/kuksa_databroker/databroker/src/types.rs @@ -76,8 +76,6 @@ pub enum DataValue { DoubleArray(Vec), } - - #[derive(Debug)] pub struct CastError {} @@ -555,22 +553,18 @@ impl DataValue { // TODO: Implement better floating point comparison Ok((value - other_value).abs() < f64::EPSILON) } - (DataValue::NotAvailable, DataValue::Int32(..)) | - (DataValue::NotAvailable, DataValue::Int64(..)) | - (DataValue::NotAvailable, DataValue::Uint32(..)) | - (DataValue::NotAvailable, DataValue::Uint64(..)) | - (DataValue::NotAvailable, DataValue::Float(..)) | - (DataValue::NotAvailable, DataValue::Double(..)) => { - Ok(false) - } - (DataValue::Int32(..), DataValue::NotAvailable) | - (DataValue::Int64(..), DataValue::NotAvailable) | - (DataValue::Uint32(..), DataValue::NotAvailable) | - (DataValue::Uint64(..), DataValue::NotAvailable) | - (DataValue::Float(..), DataValue::NotAvailable) | - (DataValue::Double(..), DataValue::NotAvailable) => { - Ok(false) - } + (DataValue::NotAvailable, DataValue::Int32(..)) + | (DataValue::NotAvailable, DataValue::Int64(..)) + | (DataValue::NotAvailable, DataValue::Uint32(..)) + | (DataValue::NotAvailable, DataValue::Uint64(..)) + | (DataValue::NotAvailable, DataValue::Float(..)) + | (DataValue::NotAvailable, DataValue::Double(..)) => Ok(false), + (DataValue::Int32(..), DataValue::NotAvailable) + | (DataValue::Int64(..), DataValue::NotAvailable) + | (DataValue::Uint32(..), DataValue::NotAvailable) + | (DataValue::Uint64(..), DataValue::NotAvailable) + | (DataValue::Float(..), DataValue::NotAvailable) + | (DataValue::Double(..), DataValue::NotAvailable) => Ok(false), _ => Err(CastError {}), } } @@ -579,7 +573,7 @@ impl DataValue { #[derive(Debug)] pub struct ExecutionInputImplData { pub value: DataValue, - pub lag_value: DataValue + pub lag_value: DataValue, } #[test]