Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make async mqtt client implement Send #461

Merged
merged 1 commit into from
Jul 28, 2024

Conversation

HalfVoxel
Copy link
Contributor

There were already some Send implementations.
However, they weren't completely correct, which made it impossible to use the async mqtt client in e.g. a tokio worker thread.

Note that it could be used in the async main task, because that never sends off any work to other worker threads, and thus Send is not required. The errors would surface when trying to use tokio::spawn with an async mqtt client.

See also https://users.rust-lang.org/t/implementation-of-send-is-not-general-enough-but-cannot-make-it-more-general/115087

Also. I note that the Unblocker deletes the FreeRTOS task when it is dropped. This seems sketchy. Deleting the task will, if I understand correctly, cause it to stop running immediately, without running any associated Drop implementations. Couldn't this cause memory leaks? Wouldn't it be better to just let it close the channel and let the worker thread clean itself up?

There were already some Send implementations.
However, they weren't completely correct, which made it impossible
to use the async mqtt client in e.g. a tokio worker thread.

Note that it could be used in the async main task, because that never
sends off any work to other worker threads, and thus Send is not required.
The errors would surface when trying to use tokio::spawn with an async mqtt client.
src/private/zerocopy.rs Show resolved Hide resolved
@@ -182,8 +182,8 @@ where
}
}

unsafe impl<T> Send for Channel<T> where T: Send + 'static {}
unsafe impl<T> Sync for Channel<T> where T: Send + 'static {}
unsafe impl<'a, T> Send for Channel<T> where T: Send + 'a {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

src/private/zerocopy.rs Show resolved Hide resolved
@ivmarkov
Copy link
Collaborator

ivmarkov commented Jul 28, 2024

Note that it could be used in the async main task, because that never sends off any work to other worker threads, and thus Send is not required. The errors would surface when trying to use tokio::spawn with an async mqtt client.

To use async code with a work stealing executor, like tokio's default one (because you can still use tokio LocalSet), you need the Futures themselves to be Send.

Most of the futures in esp-idf-svc are not Send because this is often difficult to do and work-stealing is not such a big deal on embedded devices.

With that said, I agree that if it is relatively easy to make the futures of some driver/service Send, we should do it.
In your particular case we have hit the jackpot by just making Unblocker Send, and this works simply because the ESP IDF MqttClient is actually anything but natively async. So we simulate async-ness by spinning a hidden thread. Terrible, I know, but that's as much as we can do and I think even this is too much. We just need all-Rust App-layer network implementations like the ones provided by edge-net and other crates. Unfortunately, no all-rust, embedded-friendly MQTT client that supports the 3.1 protocol is available anywhere yet.

See also https://users.rust-lang.org/t/implementation-of-send-is-not-general-enough-but-cannot-make-it-more-general/115087

Ah! I take my words back for the 'a lifetime. Another Rust paperclip. :(

Also. I note that the Unblocker deletes the FreeRTOS task when it is dropped. This seems sketchy. Deleting the task will, if I understand correctly, cause it to stop running immediately, without running any associated Drop implementations. Couldn't this cause memory leaks? Wouldn't it be better to just let it close the channel and let the worker thread clean itself up?

Yes! This is spot-on. Would you enhance the PR with this fix? Basically

@HalfVoxel
Copy link
Contributor Author

Ah! I take my words back for the 'a lifetime. Another Rust paperclip. :(

Yes, I believe this is a limitation in the current rust compiler.

Essentially, rust does not understand that the original bound is enough.

The new bound

unsafe impl<'a, T> Send for Receiver<T> where T: Send + 'a {}

essentially says, for all lifetimes 'a where T: Send + 'a implement Send for Receiver<T>. But since Receiver<T> requires T to be 'static, this will not allow anything that is unsound.

@HalfVoxel
Copy link
Contributor Author

So we simulate async-ness by spinning a hidden thread. Terrible, I know, but that's as much as we can do and I think even this is too much.

Oof. That sounds horrible. 😭

@ivmarkov
Copy link
Collaborator

Since the 'a lifetimes seem to have to be there because of the rustc bug, I'll merge this change as-is.
Though I would appreciate if you take the effort to fix the FreeRTOS task deletion to happen from within the unblocker trait itself.

Need to review if we need the same unsafe pattern elsewhere. We might be...

@ivmarkov
Copy link
Collaborator

Thanks!

@ivmarkov
Copy link
Collaborator

ivmarkov commented Jul 28, 2024

So we simulate async-ness by spinning a hidden thread. Terrible, I know, but that's as much as we can do and I think even this is too much.

Oof. That sounds horrible. 😭

Didn't you see what Unblocker actually does? :p

I mean... if you would like to contribute a truly-async client to edge-net or - like - improve rust-mqtt (seems to be a bit of an abandonware ATM and does not really support 3.11) I would be super happy, but in the meantime, that's all we have...

@ivmarkov ivmarkov merged commit 148e4f1 into esp-rs:master Jul 28, 2024
0 of 15 checks passed
@HalfVoxel
Copy link
Contributor Author

Didn't you see what Unblocker actually does? :p

Hmm. Everything I've seen seems to indicate it uses a separate thread that waits using conditional variables and that kind of stuff. But I haven't seen anything that spins (i.e. looping using a short delay, which is how I interpret "spin").
Where is that code?

@ivmarkov
Copy link
Collaborator

ivmarkov commented Jul 28, 2024

Didn't you see what Unblocker actually does? :p

Hmm. Everything I've seen seems to indicate it uses a separate thread that waits using conditional variables and that kind of stuff. But I haven't seen anything that spins (i.e. looping using a short delay, which is how I interpret "spin"). Where is that code?

My English. :) I didn't mean to say that we busyloop by polling stuff with short sleeps in-between. That would truly be indeed very horrible.

By "spinning" I meant simply that the blocking MQTT ESP IDF client is turned into an async one by working it in a separate thread and then delegating to that separate thread the execution. And using condvars to sync indeed.

Still pretty gross, as that's not a "native" async. A bit like the hack how smol and async-std turn the blocking Rust STD file IO into "async". That's fake async as you spin a separate hidden thread in there, with its own stack etc.

@HalfVoxel
Copy link
Contributor Author

My English. :) I didn't mean to say that we busyloop by polling stuff with short sleeps in-between. That would truly be indeed very horrible.

Ah. I see!
Yeah, not the prettiest, but workable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants