diff --git a/crates/core/c8y_api/src/smartrest/message.rs b/crates/core/c8y_api/src/smartrest/message.rs index b466789ed25..0e1b68452d6 100644 --- a/crates/core/c8y_api/src/smartrest/message.rs +++ b/crates/core/c8y_api/src/smartrest/message.rs @@ -78,13 +78,13 @@ pub fn get_failure_reason_for_smartrest(input: &[u8], max_size: usize) -> String /// Split MQTT message payload to multiple SmartREST messages. /// /// ``` -/// use c8y_api::smartrest::message::collect_smartrest_messages; +/// use c8y_api::smartrest::message::collect_c8y_messages; /// let data = "511,device,echo hello\n511,device,\"echo hello\necho world\""; -/// let messages = collect_smartrest_messages(data); +/// let messages = collect_c8y_messages(data); /// assert_eq!(messages[0], "511,device,echo hello"); /// assert_eq!(messages[1], "511,device,\"echo hello\necho world\""); /// ``` -pub fn collect_smartrest_messages(data: &str) -> Vec { +pub fn collect_c8y_messages(data: &str) -> Vec { let mut stack: Vec = Vec::new(); let mut smartrest_messages: Vec = Vec::new(); let mut is_inside = false; // Inside an outermost double quote block or not. @@ -228,7 +228,7 @@ mod tests { #[test] fn split_single_smartrest_message() { let data = r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"#; - let message = collect_smartrest_messages(data); + let message = collect_c8y_messages(data); assert_eq!( message[0], r#"528,DeviceSerial,softwareA,1.0,url1,install,softwareB,2.0,url2,install"# @@ -250,7 +250,7 @@ echo world" 524,DeviceSerial,"something",http://www.my.url,type 511,device,511,rina0005,echo \\\"#; - let messages = collect_smartrest_messages(data); + let messages = collect_c8y_messages(data); assert_eq!(messages[0], r#"511,device,echo hello"#); assert_eq!( diff --git a/crates/extensions/c8y_firmware_manager/src/actor.rs b/crates/extensions/c8y_firmware_manager/src/actor.rs index 00869fd3441..3d886c7a2ba 100644 --- a/crates/extensions/c8y_firmware_manager/src/actor.rs +++ b/crates/extensions/c8y_firmware_manager/src/actor.rs @@ -8,7 +8,7 @@ use crate::worker::IdDownloadRequest; use crate::worker::IdDownloadResult; use crate::worker::OperationOutcome; use async_trait::async_trait; -use c8y_api::smartrest::message::collect_smartrest_messages; +use c8y_api::smartrest::message::collect_c8y_messages; use c8y_api::smartrest::message::get_smartrest_template_id; use c8y_api::smartrest::message_ids::FIRMWARE; use c8y_api::smartrest::smartrest_deserializer::SmartRestFirmwareRequest; @@ -131,7 +131,7 @@ impl FirmwareManagerActor { &mut self, message: MqttMessage, ) -> Result<(), FirmwareManagementError> { - for smartrest_message in collect_smartrest_messages(message.payload_str()?) { + for smartrest_message in collect_c8y_messages(message.payload_str()?) { let smartrest_template_id = get_smartrest_template_id(&smartrest_message); let result = match smartrest_template_id.as_str().parse::() { Ok(id) if id == FIRMWARE => { diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index da2bbebd031..0e8adce2dee 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -28,7 +28,7 @@ use c8y_api::json_c8y_deserializer::C8ySoftwareUpdate; use c8y_api::smartrest::error::SmartRestDeserializerError; use c8y_api::smartrest::inventory::child_device_creation_message; use c8y_api::smartrest::inventory::service_creation_message; -use c8y_api::smartrest::message::collect_smartrest_messages; +use c8y_api::smartrest::message::collect_c8y_messages; use c8y_api::smartrest::message::get_failure_reason_for_smartrest; use c8y_api::smartrest::message::get_smartrest_device_id; use c8y_api::smartrest::message::get_smartrest_template_id; @@ -632,24 +632,36 @@ impl CumulocityConverter { &mut self, message: &MqttMessage, ) -> Result, ConversionError> { - let operation = C8yOperation::from_json(message.payload.as_str()?)?; - let device_xid = operation.external_source.external_id; - let cmd_id = self.command_id.new_id_with_str(&operation.op_id); + // JSON over MQTT messages on c8y/devicecontrol/notifications can contain multiple operations in a single MQTT + // message, so split them + let operation_payloads = collect_c8y_messages(message.payload_str()?); + + let mut output = vec![]; + for operation_payload in operation_payloads { + let operation = C8yOperation::from_json(operation_payload.as_str())?; + let device_xid = operation.external_source.external_id; + let cmd_id = self.command_id.new_id_with_str(&operation.op_id); + + if self.active_commands.contains(&cmd_id) { + info!("{cmd_id} is already addressed"); + return Ok(vec![]); + } - if self.active_commands.contains(&cmd_id) { - info!("{cmd_id} is already addressed"); - return Ok(vec![]); - } + // wrap operation payload in a dummy MqttMessage wrapper because the code below assumes 1 MQTT message = 1 operation + // TODO: refactor to avoid this intermediate step and extra copies + let operation_message = MqttMessage::new(&message.topic, operation_payload); - let result = self - .process_json_over_mqtt( - device_xid, - operation.op_id.clone(), - &operation.extras, - message, - ) - .await; - let output = self.handle_c8y_operation_result(&result, Some(operation.op_id)); + let result = self + .process_json_over_mqtt( + device_xid, + operation.op_id.clone(), + &operation.extras, + &operation_message, + ) + .await; + let result = self.handle_c8y_operation_result(&result, Some(operation.op_id)); + output.extend(result); + } Ok(output) } @@ -872,7 +884,7 @@ impl CumulocityConverter { message: &MqttMessage, ) -> Result, ConversionError> { let mut output: Vec = Vec::new(); - for smartrest_message in collect_smartrest_messages(message.payload_str()?) { + for smartrest_message in collect_c8y_messages(message.payload_str()?) { let result = self.process_smartrest(smartrest_message.as_str()).await; let mut msgs = self.handle_c8y_operation_result(&result, None); output.append(&mut msgs) diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 57d9c6a1db3..4996e5cfae0 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2017,6 +2017,84 @@ async fn json_custom_operation_status_update_with_operation_id() { assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,1234,\"do something\n\"")]).await; } +#[tokio::test] +async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message() { + let ttd = TempTedgeDir::new(); + ttd.dir("operations") + .dir("c8y") + .file("c8y_Command") + .with_raw_content( + r#"[exec] + command = "echo ${.payload.c8y_Command.text}" + on_fragment = "c8y_Command" + "#, + ); + + let config = C8yMapperConfig { + smartrest_use_operation_id: true, + ..test_mapper_config(&ttd) + }; + let test_handle = spawn_c8y_mapper_actor_with_config(&ttd, config, true).await; + let TestHandle { mqtt, http, .. } = test_handle; + spawn_dummy_c8y_http_proxy(http); + + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + skip_init_messages(&mut mqtt).await; + + // Simulate c8y_Command SmartREST request + let operation_1 = json!({ + "status":"PENDING", + "id": "111", + "c8y_Command": { + "text": "do something 1" + }, + "externalSource":{ + "externalId":"test-device", + "type":"c8y_Serial" + } + }) + .to_string(); + let operation_2 = json!({ + "status":"PENDING", + "id": "222", + "c8y_Command": { + "text": "do something 2" + }, + "externalSource":{ + "externalId":"test-device", + "type":"c8y_Serial" + } + }) + .to_string(); + let operation_3 = json!({ + "status":"PENDING", + "id": "333", + "c8y_Command": { + "text": "do something 3" + }, + "externalSource":{ + "externalId":"test-device", + "type":"c8y_Serial" + } + }) + .to_string(); + + let input_message = MqttMessage::new( + &Topic::new_unchecked("c8y/devicecontrol/notifications"), + [operation_1, operation_2, operation_3].join("\n"), + ); + mqtt.send(input_message).await.expect("Send failed"); + + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,111")]).await; + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,222")]).await; + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,333")]).await; + + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,111,\"do something 1\n\"")]).await; + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,222,\"do something 2\n\"")]).await; + assert_received_contains_str(&mut mqtt, [("c8y/s/us", "506,333,\"do something 3\n\"")]).await; +} + #[tokio::test] async fn json_custom_operation_status_update_with_operation_name() { let ttd = TempTedgeDir::new();