Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

LAG functionality, Subqueries, Comparing with NotAvailable values #705

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion kuksa_apps/node-red/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ By default, the mqtt flows will be configured in node-red. You can also use the

Now you can view the example under [http://localhost:1880](http://localhost:1880/).

<!-- markdown-link-check-enable -->
<!-- markdown-link-check-enable -->

To test the example, you can use [Kuksa Client](../../kuksa-client) or use the [gps feeder](https://github.com/eclipse/kuksa.val.feeders/tree/main/gps2val).
In [`feeders.yml`](./feeders.yml), you can find the experimental [config](kuksa_config/gpsd_feeder.ini) for the gps feeder container. You can use the following command to also start the feeder containers:
Expand All @@ -61,3 +61,7 @@ docker-compose -f docker-compose.yml -f feeders.yml up
- [websocket-advanced.json](./websocket-advanced.json) implements a test client and uses secure connection with server

![screenshot](./node-red-screenshot.png)

*Note*: The websocket node-red configs use the URL **wss://127.0.0.1:8090**. If docker-compose is used to run the demo, **127.0.0.1** is not reachable from inside a docker container, and the flow dashboard state will show "disconnected". Compose creates a network between the containers, so check the IP address of the kuksa-val container and change the websocket node accordingly.
184 changes: 145 additions & 39 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::time::SystemTime;

use crate::query::{CompiledQuery, ExecutionInput};
use crate::types::ExecutionInputImplData;
use tracing::{debug, info, warn};

use crate::glob;
Expand Down Expand Up @@ -75,6 +77,7 @@ pub struct Datapoint {
#[derive(Debug, Clone)]
pub struct Entry {
pub datapoint: Datapoint,
pub lag_datapoint: Datapoint,
pub actuator_target: Option<Datapoint>,
pub metadata: Metadata,
}
Expand Down Expand Up @@ -556,9 +559,14 @@ impl Entry {
}
}

/// Snapshot the current datapoint into `lag_datapoint`.
///
/// Called after a query has executed so that later LAG() evaluations
/// observe this value as the "previous" one — TODO confirm against
/// the query engine's use of `lag_datapoint`.
pub fn apply_lag_after_execute(&mut self) {
    self.lag_datapoint.clone_from(&self.datapoint);
}

pub fn apply(&mut self, update: EntryUpdate) -> HashSet<Field> {
let mut changed = HashSet::new();
if let Some(datapoint) = update.datapoint {
self.lag_datapoint = self.datapoint.clone();
self.datapoint = datapoint;
changed.insert(Field::Datapoint);
}
Expand Down Expand Up @@ -598,11 +606,19 @@ impl Subscriptions {
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
db: &Database,
) -> Result<(), NotificationError> {
) -> Result<Option<HashMap<String, ()>>, NotificationError> {
let mut error = None;
let mut lag_updates: HashMap<String, ()> = HashMap::new();
for sub in &self.query_subscriptions {
match sub.notify(changed, db).await {
Ok(_) => {}
Ok(None) => {}
Ok(Some(input)) => {
for x in input.get_fields() {
if x.1.lag_value != x.1.value && !lag_updates.contains_key(x.0) {
lag_updates.insert(x.0.clone(), ());
}
}
}
Err(err) => error = Some(err),
}
}
Expand All @@ -616,7 +632,13 @@ impl Subscriptions {

match error {
Some(err) => Err(err),
None => Ok(()),
None => {
if !lag_updates.is_empty() {
Ok(Some(lag_updates))
} else {
Ok(None)
}
}
}
}

Expand Down Expand Up @@ -757,45 +779,94 @@ impl ChangeSubscription {
}

impl QuerySubscription {
fn generate_input(
fn find_in_db_and_add(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
name: &String,
db: &DatabaseReadAccess,
) -> Option<impl query::ExecutionInput> {
let id_used_in_query = {
let mut query_uses_id = false;
match changed {
Some(changed) => {
for (id, fields) in changed {
if let Some(metadata) = db.get_metadata_by_id(*id) {
if self.query.input_spec.contains(&metadata.path)
&& fields.contains(&Field::Datapoint)
{
query_uses_id = true;
break;
input: &mut query::ExecutionInputImpl,
) {
match db.get_entry_by_path(name) {
Ok(entry) => {
input.add(
name.to_owned(),
ExecutionInputImplData {
value: entry.datapoint.value.to_owned(),
lag_value: entry.lag_datapoint.value.to_owned(),
},
);
}
Err(_) => {
// TODO: This should probably generate an error
input.add(
name.to_owned(),
ExecutionInputImplData {
value: DataValue::NotAvailable,
lag_value: DataValue::NotAvailable,
},
)
}
}
}
fn check_if_changes_match(
query: &CompiledQuery,
changed_origin: Option<&HashMap<i32, HashSet<Field>>>,
db: &DatabaseReadAccess,
) -> bool {
match changed_origin {
Some(changed) => {
for (id, fields) in changed {
if let Some(metadata) = db.get_metadata_by_id(*id) {
if query.input_spec.contains(&metadata.path)
&& fields.contains(&Field::Datapoint)
{
return true;
}
if !query.subquery.is_empty() {
for sub in query.subquery.iter() {
if QuerySubscription::check_if_changes_match(
sub,
changed_origin,
db,
) {
return true;
}
}
}
}
}
None => {
// Always generate input if `changed` is None
query_uses_id = true;
}
}
query_uses_id
};
None => {
// Always generate input if `changed` is None
return true;
}
}
false
}
/// Recursively collect execution input for `query` and all of its
/// subqueries from the database.
fn generate_input_list(
    &self,
    query: &CompiledQuery,
    db: &DatabaseReadAccess,
    input: &mut query::ExecutionInputImpl,
) {
    for name in &query.input_spec {
        self.find_in_db_and_add(name, db, input);
    }
    for sub in &query.subquery {
        self.generate_input_list(sub, db, input);
    }
}
fn generate_input(
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
db: &DatabaseReadAccess,
) -> Option<impl ExecutionInput> {
let id_used_in_query = QuerySubscription::check_if_changes_match(&self.query, changed, db);

if id_used_in_query {
let mut input = query::ExecutionInputImpl::new();
for name in self.query.input_spec.iter() {
match db.get_entry_by_path(name) {
Ok(entry) => input.add(name.to_owned(), entry.datapoint.value.to_owned()),
Err(_) => {
// TODO: This should probably generate an error
input.add(name.to_owned(), DataValue::NotAvailable)
}
}
}
self.generate_input_list(&self.query, db, &mut input);
Some(input)
} else {
None
Expand All @@ -806,8 +877,9 @@ impl QuerySubscription {
&self,
changed: Option<&HashMap<i32, HashSet<Field>>>,
db: &Database,
) -> Result<(), NotificationError> {
) -> Result<Option<impl query::ExecutionInput>, NotificationError> {
let db_read = db.authorized_read_access(&self.permissions);

match self.generate_input(changed, &db_read) {
Some(input) =>
// Execute query (if anything queued)
Expand All @@ -827,19 +899,19 @@ impl QuerySubscription {
})
.await
{
Ok(()) => Ok(()),
Ok(()) => Ok(Some(input)),
Err(_) => Err(NotificationError {}),
},
None => Ok(()),
None => Ok(None),
},
Err(e) => {
// TODO: send error to subscriber
debug!("{:?}", e);
Ok(()) // no cleanup needed
Ok(None) // no cleanup needed
}
}
}
None => Ok(()),
None => Ok(None),
}
}
}
Expand Down Expand Up @@ -963,6 +1035,19 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
}
}

/// Synchronize the lag datapoint of the entry at `path` with its current
/// datapoint (see `Entry::apply_lag_after_execute`).
///
/// Returns `UpdateError::NotFound` when `path` does not resolve to an
/// existing entry.
pub fn update_entry_lag_to_be_equal(&mut self, path: &str) -> Result<(), UpdateError> {
    let id = *self.db.path_to_id.get(path).ok_or(UpdateError::NotFound)?;
    let entry = self.db.entries.get_mut(&id).ok_or(UpdateError::NotFound)?;
    entry.apply_lag_after_execute();
    Ok(())
}

pub fn update(&mut self, id: i32, update: EntryUpdate) -> Result<HashSet<Field>, UpdateError> {
match self.db.entries.get_mut(&id) {
Some(entry) => {
Expand Down Expand Up @@ -1056,7 +1141,14 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
description,
allowed,
},
datapoint: match datapoint {
datapoint: match datapoint.clone() {
Some(datapoint) => datapoint,
None => Datapoint {
ts: SystemTime::now(),
value: DataValue::NotAvailable,
},
},
lag_datapoint: match datapoint {
Some(datapoint) => datapoint,
None => Datapoint {
ts: SystemTime::now(),
Expand Down Expand Up @@ -1277,6 +1369,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let mut errors = Vec::new();
let mut db = self.broker.database.write().await;
let mut db_write = db.authorized_write_access(self.permissions);
let mut lag_updates: HashMap<String, ()> = HashMap::new();

let cleanup_needed = {
let changed = {
Expand Down Expand Up @@ -1310,11 +1403,23 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.notify(Some(&changed), &db)
.await
{
Ok(()) => false,
Ok(None) => false,
Ok(Some(lag_updates_)) => {
lag_updates = lag_updates_.clone();
false
}
Err(_) => true, // Cleanup needed
}
};

if !lag_updates.is_empty() {
let mut db = self.broker.database.write().await;
let mut db_write = db.authorized_write_access(self.permissions);
for x in lag_updates {
if db_write.update_entry_lag_to_be_equal(x.0.as_str()).is_ok() {}
}
}

// Cleanup closed subscriptions
if cleanup_needed {
self.broker.subscriptions.write().await.cleanup();
Expand Down Expand Up @@ -1367,6 +1472,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
) -> Result<impl Stream<Item = QueryResponse>, QueryError> {
let db_read = self.broker.database.read().await;
let db_read_access = db_read.authorized_read_access(self.permissions);

let compiled_query = query::compile(query, &db_read_access);

match compiled_query {
Expand Down
Loading