Skip to content

Commit

Permalink
removes mutexes from zenoh-kuksa-client
Browse files Browse the repository at this point in the history
  • Loading branch information
eriksven authored and lukasmittag committed Nov 4, 2024
1 parent 3e589f2 commit 1cb785f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 19 deletions.
3 changes: 0 additions & 3 deletions zenoh-kuksa-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
# SPDX-License-Identifier: Apache-2.0
########################################################################


[workspace]

[package]
name = "zenoh-kuksa-provider"
version = "0.1.0"
Expand Down
16 changes: 6 additions & 10 deletions zenoh-kuksa-provider/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use provider_config::ProviderConfig;
use std::collections::HashMap;
use std::sync::Arc;
use std::{error::Error, fs};
use tokio::sync::Mutex;

mod provider_config;
mod utils;
Expand Down Expand Up @@ -51,7 +50,7 @@ async fn handling_zenoh_subscribtion(
provider_config: Arc<ProviderConfig>,
session: Arc<Session>,
metadata_store: MetadataStore,
kuksa_client: Arc<Mutex<kuksa::Client>>,
mut kuksa_client: kuksa::Client,
) {
info!("Listening on selector: {:?}", provider_config.zenoh.key_exp);

Expand All @@ -77,13 +76,11 @@ async fn handling_zenoh_subscribtion(
if field_type == "currentValue" {
let datapoint_update = new_datapoint_for_update(&vss_path, &sample, &store);

let mut sub_client = kuksa_client.lock().await;
debug!("Forwarding: {:#?}", datapoint_update);
sub_client
kuksa_client
.set_current_values(datapoint_update)
.await
.unwrap();
drop(sub_client);
}
}
}
Expand Down Expand Up @@ -180,11 +177,11 @@ async fn main() {

let uri = kuksa::Uri::try_from(provider_config.kuksa.databroker_url.as_str())
.expect("Invalid URI for Kuksa Databroker connection.");
let client = Arc::new(Mutex::new(kuksa::Client::new(uri.clone())));
let mut client = kuksa::Client::new(uri.clone());
let actuation_client = kuksa::Client::new(uri);

fetch_metadata(
client.clone(),
client = fetch_metadata(
client,
provider_config.signals.iter().map(|s| s as &str).collect(),
&metadata_store,
)
Expand All @@ -194,8 +191,7 @@ async fn main() {
let session = Arc::clone(&zenoh_session);
let provider_config = Arc::clone(&provider_config);
let metadata_store = Arc::clone(&metadata_store);
let kuksa_client = Arc::clone(&client);
handling_zenoh_subscribtion(provider_config, session, metadata_store, kuksa_client)
handling_zenoh_subscribtion(provider_config, session, metadata_store, client)
});

let publisher_handle = tokio::spawn({
Expand Down
13 changes: 7 additions & 6 deletions zenoh-kuksa-provider/src/utils/kuksa_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,23 @@
********************************************************************************/

use kuksa::proto::v1::{datapoint::Value, DataType, Datapoint};
use kuksa::Client;
use log::warn;
use prost_types::Timestamp;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use std::collections::HashMap;
use zenoh::{buffers::ZBuf, sample::Sample};

use crate::utils::{metadata_store::MetadataInfo, zenoh_utils::zbuf_to_string};

pub async fn fetch_metadata(
kuksa_client: Arc<Mutex<kuksa::Client>>,
mut kuksa_client: Client,
paths: Vec<&str>,
metadata_store: &super::metadata_store::MetadataStore,
) {
let mut client = kuksa_client.lock().await;
) -> Client {
let mut store = metadata_store.lock().await;

let data_entries: Vec<kuksa::DataEntry> = client.get_metadata(paths).await.unwrap();
let data_entries: Vec<kuksa::DataEntry> = kuksa_client.get_metadata(paths).await.unwrap();

for entry in data_entries {
store.insert(
Expand All @@ -40,6 +39,8 @@ pub async fn fetch_metadata(
},
);
}

kuksa_client
}

pub fn new_datapoint(data_type: &DataType, payload: &ZBuf) -> Datapoint {
Expand Down

0 comments on commit 1cb785f

Please sign in to comment.