feat: Initial x11rb-async implementation #790
Conversation
Codecov Report
Base: 13.69% // Head: 12.46% // Decreases project coverage by 1.24%.
Additional details and impacted files:

@@            Coverage Diff             @@
##           master     #790       +/-   ##
==========================================
- Coverage   13.69%   12.46%    -1.24%
==========================================
  Files         142      180       +38
  Lines      120124   133177    +13053
==========================================
+ Hits        16454    16602      +148
- Misses     103670   116575    +12905

Flags with carried forward coverage won't be shown.
Looks like whatever I did back then is still in some branches:
https://github.com/psychon/x11rb/tree/async-experiments
https://github.com/psychon/x11rb/tree/async-experiments2
I don't really remember details, but I do remember that I did not use the code generator, but instead had request sending implemented based on the traits that are now in x11rb-protocol.
I gave this a look and ended up being confused about the locking strategy. Sorry, I don't have time to look at this more carefully right now.
}

fn discard_reply(&self, sequence: SequenceNumber, kind: RequestKind, mode: DiscardMode) {
    // Doesn't block.
Well... depends on your definition of "block". This does not perform I/O, but it does lock a mutex (which is not supposed to be held for long). I guess this is fine, right?
(Same comment for the other "Doesn't block" comments.)
Edit: Actually, this is generic code. I could implement the trait with something that just sleeps for 5 minutes. But I guess it's not worth caring about such malicious implementations.
Yes, it's not like anything unsound can happen if this blocks. It's just that, unless there's a buggy/malicious implementation on our hands (which we shouldn't assume), it shouldn't block for a protracted period of time.
Also, it's fine to lock std::sync::Mutexes in async code as long as you don't hold their guards across await points; see this point in the tokio documentation.
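To make that pattern concrete, here is a minimal sketch; the Connection/Inner types and the some_io helper are made up for illustration, not taken from this PR. The std::sync::Mutex guard is confined to a small scope and dropped before the await point.

```rust
use std::sync::Mutex;

struct Inner {
    pending: Vec<u64>,
}

struct Connection {
    inner: Mutex<Inner>,
}

impl Connection {
    async fn record_and_flush(&self) {
        // Fine: the std mutex guard lives only inside this block and is
        // dropped again before the await point below.
        {
            let mut inner = self.inner.lock().unwrap();
            inner.pending.push(42);
        } // guard dropped here

        // No std::sync::Mutex guard is held across this await.
        some_io().await;
    }
}

// Stand-in for real asynchronous I/O.
async fn some_io() {}
```

An async mutex is only needed when a guard genuinely has to live across an await point.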
    let sequence = cookie.sequence_number();
    mem::forget(cookie);
    sequence
};
I haven't read the new cookies.rs yet, but x11rb's Cookie has an into_sequence_number() for the above sequence.
Edit: ...that is only pub(crate), but it could still be good enough for this use here.
Yes, I think we might want to expose that method as pub.
}

/// Eat the cookie and return the connection.
fn consume(self) -> (&'conn C, SequenceNumber) {
Ah, if you make this pub(crate), you could use this for my into_sequence_number() comment from elsewhere. Dunno if that makes sense.
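For illustration, the two suggestions could fit together roughly like this; the field layout and the Drop behaviour of Cookie shown here are assumptions, not the actual code in this PR:

```rust
type SequenceNumber = u64;

struct Cookie<'conn, C> {
    conn: &'conn C,
    sequence: SequenceNumber,
}

impl<C> Drop for Cookie<'_, C> {
    fn drop(&mut self) {
        // In the real type this would ask the connection to discard the reply.
    }
}

impl<'conn, C> Cookie<'conn, C> {
    /// Eat the cookie and return the connection plus sequence number,
    /// skipping the Drop impl so the reply is not discarded.
    pub(crate) fn consume(self) -> (&'conn C, SequenceNumber) {
        let (conn, sequence) = (self.conn, self.sequence);
        std::mem::forget(self);
        (conn, sequence)
    }

    /// The into_sequence_number() shape discussed above then becomes a
    /// thin wrapper over consume().
    pub(crate) fn into_sequence_number(self) -> SequenceNumber {
        self.consume().1
    }
}
```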
    /// Multiple IPs.
    Multiple(std::vec::IntoIter<IpAddr>),
}
This smells like smallvec and also a bit like premature optimisation. Why not always use a Vec? That one memory allocation for the Single case will be practically unmeasurable.
Sounds good, although I'd rather use tinyvec if we go in this direction.
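As a sketch of the simpler Vec-only shape being suggested (the type name and constructors here are placeholders, not the actual enum from the diff):

```rust
use std::net::IpAddr;
use std::vec;

/// Always back the iterator with a Vec; the single extra allocation in the
/// one-address case is negligible and avoids both a Single/Multiple split and
/// an extra smallvec/tinyvec dependency.
struct Addresses(vec::IntoIter<IpAddr>);

impl Addresses {
    fn new(addrs: Vec<IpAddr>) -> Self {
        Addresses(addrs.into_iter())
    }

    fn single(addr: IpAddr) -> Self {
        Self::new(vec![addr])
    }
}

impl Iterator for Addresses {
    type Item = IpAddr;

    fn next(&mut self) -> Option<IpAddr> {
        self.0.next()
    }
}
```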
/// Connect to the X11 server.
pub async fn connect(dpy_name: Option<&str>) -> Result<(Self, usize), ConnectError> {
    let (stream, screen) = nb_connect::connect(dpy_name).await?;
    Ok((Self::connect_to_stream(stream, screen).await?, screen))
No .Xauthority "fun"? I guess that deserves at least a TODO comment somewhere.
(Yay, I guess that means the Xauth code needs to be made public somewhere... do we need another crate besides x11rb-protocol for such shared code?)
{
    Box::pin(async move {
        // Gain exclusive access to the write end of the stream.
        let _guard = self.write_lock.lock().await;
Does this "two mutexes" approach really work? 🤔
From libxcb's source code, I learnt that there was "once upon a time" a deadlock between libX11 and the X11 server: the X11 server stops reading from a client when it has stuff to send, so the client must always read and write at the same time. Only writing without reading could produce a deadlock.
With your "two mutexes" approach... well, there could be only a single thread/task around, trying to write a request, so this task also has to read a request. I guess this means it also has to lock the read mutex? But there could be another task in wait_for_reply() that already does the waiting, so waiting for the mutex in this task would produce unwanted blocking.
Oh, and it could happen that the write thread is blocked for a while, and in the meantime the wait_for_reply() finishes, so the reader that was present at the beginning of send_request goes away and the write thread also has to "start" reading.
For my async experiments I solved this by spawning an extra task that does all the reading and is always "blocked" in read(). That way, nothing else has to worry about reading, since it is always already in progress. Of course, this makes the synchronisation between things more complicated, and "proper shutdown" also becomes more complicated...
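A rough sketch of that dedicated-reader shape, assuming an async-channel-style channel and simplified fixed 32-byte framing (both are assumptions for illustration; real code would parse reply lengths and pass file descriptors along):

```rust
use async_channel::{Receiver, Sender};
use futures::io::{AsyncRead, AsyncReadExt};

/// A raw X11 packet. Real code would also carry received file descriptors.
type Packet = Vec<u8>;

/// The one task that owns the read side of the stream. It is always parked in
/// `read_exact`, so no writer ever has to "take over" reading, and the
/// read/write deadlock described above cannot happen. The (sender, receiver)
/// pair would come from `async_channel::unbounded()`.
async fn reader_task<S>(mut stream: S, tx: Sender<Packet>) -> std::io::Result<()>
where
    S: AsyncRead + Unpin,
{
    loop {
        // Events, errors and reply headers are 32 bytes; a real implementation
        // would look at the length field and read the rest of long replies.
        let mut packet = vec![0u8; 32];
        stream.read_exact(&mut packet).await?;

        if tx.send(packet).await.is_err() {
            // Every receiver is gone, i.e. the connection was dropped.
            return Ok(());
        }
    }
}

/// Everyone else only ever awaits the channel and never touches the stream.
async fn next_packet(rx: &Receiver<Packet>) -> Option<Packet> {
    rx.recv().await.ok()
}
```

Shutdown is the awkward part: when the connection goes away, something has to wake or cancel this task, which is the extra complication mentioned above.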
) -> Fut<'_, Option<Self::Buf>, ConnectionError> {
    Box::pin(async move {
        // Gain exclusive access to the read end of the stream.
        let _guard = self.read_lock.lock().await;
What if the Error was already received from the server (so it sits in some queue somewhere), but another task sits in wait_for_event() and thus this mutex is locked? That would produce unwanted blocking.
let _guard = self.write_lock.lock().await;

self.do_io(PollMode::ReadAndWritable, |conn| {
    conn.inner.extension_information(name)
Okay, now I feel like I misunderstood the "two mutexes" approach. Why doesn't this need the read lock?
I thought that this had something to do with https://docs.rs/async-io/latest/async_io/struct.Async.html#concurrent-io, but now... I am no longer sure and I am more confused.
// Try to gain access to the read end of the stream.
let _guard = match self.read_lock.try_lock() {
    Some(guard) => guard,
    None => return Ok(None),
Couldn't this incorrectly return None when an event sits in a queue / was already received, but the mutex is locked because something waits for the reply to a request?
Oh and: How does...
Thanks for the review!
As someone who tried the "runtime independent" idea: you don't want to do the runtime-independent idea. It usually ends up in sprawling code that is extraordinarily difficult to maintain (see this future, which appears in my recurring nightmares). Regarding the locking strategy: my initial goal was to write this crate as a wrapper around...
Since you have much more async expertise than I do: how about providing a future to the user that must be spawned onto the executor and that does all the reading and pushes X11 packets (and possibly FDs) into an AsyncQueue? That way, all other code does not have to care too much about the reading part and (most importantly) the interaction with writing goes away. The downside is that reading is still complicated.
Since I know the sync world of things: I would use mutexes and condition variables to make this work correctly. Dunno if there is a better async alternative. In one of my async experiments, Cookies contained the read end of a oneshot channel and the write end was in the...
The sync version of the above would be "start a thread that does all the reading".
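For illustration, the oneshot-channel variant could look roughly like this; the map of pending senders and all names here are assumptions about the general shape, not code from those experiments:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use futures::channel::oneshot;

type SequenceNumber = u64;
type Reply = Vec<u8>;

/// The write ends live somewhere the reading task can reach them.
#[derive(Default)]
struct PendingReplies {
    waiting: Mutex<HashMap<SequenceNumber, oneshot::Sender<Reply>>>,
}

impl PendingReplies {
    /// Called when a request is sent; the receiver goes into the cookie.
    fn register(&self, seq: SequenceNumber) -> oneshot::Receiver<Reply> {
        let (tx, rx) = oneshot::channel();
        self.waiting.lock().unwrap().insert(seq, tx);
        rx
    }

    /// Called by the reading task when a reply arrives.
    fn complete(&self, seq: SequenceNumber, reply: Reply) {
        if let Some(tx) = self.waiting.lock().unwrap().remove(&seq) {
            // If the cookie was already dropped, the reply is simply discarded.
            let _ = tx.send(reply);
        }
    }
}

/// A cookie only has to await its receiver; it never locks the stream.
struct Cookie {
    reply: oneshot::Receiver<Reply>,
}

impl Cookie {
    async fn wait_for_reply(self) -> Option<Reply> {
        self.reply.await.ok()
    }
}
```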
Having a "driving" future is probably the best way of doing this; this is what... Looking into this, it may be impossible to implement as a wrapper around...
Thank you so much for this! ❤️ Would this close #318?
I think this is ready now. I've added an... Also: We may want to add 1.64 for...
Yes, I think it would. This would provide an...
I think having a way to use...
Couldn't...
This would require including a slot for the...
Sorry for being so slow. I don't have a lot of time currently. And I still do not. 🙈 I looked at everything except...
Also: How does this handle futures being dropped before they are polled to completion? Specifically, sending large requests "only half" is a great way to cause problems IIRC.
My bad, I'll fix that.
I wanted to build directly on top of the...
There is a...
Sorry, I ran out of time again. I looked through everything except rust_connection. Here's a proposal for how to continue: split this PR up!
- The first version only uses the blocking pool (and IIRC I only had one comment that affects this code).
- The async RustConnection is then added later in a second PR (and in that PR I will again take forever, since I am trying to get rid of the x11rb-async -> x11rb dependency; having this dependency feels like a failure of x11rb-protocol / some stuff is still missing).
What do you think? Does that seem feasible? Sorry for the necessary history-rewriting. I promise that I can react to such a first PR more quickly since I basically already approved it.
x11rb-async/src/cookie.rs (Outdated)
/// Check if this request caused an X11 error.
pub async fn check(self) -> Result<(), ReplyError> {
    let (conn, seq) = self.consume();
    match conn.check_for_raw_error(seq).await? {
Async code makes my head spin...
This one can "leak" if the future for check() is not polled to completion and the error was not yet received. This cookie is already consumed, so its Drop impl won't run. When the error is later received, it will sit indefinitely in the "receive queue".
However, I guess one can just ignore this. It's quite unlikely that anyone ever hits this and even if it happens, nothing really bad happens. Just 32 bytes are kept around until the connection is dropped.
I've moved the consume() to after the await, which should ensure the drop happens.
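A self-contained toy version of that ordering fix (all the types here are stand-ins rather than the real x11rb-async ones): the await happens while the cookie is still alive, and consume() only runs afterwards.

```rust
struct Conn;

impl Conn {
    async fn check_for_raw_error(&self, _seq: u64) -> Option<Vec<u8>> {
        None // placeholder for the real I/O
    }

    fn discard_reply(&self, _seq: u64) {}
}

struct Cookie<'a> {
    conn: &'a Conn,
    seq: u64,
}

impl Drop for Cookie<'_> {
    fn drop(&mut self) {
        // Runs if the owning future is dropped before completion, so a pending
        // error does not linger in the receive queue forever.
        self.conn.discard_reply(self.seq);
    }
}

impl<'a> Cookie<'a> {
    fn consume(self) -> (&'a Conn, u64) {
        let (conn, seq) = (self.conn, self.seq);
        std::mem::forget(self);
        (conn, seq)
    }

    async fn check(self) -> Option<Vec<u8>> {
        // Await first, while `self` is still alive: dropping this future at
        // the await point still runs Cookie's Drop impl.
        let error = self.conn.check_for_raw_error(self.seq).await;
        // Only consume (and skip Drop) once the reply/error has arrived.
        let (_conn, _seq) = self.consume();
        error
    }
}
```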
That's probably for the best; I'll add the equivalent to...
In the current state this probably won't happen...
What's the state of this? Would you mind if I gave this a try myself?
I got distracted. I can give it a go this weekend.
I don't remember exactly where, but I remember @psychon saying that they were thinking of creating an x11rb-async crate that provides an asynchronous API over x11rb. This PR creates an x11rb-async crate that aims to do just that.

Motivation: I saw this code and it made me angry because it could be an asynchronous task instead of taking up an entire thread.

Things this PR has so far:
- RequestConnection and Connection traits that mirror their equivalents in x11rb.
- BlockingConnection connection that is implemented by running function calls on a separate thread.
- RustConnection that wraps around x11rb::RustConnection but registers the Stream into an async reactor. Generally trying to reduce the amount of duplicated code.
- Cookie, VoidCookie et al.

Things I need to do before I mark this as ready:
- RustConnection. This may require more finesse, including exposing greater fidelity in the x11rb implementation.
- async-trait, but on stable Rust we can use GATs and on nightly Rust we have access to real-life async traits (although the latter wouldn't let us use dyn). A rough GAT sketch follows at the end of this description.

Things to do in the future:
- XCBConnection.

Closes #318
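For reference, a sketch of what the GAT-based alternative to async-trait could look like; the Fut alias definition and the trait shown here are simplified assumptions rather than the actual trait in this PR (GATs are stable since Rust 1.65):

```rust
use std::future::Future;
use std::pin::Pin;

// The boxed-future shape visible in the diff snippets above, reconstructed
// here as an assumption about how the `Fut` alias might be defined:
type Fut<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;

// A GAT-based alternative: each implementation names its own future type and
// no boxing is required, at the cost of losing `dyn` (object safety).
trait RequestConnection {
    type ReplyFuture<'a>: Future<Output = std::io::Result<Vec<u8>>> + 'a
    where
        Self: 'a;

    fn wait_for_reply(&self, sequence: u64) -> Self::ReplyFuture<'_>;
}
```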