Skip to content

Commit

Permalink
Merge pull request #3301 from Bravo555/fix/3297/c8y-devicecontrol-spl…
Browse files Browse the repository at this point in the history
…it-on-newlines

fix: Collect multiple smartrest messages from c8y devicecontrol topic
  • Loading branch information
reubenmiller authored Dec 16, 2024
2 parents 37c5073 + 6748f0e commit db7b497
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 25 deletions.
10 changes: 5 additions & 5 deletions crates/core/c8y_api/src/smartrest/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ pub fn get_failure_reason_for_smartrest(input: &[u8], max_size: usize) -> String
/// Split MQTT message payload to multiple SmartREST messages.
///
/// ```
/// use c8y_api::smartrest::message::collect_smartrest_messages;
/// use c8y_api::smartrest::message::collect_c8y_messages;
/// let data = "511,device,echo hello\n511,device,\"echo hello\necho world\"";
/// let messages = collect_smartrest_messages(data);
/// let messages = collect_c8y_messages(data);
/// assert_eq!(messages[0], "511,device,echo hello");
/// assert_eq!(messages[1], "511,device,\"echo hello\necho world\"");
/// ```
pub fn collect_smartrest_messages(data: &str) -> Vec<String> {
pub fn collect_c8y_messages(data: &str) -> Vec<String> {
let mut stack: Vec<char> = Vec::new();
let mut smartrest_messages: Vec<String> = Vec::new();
let mut is_inside = false; // Inside an outermost double quote block or not.
Expand Down Expand Up @@ -228,7 +228,7 @@ mod tests {
#[test]
fn split_single_smartrest_message() {
let data = r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#;
let message = collect_smartrest_messages(data);
let message = collect_c8y_messages(data);
assert_eq!(
message[0],
r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#
Expand All @@ -250,7 +250,7 @@ echo world"
524,DeviceSerial,"something",http://www.my.url,type
511,device,511,rina0005,echo \\\"#;

let messages = collect_smartrest_messages(data);
let messages = collect_c8y_messages(data);

assert_eq!(messages[0], r#"511,device,echo hello"#);
assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_firmware_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::worker::IdDownloadRequest;
use crate::worker::IdDownloadResult;
use crate::worker::OperationOutcome;
use async_trait::async_trait;
use c8y_api::smartrest::message::collect_smartrest_messages;
use c8y_api::smartrest::message::collect_c8y_messages;
use c8y_api::smartrest::message::get_smartrest_template_id;
use c8y_api::smartrest::message_ids::FIRMWARE;
use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest;
Expand Down Expand Up @@ -131,7 +131,7 @@ impl FirmwareManagerActor {
&mut self,
message: MqttMessage,
) -> Result<(), FirmwareManagementError> {
for smartrest_message in collect_smartrest_messages(message.payload_str()?) {
for smartrest_message in collect_c8y_messages(message.payload_str()?) {
let smartrest_template_id = get_smartrest_template_id(&smartrest_message);
let result = match smartrest_template_id.as_str().parse::<usize>() {
Ok(id) if id == FIRMWARE => {
Expand Down
48 changes: 30 additions & 18 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use c8y_api::json_c8y_deserializer::C8ySoftwareUpdate;
use c8y_api::smartrest::error::SmartRestDeserializerError;
use c8y_api::smartrest::inventory::child_device_creation_message;
use c8y_api::smartrest::inventory::service_creation_message;
use c8y_api::smartrest::message::collect_smartrest_messages;
use c8y_api::smartrest::message::collect_c8y_messages;
use c8y_api::smartrest::message::get_failure_reason_for_smartrest;
use c8y_api::smartrest::message::get_smartrest_device_id;
use c8y_api::smartrest::message::get_smartrest_template_id;
Expand Down Expand Up @@ -632,24 +632,36 @@ impl CumulocityConverter {
&mut self,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let operation = C8yOperation::from_json(message.payload.as_str()?)?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);
// JSON over MQTT messages on c8y/devicecontrol/notifications can contain multiple operations in a single MQTT
// message, so split them
let operation_payloads = collect_c8y_messages(message.payload_str()?);

let mut output = vec![];
for operation_payload in operation_payloads {
let operation = C8yOperation::from_json(operation_payload.as_str())?;
let device_xid = operation.external_source.external_id;
let cmd_id = self.command_id.new_id_with_str(&operation.op_id);

if self.active_commands.contains(&cmd_id) {
info!("{cmd_id} is already addressed");
return Ok(vec![]);
}

if self.active_commands.contains(&cmd_id) {
info!("{cmd_id} is already addressed");
return Ok(vec![]);
}
// wrap operation payload in a dummy MqttMessage wrapper because the code below assumes 1 MQTT message = 1 operation
// TODO: refactor to avoid this intermediate step and extra copies
let operation_message = MqttMessage::new(&message.topic, operation_payload);

let result = self
.process_json_over_mqtt(
device_xid,
operation.op_id.clone(),
&operation.extras,
message,
)
.await;
let output = self.handle_c8y_operation_result(&result, Some(operation.op_id));
let result = self
.process_json_over_mqtt(
device_xid,
operation.op_id.clone(),
&operation.extras,
&operation_message,
)
.await;
let result = self.handle_c8y_operation_result(&result, Some(operation.op_id));
output.extend(result);
}

Ok(output)
}
Expand Down Expand Up @@ -872,7 +884,7 @@ impl CumulocityConverter {
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let mut output: Vec<MqttMessage> = Vec::new();
for smartrest_message in collect_smartrest_messages(message.payload_str()?) {
for smartrest_message in collect_c8y_messages(message.payload_str()?) {
let result = self.process_smartrest(smartrest_message.as_str()).await;
let mut msgs = self.handle_c8y_operation_result(&result, None);
output.append(&mut msgs)
Expand Down
78 changes: 78 additions & 0 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,84 @@ async fn json_custom_operation_status_update_with_operation_id() {
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,1234,\"do something\n\"")]).await;
}

#[tokio::test]
async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message() {
let ttd = TempTedgeDir::new();
ttd.dir("operations")
.dir("c8y")
.file("c8y_Command")
.with_raw_content(
r#"[exec]
command = "echo ${.payload.c8y_Command.text}"
on_fragment = "c8y_Command"
"#,
);

let config = C8yMapperConfig {
smartrest_use_operation_id: true,
..test_mapper_config(&ttd)
};
let test_handle = spawn_c8y_mapper_actor_with_config(&ttd, config, true).await;
let TestHandle { mqtt, http, .. } = test_handle;
spawn_dummy_c8y_http_proxy(http);

let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);

skip_init_messages(&mut mqtt).await;

// Simulate c8y_Command SmartREST request
let operation_1 = json!({
"status":"PENDING",
"id": "111",
"c8y_Command": {
"text": "do something 1"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();
let operation_2 = json!({
"status":"PENDING",
"id": "222",
"c8y_Command": {
"text": "do something 2"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();
let operation_3 = json!({
"status":"PENDING",
"id": "333",
"c8y_Command": {
"text": "do something 3"
},
"externalSource":{
"externalId":"test-device",
"type":"c8y_Serial"
}
})
.to_string();

let input_message = MqttMessage::new(
&Topic::new_unchecked("c8y/devicecontrol/notifications"),
[operation_1, operation_2, operation_3].join("\n"),
);
mqtt.send(input_message).await.expect("Send failed");

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,111")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,222")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,333")]).await;

assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,111,\"do something 1\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,222,\"do something 2\n\"")]).await;
assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,333,\"do something 3\n\"")]).await;
}

#[tokio::test]
async fn json_custom_operation_status_update_with_operation_name() {
let ttd = TempTedgeDir::new();
Expand Down

0 comments on commit db7b497

Please sign in to comment.