Skip to content
This repository has been archived by the owner on Jan 17, 2020. It is now read-only.

Cloning channels in public interface. #172

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Cloning channels in public interface. #172

wants to merge 1 commit into from

Conversation

leoniloris
Copy link
Contributor

@leoniloris leoniloris commented Sep 3, 2019

Closes #171

  • The publish method requires a mutable reference of self, but it does not seem to be really correct to mutate it in the publish/subscribe etc since those represent a command being sent to a channel.

  • This PR removes the mutable reference by cloning the channels, since, channels are supposed to be cloned by design.

Copy link
Contributor

@TheBestJohn TheBestJohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me. I don't see any issues with cloning the tx channel. I was actually just running into this issue myself. Going to test.

Copy link
Contributor

@TheBestJohn TheBestJohn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Played with this a bit today. I was able to do a whole load of concurrent requests and everything seemed to work nicely. As far as I'm concerned this is a good pull. I did run into some ALPN issues but that's a separate issue I think.

@leoniloris
Copy link
Contributor Author

Hello @tekjar. Could you please take a look into this?

@tekjar
Copy link

tekjar commented Sep 6, 2019

@leoniloris I'll check this today. Wish github had some sort of reminder option

@tekjar
Copy link

tekjar commented Sep 6, 2019

@leoniloris @TheBestJohn I'm not sure if cloning the channel everytime you do a publish is the right way to go here because I'm not sure of the costs involved (Remember channel implementation doubling the capacity for every clone).

I want to understand how "usefulness of channels" and removing &muts are related. What wrong with cloning the client itself?

@leoniloris leoniloris changed the title Cloning channels so the client using the library can enjoy the usefulness of channels. Cloning channels in public interface. Sep 6, 2019
@leoniloris
Copy link
Contributor Author

Thanks for the answer @tekjar !

  • So, This PR is just one suggestion for not needing to clone the client, since this would be more costly than just cloning the channels.

  • Besides, it becomes a little cumbersome to demand all the layers above the MqttClient to be mutable just to post in a channel,

Do you have a suggestion?

@TheBestJohn
Copy link
Contributor

Adding my use case here, I've been playing with connecting this library to the Rocket web framework. I'm receiving a POST request, doing some auth and validation, then sending an MQTT message. Rocket can manage the state of anything with the <Send> and <Sync> traits. However, needing to be borrowed mutably would force me to put this entire struct into a Mutex if I wanted my requests to be properly async leaving me with State<Mutex<MqttClient>>. This change makes using it with the library much more straightforward but I do admit I hadn't looked at any of the memory consumption factors when I did my testing, just performance.

as a note, if you want to do this you will have to excise the ALPN functionality as Rocket uses an older version of Ring and thus older rustls, and thus has a different type for the ALPN configurations &[String] instead of &[Vec<u8>]. I'm trying to figure out a fix for this but in the meantime I've just removed it from the lib locally

@TheBestJohn
Copy link
Contributor

TheBestJohn commented Sep 9, 2019

Also according to the documentation on crossbeam here.

Note that cloning only creates a new handle to the same sending or receiving side. It does not create a separate stream of messages in any way

and shortly thereafter they say that

It's also possible to share senders and receivers by reference

This seems to suggest that the two methods to do this are about the same.

@tekjar
Copy link

tekjar commented Sep 10, 2019

@TheBestJohn crossbeam channels are convenient. We don't have to use &mut to send data over crossbeam channel. But we don't use crossbeam channel to handle user requests as it doesn't support futures Stream.

The problem with futures channel is capacity doubling for every temporary clone. I don't think this is the correct way to go here. I think @stjepang is working on crossbeam channel for async-std. We'll have to wait and see

Rocket can manage the state of anything with the <Send> and <Sync> traits. However, needing to be borrowed mutably would force me to put this entire struct into a Mutex if I wanted my requests to be properly async leaving me with State<Mutex<MqttClient>>

Just to make sure that I understand this completely, You have a struct which has both rocket handler and MqttClient where rocket handler is send + sync while MqttClient is not causing the inconvenience?

@TheBestJohn
Copy link
Contributor

TheBestJohn commented Sep 10, 2019

Oh it's not using crossbeam there. Sorry, my mistake.

edit: Ah and looking into this I can see you here: crossbeam-rs/crossbeam-channel#22

I've gone down the rabbit-hole and ended up here: https://stjepang.github.io/2019/03/02/new-channels.html Pretty interesting read and I'm starting to get what the problem is.

Just to make sure that I understand this completely, You have a struct which has both rocket handler and MqttClient where rocket handler is send + sync while MqttClient is not causing the inconvenience?

Sort of. State<T> allows for Rocket to take ownership of any type that implements Send and Sync. While MqttClient itself is thread safe, it's publish method is inherently racy as you can't have two threads trying to get a mutable ref to it at the same time. The only other way to fix this, as far as I understand, would be to wrap the client in a mutex or RWLock and lock it whenever I want to add something to the send queue. With that comes a whole bunch of error handling and managing unsent publishes for re-try which at that point it's almost better to also wrap it in an Arc and put the whole interaction into another thread or some other form that would essentially work out to this functionality.

It just seems the nature of this lib being focused on async with the server, lends itself to the idea that the library should be able to accommodate an application or library's interaction with rumqtt in an async manner as well. Maybe that's naïve and asking too much? I don't know, and please don't take it as a criticism of your work. I don't fully understand the library but hope to know more.

There's a thread on mutable refs in State here

I can provide a MWE for demo purposes as well if you'd like to take a look.

@tekjar
Copy link

tekjar commented Sep 12, 2019

Yeah a minimal example will help

@TheBestJohn
Copy link
Contributor

TheBestJohn commented Sep 12, 2019

Here you are. It uses my fork of rumqtt because of issues with rustls versions. Swap it back to head if you'd like to see those too

https://github.com/TheBestJohn/mwe_rocket_rumqtt

@TheBestJohn
Copy link
Contributor

TheBestJohn commented Sep 24, 2019

Updated the MWE to use Rocket head directly with this branch. Rocket updated it's TLS stuff so the ALPN issues no longer arise.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Is it needed the mutable reference in publish method?
3 participants