Skip to content

Commit

Permalink
fix(thin-edge#3279): plug AvailabilityActor directly to MQTT actor
Browse files Browse the repository at this point in the history
Because the C8yMapperActor synchronously first receives a message from
the input channel, and then produces output by sending to the output
channel, if one of these channels block, the other one will also not be
processed.

In this case, there was a loop: C8yMapperActor sent `MqttMessage`s to
AvailabilityActor, and AvailabilityActor send `PublishMessage`s to
C8yMapperActor for it to relay it to MqttActor.

When a sender for `PublishMessage`s was full when AvailabilityActor
tried sending another message, it would block, so until this message was
processed, AvailabilityActor wasn't processing now input. However, if
before this output was sent by C8yMapperActor there was another message
to relay to AvailabilityActor, and the inbound channel was also full,
this would result in a deadlock where neither AvailabilityActor input or
output could be sent through.

Signed-off-by: Marcel Guzik <[email protected]>
  • Loading branch information
Bravo555 committed Dec 9, 2024
1 parent 98b0f1d commit cae10ef
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
1 change: 1 addition & 0 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl TEdgeComponent for CumulocityMapper {
Some(AvailabilityBuilder::new(
AvailabilityConfig::try_new(&tedge_config, c8y_profile)?,
&mut c8y_mapper_actor,
&mut mqtt_actor,
&mut timer_actor,
))
} else {
Expand Down
16 changes: 7 additions & 9 deletions crates/extensions/c8y_mapper_ext/src/availability/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::actor::PublishMessage;
use crate::availability::actor::AvailabilityActor;
use crate::availability::AvailabilityConfig;
use crate::availability::AvailabilityInput;
Expand Down Expand Up @@ -32,19 +31,20 @@ pub struct AvailabilityBuilder {
impl AvailabilityBuilder {
pub fn new(
config: AvailabilityConfig,
mqtt: &mut (impl MessageSource<MqttMessage, Vec<ChannelFilter>> + MessageSink<PublishMessage>),
mqtt_in: &mut impl MessageSource<MqttMessage, Vec<ChannelFilter>>,
mqtt_out: &mut impl MessageSink<MqttMessage>,
timer: &mut impl Service<TimerStart, TimerComplete>,
) -> Self {
let mut box_builder: SimpleMessageBoxBuilder<AvailabilityInput, AvailabilityOutput> =
SimpleMessageBoxBuilder::new("AvailabilityMonitoring", 16);

box_builder.connect_mapped_source(
Self::channels(),
mqtt,
mqtt_in,
Self::mqtt_message_parser(config.clone()),
);

mqtt.connect_mapped_source(NoConfig, &mut box_builder, Self::mqtt_message_builder());
mqtt_out.connect_mapped_source(NoConfig, &mut box_builder, Self::mqtt_message_builder());

let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());

Expand Down Expand Up @@ -84,12 +84,10 @@ impl AvailabilityBuilder {
}
}

fn mqtt_message_builder() -> impl Fn(AvailabilityOutput) -> Option<PublishMessage> {
fn mqtt_message_builder() -> impl Fn(AvailabilityOutput) -> Option<MqttMessage> {
move |res| match res {
AvailabilityOutput::C8ySmartRestSetInterval117(value) => {
Some(PublishMessage(value.into()))
}
AvailabilityOutput::C8yJsonInventoryUpdate(value) => Some(PublishMessage(value.into())),
AvailabilityOutput::C8ySmartRestSetInterval117(value) => Some(MqttMessage::from(value)),
AvailabilityOutput::C8yJsonInventoryUpdate(value) => Some(MqttMessage::from(value)),
}
}
}
Expand Down
37 changes: 31 additions & 6 deletions crates/extensions/c8y_mapper_ext/src/availability/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::actor::PublishMessage;
use crate::availability::actor::TimerPayload;
use crate::availability::AvailabilityBuilder;
use crate::availability::AvailabilityConfig;
Expand Down Expand Up @@ -414,24 +413,50 @@ async fn timer_send(timer: &mut FakeServerBox<TimerStart, TimerComplete>, event:
}

struct TestHandler {
pub mqtt_box: SimpleMessageBox<PublishMessage, MqttMessage>,
pub mqtt_box: SimpleMessageBox<MqttMessage, MqttMessage>,
pub timer_box: FakeServerBox<TimerStart, TimerComplete>,
}

async fn spawn_availability_actor(config: AvailabilityConfig) -> TestHandler {
let mut mqtt_builder: SimpleMessageBoxBuilder<PublishMessage, MqttMessage> =
// create two separate Mqtt message boxes to circumvent "can't borrow the same object as mutable more than one time"
// use the message boxes because they implement `MessageSource` and `MessageSink` (receivers and senders alone do
// not) and then take out their halves not given to the actor and recombine them
//
// we need separate mqtt in and mqtt out because on the in-side CumulocityMapperActor attaches a `ChannelFilter`
// which we use to only get messages we care about, and on the out-side we want to output directly to the MQTT actor
// because on CumulocityMapperActor side the halves that send to AvailabilityActor and receive from
// AvailabilityActor aren't separate, so one can block the other
//
// TODO: find a cleaner and more permanent solution
let mut mqtt_in_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
SimpleMessageBoxBuilder::new("MQTT", 10);
let mut mqtt_out_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
SimpleMessageBoxBuilder::new("MQTT", 10);
let mut timer_builder: FakeServerBoxBuilder<TimerStart, TimerComplete> =
FakeServerBoxBuilder::default();

let availability_builder =
AvailabilityBuilder::new(config, &mut mqtt_builder, &mut timer_builder);
let availability_builder = AvailabilityBuilder::new(
config,
&mut mqtt_in_builder,
&mut mqtt_out_builder,
&mut timer_builder,
);

let actor = availability_builder.build();
tokio::spawn(async move { actor.run().await });

let mqtt_in_message_box = mqtt_in_builder.build();
let mqtt_out_message_box = mqtt_out_builder.build();

// in_receiver cloned to actor
let (in_tx, _) = mqtt_in_message_box.into_split();
// out_sender cloned to actor
let (_, out_rx) = mqtt_out_message_box.into_split();

let mqtt_box = SimpleMessageBox::new(out_rx, in_tx);

TestHandler {
mqtt_box: mqtt_builder.build(),
mqtt_box,
timer_box: timer_builder.build(),
}
}
Expand Down

0 comments on commit cae10ef

Please sign in to comment.