Skip to content

Commit

Permalink
Merge pull request #2832 from Bravo555/fix/2728/mqtt-connection-error…
Browse files Browse the repository at this point in the history
…-delay

fix: apply delay to mqtt connect errors by default
  • Loading branch information
Bravo555 authored Apr 23, 2024
2 parents 8dfc2b2 + c0b0f6a commit ac6323a
Showing 1 changed file with 2 additions and 19 deletions.
21 changes: 2 additions & 19 deletions crates/common/mqtt_channel/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ use futures::StreamExt;
use log::error;
use log::info;
use rumqttc::AsyncClient;
use rumqttc::ConnectionError;
use rumqttc::Event;
use rumqttc::EventLoop;
use rumqttc::Incoming;
use rumqttc::Outgoing;
use rumqttc::Packet;
use rumqttc::StateError;
use std::time::Duration;
use tokio::time::sleep;

Expand Down Expand Up @@ -178,14 +176,11 @@ impl Connection {
host = config.broker.host,
port = config.broker.port
);
let should_delay = Connection::pause_on_error(&err);

// Errors on send are ignored: it just means the client has closed the receiving channel.
let _ = error_sender.send(err.into()).await;

if should_delay {
Connection::do_pause().await;
}
Connection::do_pause().await;
}
_ => (),
}
Expand Down Expand Up @@ -249,14 +244,11 @@ impl Connection {

Err(err) => {
error!("MQTT connection error: {err}");
let delay = Connection::pause_on_error(&err);

// Errors on send are ignored: it just means the client has closed the receiving channel.
let _ = error_sender.send(err.into()).await;

if delay {
Connection::do_pause().await;
}
Connection::do_pause().await;
}
_ => (),
}
Expand Down Expand Up @@ -305,15 +297,6 @@ impl Connection {
let _ = done.send(());
}

pub(crate) fn pause_on_error(err: &ConnectionError) -> bool {
matches!(
err,
rumqttc::ConnectionError::Io(_)
| rumqttc::ConnectionError::MqttState(StateError::Io(_))
| rumqttc::ConnectionError::MqttState(_)
)
}

pub(crate) async fn do_pause() {
sleep(Duration::from_secs(1)).await;
}
Expand Down

0 comments on commit ac6323a

Please sign in to comment.