From 76b6df35b2f0c3f519e1f59de8874b9cdc8c4d6c Mon Sep 17 00:00:00 2001 From: Sebastian Schildt Date: Sat, 12 Oct 2024 19:12:20 +0200 Subject: [PATCH] Adding a simple persistence provider Signed-off-by: Sebastian Schildt --- kuksa-persistence-provider/Cargo.toml | 25 + kuksa-persistence-provider/Readme.md | 71 +++ kuksa-persistence-provider/config.json | 25 + .../src/kuksaconnector.rs | 458 ++++++++++++++++++ kuksa-persistence-provider/src/main.rs | 156 ++++++ kuksa-persistence-provider/src/storage.rs | 31 ++ .../src/storage/filestorage.rs | 116 +++++ kuksa-persistence-provider/statestore.json | 20 + 8 files changed, 902 insertions(+) create mode 100644 kuksa-persistence-provider/Cargo.toml create mode 100644 kuksa-persistence-provider/Readme.md create mode 100644 kuksa-persistence-provider/config.json create mode 100644 kuksa-persistence-provider/src/kuksaconnector.rs create mode 100644 kuksa-persistence-provider/src/main.rs create mode 100644 kuksa-persistence-provider/src/storage.rs create mode 100644 kuksa-persistence-provider/src/storage/filestorage.rs create mode 100644 kuksa-persistence-provider/statestore.json diff --git a/kuksa-persistence-provider/Cargo.toml b/kuksa-persistence-provider/Cargo.toml new file mode 100644 index 0000000..9b6d62b --- /dev/null +++ b/kuksa-persistence-provider/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "kuksa-persistence-provider" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { version = "4.5.20", features = ["derive"] } +tinyjson = "2.5.1" +kuksa = { git = "https://github.com/eclipse-kuksa/kuksa-databroker.git", branch = "feature/refactor_kuksa_crate", package = "kuksa" } +log = "0.4.22" +env_logger = "0.11.5" +regex = "1.11.0" +#Only this version is compatible with kuksa crate. 0.13 is not +prost-types = "0.11.9" +tokio = { version="1.40.0", features = ["full"] } + +#Still broken due to closed repo +#djson = { git="https://github.com/dependix/platform.git" , optional = true } +djson = { path="platform/modules/json-al/djson_rust/" , optional = true } + +[features] +# use djson library +djson = [ "dep:djson" ] diff --git a/kuksa-persistence-provider/Readme.md b/kuksa-persistence-provider/Readme.md new file mode 100644 index 0000000..dedd097 --- /dev/null +++ b/kuksa-persistence-provider/Readme.md @@ -0,0 +1,71 @@ +# KUKSA Persistence Provider + +All data in KUKSA is ephemereal. However, in a car there is often data that does not change over the lifetime of a vehicle, and data where you want changes to be persisted over ignition cycles. + +This provider can achieve this. It can restore certain values upon startup, either sensor (current) values, or actuations. + +An example for one-time restoration of current values are attributes that are maybe not set in a default VSS model deployed to ALL cars of a specific variant, but nevertheless are constant of a specific car, such as the VIN or the Vehicle Color. + +This provider can also watch (subscribe) certain current or actuation values. This is useful when interacting with components that do not provide their own persistence management. Assume a climate control UI that can react on unser input and interact with the HVAC system, but is otherwise stateless. By watching and restoring the desired target temperature, the user's preference is saved and restored, without the HVAC UI needing any specific code to handle this. + +## Configuration: config,json + +Main configuration is in config.json, and example may look like this + +```json +{ + "restore-only": { + "values": [ + "Vehicle.VehicleIdentification.VIN", + "Vehicle.VehicleIdentification.VehicleInteriorColor" + ], + "actuators": [ + "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit" + ] + }, + + "restore-and-watch": { + "values": [ + "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit", + "Vehicle.Cabin.HVAC.Station.Row4.Passenger.FanSpeed" + ], + "actuators": [ + "Vehicle.Cabin.Infotainment.Media.Volume" + ] + }, + "state-storage": { + "type": "file", + "path": "statestore.json" + } +} +``` + +## restore-only section + +These elements will be restored from the state store upon startup, but their values will not be watched and updated for changes. You can define whether the current values (values) will be restored or whether a target value is set (actuators). + +## restore-and-watch + +These elements will be restored from the state store upon startup. It is the intention to also monitor their state and update it in the state store. You can define whether the current values (values) will be restored and watched or whether a target value is set (actuators) and watched. As restore-and-watch includes restore, there is no need to add paths in restore-and-watch to restore-only as well. + +## state-storage + +Configures the state sotrage used to retrieve values. Currently supported: file + +## File storage: statestore.json + +This is a valid state store for the file storage. +*Note: ALl VALUES NEED TO BE STORED AS STRING*. +As the statestore does not make a difference between current and target (actuation) values it is currently not possible to watch or restore both for a single VSS path. + +```json + +{ + "Vehicle.VehicleIdentification.VIN": { + "value": "DEADBEEF" + }, + "Vehicle.VehicleIdentification.VehicleInteriorColor": { + "value": "Black" + } +} +``` diff --git a/kuksa-persistence-provider/config.json b/kuksa-persistence-provider/config.json new file mode 100644 index 0000000..28cda7d --- /dev/null +++ b/kuksa-persistence-provider/config.json @@ -0,0 +1,25 @@ +{ + "restore-only": { + "values": [ + "Vehicle.VehicleIdentification.VIN", + "Vehicle.VehicleIdentification.VehicleInteriorColor" + ], + "actuators": [ + "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit" + ] + }, + + "restore-and-watch": { + "values": [ + "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit", + "Vehicle.Cabin.HVAC.Station.Row4.Passenger.FanSpeed" + ], + "actuators": [ + "Vehicle.Cabin.Infotainment.Media.Volume" + ] + }, + "state-storage": { + "type": "file", + "path": "statestore.json" + } +} \ No newline at end of file diff --git a/kuksa-persistence-provider/src/kuksaconnector.rs b/kuksa-persistence-provider/src/kuksaconnector.rs new file mode 100644 index 0000000..cf8ca88 --- /dev/null +++ b/kuksa-persistence-provider/src/kuksaconnector.rs @@ -0,0 +1,458 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +use crate::storage::{self, StoreItem}; + +use std::collections::HashMap; +use std::fmt; +use std::time::SystemTime; + +use kuksa::proto; + +use std::sync::mpsc::Sender; + +use std::sync::{Arc, Mutex}; + +#[derive(Debug)] +pub struct ParseError {} + +pub fn create_kuksa_client(uri: &str) -> Arc> { + log::info!("Creating Kuksa Databroker client for URI: {}", uri); + let uri = kuksa::Uri::try_from(uri).expect("Invalid URI for Kuksa Databroker connection."); + Arc::new(Mutex::new(kuksa::Client::new(uri))) +} + +pub async fn get_from_storage_and_set_values( + storage: &impl storage::Storage, + kuksa_client: &Arc>, + vsspaths: &Vec, +) { + for vsspath in vsspaths { + get_from_storage_and_set(storage, kuksa_client, vsspath, false).await; + } +} + +pub async fn get_from_storage_and_set_actuations( + storage: &impl storage::Storage, + kuksa_client: &Arc>, + vsspaths: &Vec, +) { + for vsspath in vsspaths { + get_from_storage_and_set(storage, kuksa_client, vsspath, true).await; + } +} + +pub async fn get_from_storage_and_set( + storage: &impl storage::Storage, + kuksa_client: &Arc>, + vsspath: &str, + is_actuator: bool, +) { + log::debug!("Query storage for VSS signal: {}", vsspath); + let value = match storage.get(vsspath) { + Some(x) => x, + None => { + log::warn!("No value for VSS signal: {} stored", vsspath); + return; + } + }; + + //Figure out metadata: + let datapoint_entries = match kuksa_client + .lock() + .unwrap() + .get_metadata(vec![vsspath]) + .await + { + Ok(data_entries) => Some(data_entries), + Err(kuksa::Error::Status(status)) => { + log::warn!( + "Error: Could not get metadata for VSS signal: {}, Status: {}", + vsspath, + &status + ); + None + } + Err(kuksa::Error::Connection(msg)) => { + log::warn!( + "Connection Error: Could not get metadata for VSS signal: {}, Reason: {}", + vsspath, + &msg + ); + None + } + Err(kuksa::Error::Function(msg)) => { + log::warn!( + "Error: Could not get metadata for VSS signal: {}, Errors: {msg:?}", + vsspath + ); + None + } + }; + + if datapoint_entries.is_none() { + return; + } + + /* We can only have one match, as we query only one path (user entering branch + * in config is considered dumb) */ + if let Some(entries) = datapoint_entries { + if let Some(metadata) = &entries.first().unwrap().metadata { + let data_value = try_into_data_value( + value, + proto::v1::DataType::from_i32(metadata.data_type).unwrap(), + ); + if data_value.is_err() { + log::warn!( + "Could not parse \"{}\" as {:?}", + value, + proto::v1::DataType::from_i32(metadata.data_type).unwrap() + ); + return; + } + + let ts = prost_types::Timestamp::from(SystemTime::now()); + let datapoints = HashMap::from([( + vsspath.to_string().clone(), + proto::v1::Datapoint { + timestamp: Some(ts), + value: Some(data_value.unwrap()), + }, + )]); + + let result = { + if is_actuator { + kuksa_client + .lock() + .unwrap() + .set_target_values(datapoints) + .await + } else { + kuksa_client + .lock() + .unwrap() + .set_current_values(datapoints) + .await + } + }; + + match result { + Ok(_) => { + log::debug!("Succes setting {} to {}", vsspath, value); + } + Err(kuksa::Error::Status(status)) => { + log::warn!( + "Error: Could not set value for VSS signal: {}, Status: {}", + vsspath, + &status + ); + } + Err(kuksa::Error::Connection(msg)) => { + log::warn!( + "Connection Error: Could not set value for VSS signal: {}, Reason: {}", + vsspath, + &msg + ); + } + Err(kuksa::Error::Function(msg)) => { + log::warn!( + "Error: Could not set value for VSS signal: {}, Errors: {msg:?}", + vsspath + ); + } + }; + } + } +} + +pub async fn watch_values( + storage_queue: Sender, + kuksa_client: &Arc>, + vsspaths: Vec<&str>, + is_actuator: bool, +) { + log::info!( + "Subscribing to {} for VSS signals: {:?}", + { + match is_actuator { + true => "actuators", + false => "current values", + } + }, + &vsspaths + ); + + let res = match is_actuator { + true => { + kuksa_client + .lock() + .unwrap() + .subscribe_target_values(vsspaths) + .await + } + false => { + kuksa_client + .lock() + .unwrap() + .subscribe_current_values(vsspaths) + .await + } + }; + match res { + Ok(mut subs) => { + tokio::spawn(async move { + loop { + match subs.message().await { + Ok(resp) => { + if let Some(r) = resp { + for update in r.updates { + if let Some(entry) = update.entry { + let newdp = match is_actuator { + true => entry.actuator_target, + false => entry.value, + }; + if let Some(datapoint) = newdp { + let data = DisplayDatapoint(datapoint); + log::info!( + "Received value {} for VSS signal {}", + data.to_string(), + entry.path + ); + + match storage_queue.send(StoreItem { + path: entry.path.clone(), + value: data.to_string(), + }) { + Ok(_) => {} + Err(err) => { + log::warn!( + "Error sending data to storage {:?}", + err + ); + } + } + } + } + } + } + } + Err(err) => { + log::warn!("Error: Could not receive message: {:?}", err); + break; + } + } + } + }); + } + Err(err) => { + log::warn!("Error: Could not subscribe to VSS signals: {:?}", err); + } + } +} + +/* Donation from databroker-cli */ +fn try_into_data_value( + input: &str, + data_type: proto::v1::DataType, +) -> Result { + if input == "NotAvailable" { + return Ok(proto::v1::datapoint::Value::String(input.to_string())); + } + + match data_type { + proto::v1::DataType::String => Ok(proto::v1::datapoint::Value::String(input.to_owned())), + proto::v1::DataType::StringArray => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::StringArray( + proto::v1::StringArray { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Boolean => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Bool(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::BooleanArray => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::BoolArray( + proto::v1::BoolArray { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Int8 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Int32(value as i32)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Int8Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Int32Array( + proto::v1::Int32Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Int16 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Int32(value as i32)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Int16Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Int32Array( + proto::v1::Int32Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Int32 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Int32(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Int32Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Int32Array( + proto::v1::Int32Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Int64 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Int64(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Int64Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Int64Array( + proto::v1::Int64Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Uint8 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint32(value as u32)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Uint8Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint32Array( + proto::v1::Uint32Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Uint16 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint32(value as u32)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Uint16Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint32Array( + proto::v1::Uint32Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Uint32 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint32(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Uint32Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint32Array( + proto::v1::Uint32Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Uint64 => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint64(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::Uint64Array => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::Uint64Array( + proto::v1::Uint64Array { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Float => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Float(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::FloatArray => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::FloatArray( + proto::v1::FloatArray { values: value }, + )), + Err(err) => Err(err), + }, + proto::v1::DataType::Double => match input.parse::() { + Ok(value) => Ok(proto::v1::datapoint::Value::Double(value)), + Err(_) => Err(ParseError {}), + }, + proto::v1::DataType::DoubleArray => match get_array_from_input(input.to_owned()) { + Ok(value) => Ok(proto::v1::datapoint::Value::DoubleArray( + proto::v1::DoubleArray { values: value }, + )), + Err(err) => Err(err), + }, + _ => Err(ParseError {}), + } +} + +pub fn get_array_from_input(values: String) -> Result, ParseError> { + let raw_input = values + .strip_prefix('[') + .and_then(|s| s.strip_suffix(']')) + .ok_or(ParseError {})?; + + let pattern = r#"(?:\\.|[^",])*"(?:\\.|[^"])*"|[^",]+"#; + + let regex = regex::Regex::new(pattern).unwrap(); + let inputs = regex.captures_iter(raw_input); + + let mut array: Vec = vec![]; + for part in inputs { + match part[0] + .trim() + .replace('\"', "") + .replace('\\', "\"") + .parse::() + { + Ok(value) => array.push(value), + Err(_) => return Err(ParseError {}), + } + } + Ok(array) +} + +struct DisplayDatapoint(proto::v1::Datapoint); + +fn display_array(f: &mut fmt::Formatter<'_>, array: &[T]) -> fmt::Result +where + T: fmt::Display, +{ + f.write_str("[")?; + let real_delimiter = ", "; + let mut delimiter = ""; + for value in array { + write!(f, "{delimiter}")?; + delimiter = real_delimiter; + write!(f, "{value}")?; + } + f.write_str("]") +} + +impl fmt::Display for DisplayDatapoint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0.value { + Some(value) => match value { + proto::v1::datapoint::Value::Bool(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Int32(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Int64(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Uint32(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Uint64(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::Float(value) => f.pad(&format!("{value:.2}")), + proto::v1::datapoint::Value::Double(value) => f.pad(&format!("{value}")), + proto::v1::datapoint::Value::String(value) => f.pad(&value.to_owned()), + proto::v1::datapoint::Value::StringArray(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::BoolArray(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Int32Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Int64Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Uint32Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::Uint64Array(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::FloatArray(array) => display_array(f, &array.values), + proto::v1::datapoint::Value::DoubleArray(array) => display_array(f, &array.values), + }, + None => f.pad("None"), + } + } +} diff --git a/kuksa-persistence-provider/src/main.rs b/kuksa-persistence-provider/src/main.rs new file mode 100644 index 0000000..cc29f9a --- /dev/null +++ b/kuksa-persistence-provider/src/main.rs @@ -0,0 +1,156 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +mod kuksaconnector; +mod storage; + +use storage::Storage; + +use clap::Parser; +use std::collections::HashMap; +use std::{env, path::PathBuf}; +use tinyjson::JsonValue; + +use tokio::signal::ctrl_c; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct CmdLine { + /// JSON file containing the configuration + #[arg(short, long, value_name = "FILE")] + config: Option, + + /// Turn debugging information on + #[arg(short, long, action = clap::ArgAction::Count)] + debug: u8, +} + +#[tokio::main] +async fn main() { + if env::var("RUST_LOG").is_err() { + env::set_var("RUST_LOG", "info") + } + env_logger::init(); + + let args = CmdLine::parse(); + + let config = args.config.unwrap_or_else(|| PathBuf::from("config.json")); + + //Check config exists + if !config.exists() { + log::error!("Error: Can not find configuration at {}", config.display()); + std::process::exit(1); + } + + log::info!("Reading configuration from: {}", config.display()); + // Reading configuration file into a string + let config_str = std::fs::read_to_string(config).unwrap(); + + log::debug!("Configuration file content: {}", config_str); + + let parsed: JsonValue = config_str.parse().unwrap(); + log::debug!("Parsed JSON data structure: {:?}", parsed); + + let storage = match parsed["state-storage"]["type"] + .get::() + .unwrap() + .as_str() + { + "file" => storage::FileStorage::new(&parsed["state-storage"]), + _ => { + log::error!("Error: state storage type is invalid"); + std::process::exit(1); + } + }; + + //let storage = Arc::new(Mutex::new(_storage)); + + let mut restore_current_values: Vec = vec![]; + let mut restore_actuation_values: Vec = vec![]; + let mut watch_current_values: Vec = vec![]; + let mut watch_actuation_values: Vec = vec![]; + + let section: Option<&HashMap> = parsed["restore-only"].get(); + + if section.is_some() { + let elements: Option<&Vec> = section.unwrap()["values"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_current_values.push(path.get::().unwrap().to_string()); + } + } + let elements: Option<&Vec> = section.unwrap()["actuators"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_actuation_values.push(path.get::().unwrap().to_string()); + } + } + } + + let section: Option<&HashMap> = parsed["restore-and-watch"].get(); + if section.is_some() { + let elements: Option<&Vec> = section.unwrap()["values"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_current_values.push(path.get::().unwrap().to_string()); + watch_current_values.push(path.get::().unwrap().to_string()); + } + } + let elements: Option<&Vec> = section.unwrap()["actuators"].get(); + if elements.is_some() { + for path in elements.unwrap() { + restore_actuation_values.push(path.get::().unwrap().to_string()); + watch_actuation_values.push(path.get::().unwrap().to_string()); + } + } + } + + let kuksa_client = kuksaconnector::create_kuksa_client("grpc://127.0.01:55556"); + //Each subscription needs a separate client + let kuksa_client2 = kuksaconnector::create_kuksa_client("grpc://127.0.01:55556"); + + kuksaconnector::get_from_storage_and_set_values( + &storage, + &kuksa_client, + &restore_current_values, + ) + .await; + kuksaconnector::get_from_storage_and_set_actuations( + &storage, + &kuksa_client, + &restore_actuation_values, + ) + .await; + + drop(restore_actuation_values); + drop(restore_current_values); + + kuksaconnector::watch_values( + storage.get_queue(), + &kuksa_client, + watch_current_values.iter().map(|s| &**s).collect(), + false, + ) + .await; + kuksaconnector::watch_values( + storage.get_queue(), + &kuksa_client2, + watch_actuation_values.iter().map(|s| &**s).collect(), + true, + ) + .await; + + tokio::select! { + _ = ctrl_c() => { + println!("Received Ctrl+C, exiting."); + return; + } + } +} diff --git a/kuksa-persistence-provider/src/storage.rs b/kuksa-persistence-provider/src/storage.rs new file mode 100644 index 0000000..6a7a7ef --- /dev/null +++ b/kuksa-persistence-provider/src/storage.rs @@ -0,0 +1,31 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +pub mod filestorage; + +use std::sync::mpsc::Sender; + +pub use filestorage::FileStorage; +use tinyjson::JsonValue; + +pub struct StoreItem { + pub path: String, + pub value: String, +} + +pub trait Storage { + fn new(config: &JsonValue) -> Self; + + fn get(&self, vsspath: &str) -> Option<&str>; + + fn set(&self, vsspath: &str, vssvalue: &str) -> Result<(), ()>; + + fn get_queue(&self) -> Sender; +} diff --git a/kuksa-persistence-provider/src/storage/filestorage.rs b/kuksa-persistence-provider/src/storage/filestorage.rs new file mode 100644 index 0000000..a216212 --- /dev/null +++ b/kuksa-persistence-provider/src/storage/filestorage.rs @@ -0,0 +1,116 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +use tinyjson::JsonValue; + +use super::{Storage, StoreItem}; +use std::collections::HashMap; + +use std::io::Write; +use std::sync::mpsc; +use std::sync::mpsc::{Receiver, Sender}; + +use std::fs::File; + +use log; + +pub struct FileStorage { + state: JsonValue, + queue: Sender, +} + +impl Storage for FileStorage { + fn new(config: &JsonValue) -> Self { + match config["path"].get::() { + Some(x) => { + log::info!("Initializing file storage on {}", x); + let path = x.clone(); + println!("Reading storage from {}", path); + let config_str = std::fs::read_to_string(&path).unwrap(); + let state: JsonValue = config_str.parse().unwrap(); + let mut state_copy = state.get::>().unwrap().clone(); + let (tx, rx): (Sender, Receiver) = mpsc::channel(); + let fs = FileStorage { state, queue: tx }; + std::thread::spawn(move || loop { + match rx.recv() { + Ok(msg) => { + log::info!("Store value: {} for path {}", msg.value, msg.path); + let mut val: HashMap = HashMap::new(); + val.insert("value".to_string(), JsonValue::String(msg.value.clone())); + state_copy.insert(msg.path.clone(), JsonValue::from(val)); + let out_json: JsonValue = JsonValue::from(state_copy.to_owned()); + let mut file = File::create(&path).unwrap(); + match file.write_all(out_json.format().unwrap().as_bytes()) { + Ok(_) => {} + Err(e) => { + log::error!("Error writing to state storage file: {:?}", e); + break; + } + } + let _ = file.flush(); + drop(file); + } + Err(_) => { + log::error!("Error receiving message"); + break; + } + } + }); + fs + } + _ => { + log::error!("Error: file storage path is invalid"); + std::process::exit(1); + } + } + } + + fn get(&self, vsspath: &str) -> Option<&str> { + log::debug!("Try getting VSS signal {}", vsspath); + if !self + .state + .get::>() + .unwrap() + .contains_key(vsspath) + { + return None; + } + + let entry: Option<&HashMap> = self.state[vsspath].get(); + + if entry.is_some() && entry.unwrap().contains_key("value") { + let value = entry.unwrap()["value"].get::(); + + if let Some(v) = value { + return Some(v); + } + log::warn!( + "Error reading {vsspath}, make sure all values are quoted and stored as string" + ) + } + None + } + + fn set(&self, vsspath: &str, vssvalue: &str) -> Result<(), ()> { + log::debug!("Setting VSS signal {} to {}", vsspath, vssvalue); + self.queue + .send(StoreItem { + path: vsspath.to_string(), + value: vssvalue.to_string(), + }) + .map_err(|_| ()) + } + + fn get_queue(&self) -> Sender { + self.queue.clone() + } +} + +impl FileStorage {} diff --git a/kuksa-persistence-provider/statestore.json b/kuksa-persistence-provider/statestore.json new file mode 100644 index 0000000..68c59e8 --- /dev/null +++ b/kuksa-persistence-provider/statestore.json @@ -0,0 +1,20 @@ +{ + "Vehicle.VehicleIdentification.VIN": { + "value": "DEADBEEF" + }, + "Vehicle.Cabin.HVAC.Station.Row4.Passenger.FanSpeed": { + "value": "41" + }, + "Vehicle.Cabin.Infotainment.Media.Volume": { + "value": "22" + }, + "Vehicle.VehicleIdentification.VehicleInteriorColor": { + "value": "Black" + }, + "Vehicle.Cabin.Infotainment.HMI.TemperatureUnit": { + "value": "C" + }, + "LOL": { + "value": "LOL" + } +} \ No newline at end of file