Skip to content

Commit

Permalink
fix(#3279): Create a fast path for sending PublishMessages
Browse files Browse the repository at this point in the history
Solve the deadlock that can arise when C8yMapperActor cannot finish
sending an MqttMessage to another actor because that actor, in turn,
cannot finish sending a `PublishMessage` to C8yMapperActor. The fix is a
fast path that drains the `PublishMessage` buffer as soon as possible.

Previously, when `PublishMessage`s were part of the main message box, we
could not prioritize them because we had to process messages in order.

Separating out the receiver for `PublishMessage`s ensures that no other
messages can block them, and allows us to use select! to process them
concurrently while the processing of other events is blocked.

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Dec 12, 2024
1 parent 3d77ad5 commit 5b89850
Showing 1 changed file with 44 additions and 14 deletions.
58 changes: 44 additions & 14 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LoggingReceiver;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
Expand Down Expand Up @@ -41,6 +42,7 @@ use tedge_uploader_ext::UploadRequest;
use tedge_uploader_ext::UploadResult;
use tedge_utils::file::create_directory_with_defaults;
use tedge_utils::file::FileError;
use tokio::select;

const SYNC_WINDOW: Duration = Duration::from_secs(3);

Expand Down Expand Up @@ -68,7 +70,7 @@ impl From<PublishMessage> for MqttMessage {
}
}

fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete, PublishMessage] : Debug);
fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete] : Debug);

type C8yMapperOutput = MqttMessage;

Expand All @@ -79,6 +81,16 @@ pub struct C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
/// Receiver for MqttMessages sent by other actors meant to be sent to MQTT actor to be published.
///
/// Other actors may want to send MqttMessages but we want to ensure that the entity referred to in the message
/// topic exists; that's why we route them to the mapper rather than to MqttActor directly.
///
/// Needs to be a separate receiver so that we can select! and drain the message queue ASAP. Otherwise
/// `process_mqtt_message` may involve sending messages to other actors, and if those other actors are blocked on
/// sending a `PublishMessage` because we're trying to send a message to them before processing their responses,
/// we'll hit a deadlock.
publish_messages: LoggingReceiver<PublishMessage>,
}

#[async_trait]
Expand Down Expand Up @@ -111,20 +123,28 @@ impl Actor for C8yMapperActor {
.send(SyncStart::new(SYNC_WINDOW, ()))
.await?;

while let Some(event) = self.messages.recv().await {
match event {
C8yMapperInput::MqttMessage(message) => {
self.process_mqtt_message(message).await?;
}
C8yMapperInput::FsWatchEvent(event) => {
self.process_file_watch_event(event).await?;
}
C8yMapperInput::SyncComplete(_) => {
self.process_sync_timeout().await?;
}
C8yMapperInput::PublishMessage(message) => {
loop {
select! {
// In some tests this receiver may not be connected to any sender, so we need to ignore
// `None` here; that case should get picked up by the main message box anyway.
Some(message) = self.publish_messages.recv() => {
self.mqtt_publisher.send(message.0).await?;
}
event = self.messages.recv() => {
let Some(event) = event else { break; };
match event {
C8yMapperInput::MqttMessage(message) => {
self.process_mqtt_message(message).await?;
}
C8yMapperInput::FsWatchEvent(event) => {
self.process_file_watch_event(event).await?;
}
C8yMapperInput::SyncComplete(_) => {
self.process_sync_timeout().await?;
}
}
}
else => break
}
}
Ok(())
Expand All @@ -139,6 +159,7 @@ impl C8yMapperActor {
timer_sender: LoggingSender<SyncStart>,
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
publish_messages: LoggingReceiver<PublishMessage>,
) -> Self {
Self {
converter,
Expand All @@ -147,6 +168,7 @@ impl C8yMapperActor {
timer_sender,
bridge_status_messages,
message_handlers,
publish_messages,
}
}

Expand Down Expand Up @@ -334,6 +356,7 @@ pub struct C8yMapperBuilder {
uploader: ClientMessageBox<IdUploadRequest, IdUploadResult>,
bridge_monitor_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage>,
message_handlers: HashMap<ChannelFilter, Vec<LoggingSender<MqttMessage>>>,
publish_message_box: SimpleMessageBoxBuilder<PublishMessage, PublishMessage>,
}

impl C8yMapperBuilder {
Expand Down Expand Up @@ -377,6 +400,8 @@ impl C8yMapperBuilder {

let message_handlers = HashMap::new();

let publish_message_box = SimpleMessageBoxBuilder::new("PublishMessage", 16);

Ok(Self {
config,
box_builder,
Expand All @@ -387,6 +412,7 @@ impl C8yMapperBuilder {
downloader,
bridge_monitor_builder,
message_handlers,
publish_message_box,
})
}

Expand Down Expand Up @@ -421,7 +447,7 @@ impl MessageSource<MqttMessage, Vec<ChannelFilter>> for C8yMapperBuilder {

impl MessageSink<PublishMessage> for C8yMapperBuilder {
fn get_sender(&self) -> DynSender<PublishMessage> {
self.box_builder.get_sender().sender_clone()
self.publish_message_box.get_sender().sender_clone()
}
}

Expand All @@ -444,13 +470,17 @@ impl Builder<C8yMapperActor> for C8yMapperBuilder {
let message_box = self.box_builder.build();
let bridge_monitor_box = self.bridge_monitor_builder.build();

// discard the leftover sender; it was cloned to actors that want to send `PublishMessage`s in `connect_sink`
let (_, publish_messages) = self.publish_message_box.build().into_split();

Ok(C8yMapperActor::new(
converter,
message_box,
mqtt_publisher,
timer_sender,
bridge_monitor_box,
self.message_handlers,
publish_messages,
))
}
}

0 comments on commit 5b89850

Please sign in to comment.