diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index abbd50628e..460acc11d1 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -13,6 +13,7 @@ use tedge_actors::Builder; use tedge_actors::ClientMessageBox; use tedge_actors::CloneSender; use tedge_actors::DynSender; +use tedge_actors::LoggingReceiver; use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; @@ -41,6 +42,7 @@ use tedge_uploader_ext::UploadRequest; use tedge_uploader_ext::UploadResult; use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::FileError; +use tokio::select; const SYNC_WINDOW: Duration = Duration::from_secs(3); @@ -68,7 +70,7 @@ impl From for MqttMessage { } } -fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete, PublishMessage] : Debug); +fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete] : Debug); type C8yMapperOutput = MqttMessage; @@ -79,6 +81,16 @@ pub struct C8yMapperActor { timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, message_handlers: HashMap>>, + /// Receiver for MqttMessages sent by other actors meant to be sent to MQTT actor to be published. + /// + /// Other actors may want to send MqttMessages but we want to ensure that the entity referred to in the message + /// topic exists; that's why we route them to the mapper rather than to MqttActor directly. + /// + /// Needs to be a separate receiver so that we can select! and drain the message queue ASAP. Otherwise + /// `process_mqtt_message` may involve sending messages to other actors, and if those other actors are blocked on + /// sending a `PublishMessage` because we're trying to send a message to them before processing their responses, + /// we'll hit a deadlock. + publish_messages: LoggingReceiver, } #[async_trait] @@ -111,20 +123,28 @@ impl Actor for C8yMapperActor { .send(SyncStart::new(SYNC_WINDOW, ())) .await?; - while let Some(event) = self.messages.recv().await { - match event { - C8yMapperInput::MqttMessage(message) => { - self.process_mqtt_message(message).await?; - } - C8yMapperInput::FsWatchEvent(event) => { - self.process_file_watch_event(event).await?; - } - C8yMapperInput::SyncComplete(_) => { - self.process_sync_timeout().await?; - } - C8yMapperInput::PublishMessage(message) => { + loop { + select! { + // in some tests may not be connected to the sender, so we need to ignore `None` + // here, but should get picked up by the main message box anyway + Some(message) = self.publish_messages.recv() => { self.mqtt_publisher.send(message.0).await?; } + event = self.messages.recv() => { + let Some(event) = event else { break; }; + match event { + C8yMapperInput::MqttMessage(message) => { + self.process_mqtt_message(message).await?; + } + C8yMapperInput::FsWatchEvent(event) => { + self.process_file_watch_event(event).await?; + } + C8yMapperInput::SyncComplete(_) => { + self.process_sync_timeout().await?; + } + } + } + else => break } } Ok(()) @@ -139,6 +159,7 @@ impl C8yMapperActor { timer_sender: LoggingSender, bridge_status_messages: SimpleMessageBox, message_handlers: HashMap>>, + publish_messages: LoggingReceiver, ) -> Self { Self { converter, @@ -147,6 +168,7 @@ impl C8yMapperActor { timer_sender, bridge_status_messages, message_handlers, + publish_messages, } } @@ -334,6 +356,7 @@ pub struct C8yMapperBuilder { uploader: ClientMessageBox, bridge_monitor_builder: SimpleMessageBoxBuilder, message_handlers: HashMap>>, + publish_message_box: SimpleMessageBoxBuilder, } impl C8yMapperBuilder { @@ -377,6 +400,8 @@ impl C8yMapperBuilder { let message_handlers = HashMap::new(); + let publish_message_box = SimpleMessageBoxBuilder::new("PublishMessage", 16); + Ok(Self { config, box_builder, @@ -387,6 +412,7 @@ impl C8yMapperBuilder { downloader, bridge_monitor_builder, message_handlers, + publish_message_box, }) } @@ -421,7 +447,7 @@ impl MessageSource> for C8yMapperBuilder { impl MessageSink for C8yMapperBuilder { fn get_sender(&self) -> DynSender { - self.box_builder.get_sender().sender_clone() + self.publish_message_box.get_sender().sender_clone() } } @@ -444,6 +470,9 @@ impl Builder for C8yMapperBuilder { let message_box = self.box_builder.build(); let bridge_monitor_box = self.bridge_monitor_builder.build(); + // discard the leftover sender; it was cloned to actors that want to send `PublishMessage`s in `connect_sink` + let (_, publish_messages) = self.publish_message_box.build().into_split(); + Ok(C8yMapperActor::new( converter, message_box, @@ -451,6 +480,7 @@ impl Builder for C8yMapperBuilder { timer_sender, bridge_monitor_box, self.message_handlers, + publish_messages, )) } }