diff --git a/examples/gcloud.rs b/examples/gcloud.rs index 06e2ad1..d1de492 100644 --- a/examples/gcloud.rs +++ b/examples/gcloud.rs @@ -40,7 +40,7 @@ fn main() -> Result<(), io::Error> { .set_keep_alive(10) .set_security_opts(security_options); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); let topic = "/devices/".to_owned() + &config.id + "/events/imu"; thread::spawn(move || { diff --git a/examples/httpconnect.rs b/examples/httpconnect.rs index 9bb223e..055dcdf 100644 --- a/examples/httpconnect.rs +++ b/examples/httpconnect.rs @@ -28,7 +28,7 @@ fn main() { .set_reconnect_opts(reconnect_options) .set_proxy(proxy); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); diff --git a/examples/keepalive.rs b/examples/keepalive.rs index 5937d58..383d305 100644 --- a/examples/keepalive.rs +++ b/examples/keepalive.rs @@ -9,7 +9,7 @@ fn main() { .set_keep_alive(10) .set_reconnect_opts(reconnect_options); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); thread::spawn(move || { thread::sleep(Duration::from_secs(2)); diff --git a/examples/playpause.rs b/examples/playpause.rs index 3686b26..7442181 100644 --- a/examples/playpause.rs +++ b/examples/playpause.rs @@ -5,12 +5,12 @@ fn main() { pretty_env_logger::init(); let mqtt_options = MqttOptions::new("test-id", "127.0.0.1", 1883).set_keep_alive(10); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); - let mut c1 = mqtt_client.clone(); - let mut c2 = mqtt_client.clone(); + let c1 = mqtt_client.clone(); + let c2 = mqtt_client.clone(); thread::spawn(move || { let dur = Duration::new(1, 0); diff --git a/examples/pubsub1.rs b/examples/pubsub1.rs index cc029a3..2f4d02f 100644 --- a/examples/pubsub1.rs +++ b/examples/pubsub1.rs @@ -15,7 +15,7 @@ fn main() { .set_reconnect_opts(reconnection_options) .set_clean_session(false); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); thread::spawn(move || { diff --git a/examples/pubsub2.rs b/examples/pubsub2.rs index b7c5d40..4bc752c 100644 --- a/examples/pubsub2.rs +++ b/examples/pubsub2.rs @@ -4,7 +4,7 @@ use std::{thread, time::Duration}; fn main() { pretty_env_logger::init(); let mqtt_options = MqttOptions::new("test-pubsub2", "127.0.0.1", 1883).set_keep_alive(10); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); //mqtt_client.subscribe("hello/world", QoS::ExactlyOnce).unwrap(); diff --git a/examples/select.rs b/examples/select.rs index 8abcc9c..cd10a5f 100644 --- a/examples/select.rs +++ b/examples/select.rs @@ -6,7 +6,7 @@ use std::{thread, time::Duration}; fn main() { pretty_env_logger::init(); let mqtt_options = MqttOptions::new("test-id", "127.0.0.1", 1883).set_keep_alive(30); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); let (done_tx, done_rx) = crossbeam_channel::bounded(1); mqtt_client.subscribe("hello/world", QoS::AtLeastOnce).unwrap(); diff --git a/examples/shutdown.rs b/examples/shutdown.rs index 8e4a9ec..b9491e1 100644 --- a/examples/shutdown.rs +++ b/examples/shutdown.rs @@ -5,7 +5,7 @@ fn main() { pretty_env_logger::init(); let mqtt_options = MqttOptions::new("test-id-1", "localhost", 1883).set_keep_alive(10); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); thread::spawn(move || { thread::sleep(Duration::from_secs(5)); diff --git a/examples/tls.rs b/examples/tls.rs index 14f9904..6a30385 100644 --- a/examples/tls.rs +++ b/examples/tls.rs @@ -15,7 +15,7 @@ fn main() { .set_client_auth(client_cert, client_key) .set_keep_alive(10); - let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); + let (mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap(); let topic = "hello/world"; thread::spawn(move || { diff --git a/src/client/mod.rs b/src/client/mod.rs index 585a5bf..6020f0a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -102,7 +102,7 @@ impl MqttClient { } /// Requests the eventloop for mqtt publish - pub fn publish(&mut self, topic: S, qos: QoS, retained: B, payload: V) -> Result<(), ClientError> + pub fn publish(&self, topic: S, qos: QoS, retained: B, payload: V) -> Result<(), ClientError> where S: Into, V: Into>, @@ -122,13 +122,13 @@ impl MqttClient { payload: Arc::new(payload), }; - let tx = &mut self.request_tx; + let tx = self.request_tx.clone(); tx.send(Request::Publish(publish)).wait()?; Ok(()) } /// Requests the eventloop for mqtt subscribe - pub fn subscribe(&mut self, topic: S, qos: QoS) -> Result<(), ClientError> + pub fn subscribe(&self, topic: S, qos: QoS) -> Result<(), ClientError> where S: Into, { @@ -141,13 +141,13 @@ impl MqttClient { topics: vec![topic], }; - let tx = &mut self.request_tx; + let tx = self.request_tx.clone(); tx.send(Request::Subscribe(subscribe)).wait()?; Ok(()) } /// Requests the eventloop for mqtt unsubscribe - pub fn unsubscribe(&mut self, topic: S) -> Result<(), ClientError> + pub fn unsubscribe(&self, topic: S) -> Result<(), ClientError> where S: Into, { @@ -156,7 +156,7 @@ impl MqttClient { topics: vec![topic.into()], }; - let tx = &mut self.request_tx; + let tx = self.request_tx.clone(); tx.send(Request::Unsubscribe(unsubscribe)).wait()?; Ok(()) } @@ -166,24 +166,24 @@ impl MqttClient { /// network for reconnection /// /// [Resume]: struct.MqttClient.html#method.resume - pub fn pause(&mut self) -> Result<(), ClientError> { - let tx = &mut self.command_tx; + pub fn pause(&self) -> Result<(), ClientError> { + let tx = self.command_tx.clone(); tx.send(Command::Pause).wait()?; Ok(()) } /// Commands the network eventloop to reconnect to the broker and /// resume network io - pub fn resume(&mut self) -> Result<(), ClientError> { - let tx = &mut self.command_tx; + pub fn resume(&self) -> Result<(), ClientError> { + let tx = self.command_tx.clone(); tx.send(Command::Resume).wait()?; Ok(()) } /// Commands the network eventloop to gracefully shutdown /// the connection to the broker. - pub fn shutdown(&mut self) -> Result<(), ClientError> { - let tx = &mut self.request_tx; + pub fn shutdown(&self) -> Result<(), ClientError> { + let tx = self.request_tx.clone(); tx.send(Request::Disconnect).wait()?; Ok(()) }