Skip to content

Commit

Permalink
adds second kuksa client in zenoh-kuksa-provider
Browse files Browse the repository at this point in the history
To avoid  a deadlock between waiting for the kuksa subscription and sending messages on the same client, a second client gets initialized. In addition, some clippy warning get fixed.
  • Loading branch information
eriksven committed Oct 15, 2024
1 parent f70153a commit 565e6f2
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 12 deletions.
14 changes: 6 additions & 8 deletions zenoh-kuksa-provider/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,8 @@ async fn handling_zenoh_subscribtion(
async fn publish_to_zenoh(
provider_config: Arc<ProviderConfig>,
session: Arc<Session>,
kuksa_client: Arc<Mutex<kuksa::Client>>,
mut kuksa_client: kuksa::Client,
) {
let mut actuation_client = kuksa_client.lock().await;

let attachment = Some(String::from("type=targetValue"));

let vss_paths = Vec::from_iter(provider_config.signals.iter().map(String::as_str));
Expand All @@ -114,7 +112,7 @@ async fn publish_to_zenoh(
vss_paths
);

match actuation_client.subscribe_target_values(vss_paths).await {
match kuksa_client.subscribe_target_values(vss_paths).await {
Ok(mut stream) => {
while let Some(response) = stream.message().await.unwrap() {
for update in &response.updates {
Expand All @@ -123,7 +121,7 @@ async fn publish_to_zenoh(
let vss_path = &entry.path;

if let Some(publisher) = publishers.get(vss_path.as_str()) {
let buf = match datapoint_to_string(&datapoint) {
let buf = match datapoint_to_string(datapoint) {
Some(v) => v,
None => "null".to_string(),
};
Expand Down Expand Up @@ -182,7 +180,8 @@ async fn main() {

let uri = kuksa::Uri::try_from(provider_config.kuksa.databroker_url.as_str())
.expect("Invalid URI for Kuksa Databroker connection.");
let client = Arc::new(Mutex::new(kuksa::Client::new(uri)));
let client = Arc::new(Mutex::new(kuksa::Client::new(uri.clone())));
let actuation_client = kuksa::Client::new(uri);

fetch_metadata(
client.clone(),
Expand All @@ -202,8 +201,7 @@ async fn main() {
let publisher_handle = tokio::spawn({
let session = Arc::clone(&zenoh_session);
let provider_config = Arc::clone(&provider_config);
let kuksa_client = Arc::clone(&client);
publish_to_zenoh(provider_config, session, kuksa_client)
publish_to_zenoh(provider_config, session, actuation_client)
});

let _ = subscriber_handle.await;
Expand Down
2 changes: 1 addition & 1 deletion zenoh-kuksa-provider/src/provider_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl ProviderConfig {
};
config.set_mode(Some(mode)).unwrap();

if self.zenoh.scouting.multicast.enabled == true {
if self.zenoh.scouting.multicast.enabled {
config.scouting.multicast.set_enabled(Some(true)).unwrap();

config
Expand Down
4 changes: 2 additions & 2 deletions zenoh-kuksa-provider/src/utils/kuksa_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ pub fn new_datapoint(data_type: &DataType, payload: &ZBuf) -> Datapoint {
nanos: duration_since_epoch.subsec_nanos() as i32,
};

return Datapoint {
Datapoint {
timestamp: Some(timestamp), // TODO: get timestamp right
value: Some(value),
};
}
}

pub fn new_datapoint_for_update(
Expand Down
2 changes: 1 addition & 1 deletion zenoh-kuksa-provider/src/utils/zenoh_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn zbuf_to_string(zbuf: &ZBuf) -> Result<String, std::str::Utf8Error> {
for zslice in zbuf.zslices() {
bytes.extend_from_slice(zslice.as_slice());
}
String::from_utf8(bytes).map_err(|e| std::str::Utf8Error::from(e.utf8_error()))
String::from_utf8(bytes).map_err(|e| e.utf8_error())
}

pub fn extract_attachment_as_string(sample: &Sample) -> String {
Expand Down

0 comments on commit 565e6f2

Please sign in to comment.