Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Simplify SmartREST publish topics #3292

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// smartrest messages are sent. There should be one comprehensive API for
// generating them.

use crate::smartrest::topic::publish_topic_from_ancestors;
use crate::smartrest::topic::publish_topic_from_parent;
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
use std::time::Duration;
Expand All @@ -29,7 +29,8 @@ pub fn child_device_creation_message(
child_id: &str,
device_name: Option<&str>,
device_type: Option<&str>,
ancestors: &[String],
parent: Option<&str>,
main_device_id: &str,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
if child_id.is_empty() {
Expand Down Expand Up @@ -60,7 +61,7 @@ pub fn child_device_creation_message(
.expect("child_id, device_name, device_type should not increase payload size over the limit");

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, main_device_id, prefix),
payload.into_inner(),
))
}
Expand All @@ -73,11 +74,12 @@ pub fn service_creation_message(
service_name: &str,
service_type: &str,
service_status: &str,
ancestors: &[String],
parent: Option<&str>,
main_device_id: &str,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, main_device_id, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?
.into_inner(),
))
Expand Down
58 changes: 30 additions & 28 deletions crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm;
use mqtt_channel::MqttError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_config::TopicPrefix;

Expand All @@ -19,13 +19,14 @@ pub enum C8yTopic {
impl C8yTopic {
/// Return the c8y SmartRest response topic for the given entity
pub fn smartrest_response_topic(
entity: &EntityMetadata,
external_id: &EntityExternalId,
entity_type: &EntityType,
prefix: &TopicPrefix,
) -> Option<Topic> {
match entity.r#type {
match entity_type {
EntityType::MainDevice => Some(C8yTopic::upstream_topic(prefix)),
EntityType::ChildDevice | EntityType::Service => {
Self::ChildSmartRestResponse(entity.external_id.clone().into())
Self::ChildSmartRestResponse(external_id.clone().into())
.to_topic(prefix)
.ok()
}
Expand Down Expand Up @@ -77,28 +78,30 @@ impl From<&C8yAlarm> for C8yTopic {
}
}

/// Generates the SmartREST topic to publish to, for a given managed object
/// from the list of external IDs of itself and all its parents.
///
/// The parents are appended in the reverse order,
/// starting from the main device at the end of the list.
/// The main device itself is represented by the root topic c8y/s/us,
/// with the rest of the children appended to it at each topic level.
/// Generates the SmartREST topic to publish to, from the external ID of its parent.
/// If the parent is the main device, the topic would be `<prefix>/s/us`.
/// For all other parent devices, the target topic would be `<prefix>/s/us/<parent-xid>`.
/// For the main device with no parent, and the topic would be `<prefix>/s/us` in that case as well.
///
/// # Examples
///
/// - `["main"]` -> `c8y/s/us`
/// - `["child1", "main"]` -> `c8y/s/us/child1`
/// - `["child2", "child1", "main"]` -> `c8y/s/us/child1/child2`
pub fn publish_topic_from_ancestors(ancestors: &[impl AsRef<str>], prefix: &TopicPrefix) -> Topic {
let mut target_topic = format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}");
for ancestor in ancestors.iter().rev().skip(1) {
// Skipping the last ancestor as it is the main device represented by the root topic itself
target_topic.push('/');
target_topic.push_str(ancestor.as_ref());
/// - `(Some("main"), "main", "c8y")` -> `c8y/s/us`
/// - `[Some("child1"), "main", "c8y"]` -> `c8y/s/us/child1`
/// - `[Some("service1"), "main", "c8y"]` -> `c8y/s/us/service1`
/// - `(None, "main", "c8y")` -> `c8y/s/us`
pub fn publish_topic_from_parent(
parent_xid: Option<&str>,
main_device_xid: &str,
prefix: &TopicPrefix,
) -> Topic {
if let Some(parent) = parent_xid {
if parent != main_device_xid {
return C8yTopic::ChildSmartRestResponse(parent.to_string())
.to_topic(prefix)
.unwrap();
}
}

Topic::new_unchecked(&target_topic)
C8yTopic::upstream_topic(prefix)
}

#[cfg(test)]
Expand Down Expand Up @@ -135,13 +138,12 @@ mod tests {
)
}

#[test_case(& ["main"], "c8y2/s/us")]
#[test_case(& ["foo"], "c8y2/s/us")]
#[test_case(& ["child1", "main"], "c8y2/s/us/child1")]
#[test_case(& ["child3", "child2", "child1", "main"], "c8y2/s/us/child1/child2/child3")]
fn topic_from_ancestors(ancestors: &[&str], topic: &str) {
#[test_case(None, "main-device", "c8y2/s/us")]
#[test_case(Some("child01"), "main-device", "c8y2/s/us/child01")]
#[test_case(Some("main-device"), "main-device", "c8y2/s/us")]
fn topic_from_parent(parent_xid: Option<&str>, main_device_xid: &str, topic: &str) {
let nested_child_topic =
publish_topic_from_ancestors(ancestors, &"c8y2".try_into().unwrap());
publish_topic_from_parent(parent_xid, main_device_xid, &"c8y2".try_into().unwrap());
assert_eq!(nested_child_topic, Topic::new_unchecked(topic));
}
}
19 changes: 19 additions & 0 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,25 @@ impl EntityStore {
self.get(&self.main_device).unwrap().external_id.clone()
}

/// Returns the external id of the parent of the given entity.
/// Returns None for the main device, that doesn't have any parents.
pub fn parent_external_id(
&self,
entity_tid: &EntityTopicId,
) -> Result<Option<&EntityExternalId>, Error> {
let entity = self.try_get(entity_tid)?;
let parent_xid = entity.parent.as_ref().map(|tid| {
&self
.try_get(tid)
.expect(
"for every registered entity, its parent is also guaranteed to be registered",
)
.external_id
});

Ok(parent_xid)
}

/// Returns an ordered list of ancestors of the given entity
/// starting from the immediate parent all the way till the root main device.
/// The last parent in the list for any entity would always be the main device.
Expand Down
3 changes: 2 additions & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ pub fn service_monitor_client_config(
c8y_mapper_name,
service_type.as_str(),
"down",
&[],
None,
main_device_xid.as_ref(),
prefix,
)?;

Expand Down
47 changes: 21 additions & 26 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_id;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name;
use c8y_api::smartrest::smartrest_serializer::EmbeddedCsv;
use c8y_api::smartrest::smartrest_serializer::TextOrCsv;
use c8y_api::smartrest::topic::publish_topic_from_ancestors;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::CreateEvent;
Expand Down Expand Up @@ -372,31 +371,29 @@ impl CumulocityConverter {
let display_type = input.other.get("type").and_then(|v| v.as_str());

let entity_topic_id = &input.topic_id;
let external_id = self
.entity_store
.try_get(entity_topic_id)
.map(|e| &e.external_id)?;
let entity = self.entity_store.try_get(entity_topic_id)?;
let external_id = &entity.external_id;
match input.r#type {
EntityType::MainDevice => {
self.entity_store.update(input.clone())?;
Ok(vec![])
}
EntityType::ChildDevice => {
let ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?;

let child_creation_message = child_device_creation_message(
external_id.as_ref(),
display_name,
display_type,
&ancestors_external_ids,
parent_xid.map(|xid| xid.as_ref()),
&self.device_name,
&self.config.bridge_config.c8y_prefix,
)
.context("Could not create device creation message")?;
Ok(vec![child_creation_message])
}
EntityType::Service => {
let ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?;

let service_creation_message = service_creation_message(
external_id.as_ref(),
Expand All @@ -407,7 +404,8 @@ impl CumulocityConverter {
}),
display_type.unwrap_or(&self.service_type),
"up",
&ancestors_external_ids,
parent_xid.map(|xid| xid.as_ref()),
&self.device_name,
&self.config.bridge_config.c8y_prefix,
)
.context("Could not create service creation message")?;
Expand All @@ -423,14 +421,13 @@ impl CumulocityConverter {
entity_topic_id: &EntityTopicId,
) -> Result<Topic, ConversionError> {
let entity = self.entity_store.try_get(entity_topic_id)?;

let mut ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
ancestors_external_ids.insert(0, entity.external_id.as_ref().into());
Ok(publish_topic_from_ancestors(
&ancestors_external_ids,
let topic = C8yTopic::smartrest_response_topic(
&entity.external_id,
&entity.r#type,
&self.config.bridge_config.c8y_prefix,
))
)
.expect("Topic must have been valid as the external id is pre-validated");
Ok(topic)
}

/// Generates external ID of the given entity.
Expand Down Expand Up @@ -610,19 +607,17 @@ impl CumulocityConverter {

pub async fn process_health_status_message(
&mut self,
entity: &EntityTopicId,
entity_tid: &EntityTopicId,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let entity_metadata = self
.entity_store
.get(entity)
.expect("entity was registered");
let entity = self.entity_store.try_get(entity_tid)?;
let parent_xid = self.entity_store.parent_external_id(entity_tid)?;

let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?;
Ok(convert_health_status_message(
&self.config.mqtt_schema,
entity_metadata,
&ancestors_external_ids,
entity,
parent_xid,
&self.entity_store.main_device_external_id(),
message,
&self.config.bridge_config.c8y_prefix,
))
Expand Down
23 changes: 15 additions & 8 deletions crates/extensions/c8y_mapper_ext/src/service_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use c8y_api::smartrest;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityType;
use tedge_api::mqtt_topics::MqttSchema;
Expand Down Expand Up @@ -26,7 +27,8 @@ pub fn is_c8y_bridge_established(
pub fn convert_health_status_message(
mqtt_schema: &MqttSchema,
entity: &EntityMetadata,
ancestors_external_ids: &[String],
parent_xid: Option<&EntityExternalId>,
main_device_xid: &EntityExternalId,
message: &MqttMessage,
prefix: &TopicPrefix,
) -> Vec<MqttMessage> {
Expand Down Expand Up @@ -56,7 +58,8 @@ pub fn convert_health_status_message(
display_name,
display_type,
&status.to_string(),
ancestors_external_ids,
parent_xid.map(|v| v.as_ref()),
main_device_xid.as_ref(),
prefix,
) else {
error!("Can't create 102 for service status update");
Expand Down Expand Up @@ -174,7 +177,7 @@ mod tests {
"service-monitoring-mosquitto-bridge-unknown-status"
)]
fn translate_health_status_to_c8y_service_monitoring_message(
device_name: &str,
main_device_id: &str,
health_topic: &str,
health_payload: &str,
c8y_monitor_topic: &str,
Expand All @@ -193,7 +196,7 @@ mod tests {

let temp_dir = tempfile::tempdir().unwrap();
let main_device_registration =
EntityRegistrationMessage::main_device(device_name.to_string());
EntityRegistrationMessage::main_device(main_device_id.to_string());
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device_registration,
Expand All @@ -220,14 +223,18 @@ mod tests {
entity_store.update(entity_registration).unwrap();

let entity = entity_store.get(&entity_topic_id).unwrap();
let ancestors_external_ids = entity_store
.ancestors_external_ids(&entity_topic_id)
.unwrap();
let parent = entity
.parent
.as_ref()
.filter(|tid| *tid != "device/main//")
.map(|tid| &entity_store.try_get(tid).unwrap().external_id);
dbg!(&parent);

let msg = convert_health_status_message(
&mqtt_schema,
entity,
&ancestors_external_ids,
parent,
&main_device_id.into(),
&health_message,
&"c8y".try_into().unwrap(),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn child_device_registration_mapping() {
assert_received_contains_str(
&mut mqtt,
[(
"c8y/s/us/test-device:device:child1/test-device:device:child2",
"c8y/s/us/test-device:device:child2",
"101,child3,child3,thin-edge.io-child",
)],
)
Expand Down Expand Up @@ -326,7 +326,7 @@ async fn service_registration_mapping() {
assert_received_contains_str(
&mut mqtt,
[(
"c8y/s/us/test-device:device:child1/test-device:device:child2",
"c8y/s/us/test-device:device:child2",
"102,test-device:device:child2:service:collectd,systemd,Collectd,up",
)],
)
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ pub fn assert_message_contains_str(message: &MqttMessage, expected: (&str, &str)
let expected_payload = expected.1;
assert!(
TopicFilter::new_unchecked(expected_topic).accept(message),
"\nReceived unexpected message: {:?} \nExpected: {expected_payload:?}",
"\nReceived unexpected message: {:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
message
);
let payload = message.payload_str().expect("non UTF-8 payload");
assert!(
payload.contains(expected_payload),
"Payload assertion failed.\n Actual: {payload:?} \nExpected: {expected_payload:?}",
"Payload assertion failed.\n Actual: {payload:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
)
}

Expand Down
Loading