From 48f6582eb43c1754bd62766fdc5aca7e14d0e467 Mon Sep 17 00:00:00 2001 From: daxpedda Date: Mon, 16 Oct 2023 15:50:22 +0200 Subject: [PATCH] Web Async Rework (#3082) --- src/platform_impl/web/async.rs | 297 ------------------ src/platform_impl/web/async/channel.rs | 115 +++++++ src/platform_impl/web/async/dispatcher.rs | 113 +++++++ src/platform_impl/web/async/mod.rs | 9 + src/platform_impl/web/async/waker.rs | 123 ++++++++ src/platform_impl/web/async/wrapper.rs | 131 ++++++++ src/platform_impl/web/event_loop/mod.rs | 11 +- src/platform_impl/web/event_loop/proxy.rs | 27 +- src/platform_impl/web/event_loop/runner.rs | 145 ++++++--- .../web/event_loop/window_target.rs | 79 +++-- src/platform_impl/web/web_sys/canvas.rs | 5 +- src/platform_impl/web/window.rs | 19 +- 12 files changed, 653 insertions(+), 421 deletions(-) delete mode 100644 src/platform_impl/web/async.rs create mode 100644 src/platform_impl/web/async/channel.rs create mode 100644 src/platform_impl/web/async/dispatcher.rs create mode 100644 src/platform_impl/web/async/mod.rs create mode 100644 src/platform_impl/web/async/waker.rs create mode 100644 src/platform_impl/web/async/wrapper.rs diff --git a/src/platform_impl/web/async.rs b/src/platform_impl/web/async.rs deleted file mode 100644 index 801c4f4e6a..0000000000 --- a/src/platform_impl/web/async.rs +++ /dev/null @@ -1,297 +0,0 @@ -use atomic_waker::AtomicWaker; -use std::future; -use std::marker::PhantomData; -use std::ops::Deref; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError}; -use std::sync::{Arc, Condvar, Mutex, RwLock}; -use std::task::Poll; -use wasm_bindgen::prelude::wasm_bindgen; -use wasm_bindgen::{JsCast, JsValue}; - -// Unsafe wrapper type that allows us to use `T` when it's not `Send` from other threads. -// `value` **must** only be accessed on the main thread. -pub struct MainThreadSafe { - // We wrap this in an `Arc` to allow it to be safely cloned without accessing the value. - // The `RwLock` lets us safely drop in any thread. - // The `Option` lets us safely drop `T` only in the main thread, while letting other threads drop `None`. - value: Arc>>, - handler: fn(&RwLock>, E), - sender: AsyncSender, - // Prevent's `Send` or `Sync` to be automatically implemented. - local: PhantomData<*const ()>, -} - -impl MainThreadSafe { - thread_local! { - static MAIN_THREAD: bool = { - #[wasm_bindgen] - extern "C" { - #[derive(Clone)] - type Global; - - #[wasm_bindgen(method, getter, js_name = Window)] - fn window(this: &Global) -> JsValue; - } - - let global: Global = js_sys::global().unchecked_into(); - !global.window().is_undefined() - }; - } - - #[track_caller] - fn new(value: T, handler: fn(&RwLock>, E)) -> Option { - Self::MAIN_THREAD.with(|safe| { - if !safe { - panic!("only callable from inside the `Window`") - } - }); - - let value = Arc::new(RwLock::new(Some(value))); - - let (sender, receiver) = channel::(); - - wasm_bindgen_futures::spawn_local({ - let value = Arc::clone(&value); - async move { - while let Ok(event) = receiver.next().await { - handler(&value, event) - } - - // An error was returned because the channel was closed, which - // happens when all senders are dropped. - value.write().unwrap().take().unwrap(); - } - }); - - Some(Self { - value, - handler, - sender, - local: PhantomData, - }) - } - - pub fn send(&self, event: E) { - Self::MAIN_THREAD.with(|is_main_thread| { - if *is_main_thread { - (self.handler)(&self.value, event) - } else { - self.sender.send(event).unwrap() - } - }) - } - - fn is_main_thread(&self) -> bool { - Self::MAIN_THREAD.with(|is_main_thread| *is_main_thread) - } - - pub fn with(&self, f: impl FnOnce(&T) -> R) -> Option { - Self::MAIN_THREAD.with(|is_main_thread| { - if *is_main_thread { - Some(f(self.value.read().unwrap().as_ref().unwrap())) - } else { - None - } - }) - } -} - -impl Clone for MainThreadSafe { - fn clone(&self) -> Self { - Self { - value: self.value.clone(), - handler: self.handler, - sender: self.sender.clone(), - local: PhantomData, - } - } -} - -unsafe impl Send for MainThreadSafe {} -unsafe impl Sync for MainThreadSafe {} - -fn channel() -> (AsyncSender, AsyncReceiver) { - let (sender, receiver) = mpsc::channel(); - let sender = Arc::new(Mutex::new(sender)); - let waker = Arc::new(AtomicWaker::new()); - let closed = Arc::new(AtomicBool::new(false)); - - let sender = AsyncSender { - sender, - closed: closed.clone(), - waker: Arc::clone(&waker), - }; - let receiver = AsyncReceiver { - receiver, - closed, - waker, - }; - - (sender, receiver) -} - -struct AsyncSender { - // We need to wrap it into a `Mutex` to make it `Sync`. So the sender can't - // be accessed on the main thread, as it could block. Additionally we need - // to wrap it in an `Arc` to make it clonable on the main thread without - // having to block. - sender: Arc>>, - closed: Arc, - waker: Arc, -} - -impl AsyncSender { - pub fn send(&self, event: T) -> Result<(), SendError> { - self.sender.lock().unwrap().send(event)?; - self.waker.wake(); - - Ok(()) - } -} - -impl Clone for AsyncSender { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - waker: self.waker.clone(), - closed: self.closed.clone(), - } - } -} - -impl Drop for AsyncSender { - fn drop(&mut self) { - // If it's the last + the one held by the receiver make sure to wake it - // up and tell it that all receiver have dropped. - if Arc::strong_count(&self.closed) == 2 { - self.closed.store(true, Ordering::Relaxed); - self.waker.wake() - } - } -} - -struct AsyncReceiver { - receiver: Receiver, - closed: Arc, - waker: Arc, -} - -impl AsyncReceiver { - pub async fn next(&self) -> Result { - future::poll_fn(|cx| match self.receiver.try_recv() { - Ok(event) => Poll::Ready(Ok(event)), - Err(TryRecvError::Empty) => { - if self.closed.load(Ordering::Relaxed) { - return Poll::Ready(Err(RecvError)); - } - - self.waker.register(cx.waker()); - - match self.receiver.try_recv() { - Ok(event) => Poll::Ready(Ok(event)), - Err(TryRecvError::Empty) => { - if self.closed.load(Ordering::Relaxed) { - Poll::Ready(Err(RecvError)) - } else { - Poll::Pending - } - } - Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), - } - } - Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), - }) - .await - } -} - -pub struct Dispatcher(MainThreadSafe>); - -pub struct Closure(Box); - -impl Dispatcher { - #[track_caller] - pub fn new(value: T) -> Option { - MainThreadSafe::new(value, |value, Closure(closure)| { - // SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything - // funny with it here. See `Self::queue()`. - closure(value.read().unwrap().as_ref().unwrap()) - }) - .map(Self) - } - - pub fn dispatch(&self, f: impl 'static + FnOnce(&T) + Send) { - if self.is_main_thread() { - self.0.with(f).unwrap() - } else { - self.0.send(Closure(Box::new(f))) - } - } - - pub fn queue(&self, f: impl FnOnce(&T) -> R + Send) -> R { - if self.is_main_thread() { - self.0.with(f).unwrap() - } else { - let pair = Arc::new((Mutex::new(None), Condvar::new())); - let closure = Box::new({ - let pair = pair.clone(); - move |value: &T| { - *pair.0.lock().unwrap() = Some(f(value)); - pair.1.notify_one(); - } - }) as Box; - // SAFETY: The `transmute` is necessary because `Closure` requires `'static`. This is - // safe because this function won't return until `f` has finished executing. See - // `Self::new()`. - let closure = Closure(unsafe { std::mem::transmute(closure) }); - - self.0.send(closure); - - let mut started = pair.0.lock().unwrap(); - - while started.is_none() { - started = pair.1.wait(started).unwrap(); - } - - started.take().unwrap() - } - } -} - -impl Deref for Dispatcher { - type Target = MainThreadSafe>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -type ChannelValue = MainThreadSafe; - -pub struct Channel(ChannelValue); - -impl Channel { - pub fn new(value: T, handler: fn(&T, E)) -> Option { - MainThreadSafe::new((value, handler), |runner, event| { - let lock = runner.read().unwrap(); - let (value, handler) = lock.as_ref().unwrap(); - handler(value, event); - }) - .map(Self) - } -} - -impl Clone for Channel { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Deref for Channel { - type Target = ChannelValue; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} diff --git a/src/platform_impl/web/async/channel.rs b/src/platform_impl/web/async/channel.rs new file mode 100644 index 0000000000..3f9c72a156 --- /dev/null +++ b/src/platform_impl/web/async/channel.rs @@ -0,0 +1,115 @@ +use atomic_waker::AtomicWaker; +use std::future; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError}; +use std::sync::{Arc, Mutex}; +use std::task::Poll; + +// NOTE: This channel doesn't wake up when all senders or receivers are +// dropped. This is acceptable as long as it's only used in `Dispatcher`, which +// has it's own `Drop` behavior. + +pub fn channel() -> (AsyncSender, AsyncReceiver) { + let (sender, receiver) = mpsc::channel(); + let sender = Arc::new(Mutex::new(sender)); + let inner = Arc::new(Inner { + closed: AtomicBool::new(false), + waker: AtomicWaker::new(), + }); + + let sender = AsyncSender { + sender, + inner: Arc::clone(&inner), + }; + let receiver = AsyncReceiver { + receiver: Rc::new(receiver), + inner, + }; + + (sender, receiver) +} + +pub struct AsyncSender { + // We need to wrap it into a `Mutex` to make it `Sync`. So the sender can't + // be accessed on the main thread, as it could block. Additionally we need + // to wrap it in an `Arc` to make it clonable on the main thread without + // having to block. + sender: Arc>>, + inner: Arc, +} + +impl AsyncSender { + pub fn send(&self, event: T) -> Result<(), SendError> { + self.sender.lock().unwrap().send(event)?; + self.inner.waker.wake(); + + Ok(()) + } + + pub fn close(&self) { + self.inner.closed.store(true, Ordering::Relaxed); + self.inner.waker.wake() + } +} + +impl Clone for AsyncSender { + fn clone(&self) -> Self { + Self { + sender: Arc::clone(&self.sender), + inner: Arc::clone(&self.inner), + } + } +} + +pub struct AsyncReceiver { + receiver: Rc>, + inner: Arc, +} + +impl AsyncReceiver { + pub async fn next(&self) -> Result { + future::poll_fn(|cx| match self.receiver.try_recv() { + Ok(event) => Poll::Ready(Ok(event)), + Err(TryRecvError::Empty) => { + self.inner.waker.register(cx.waker()); + + match self.receiver.try_recv() { + Ok(event) => Poll::Ready(Ok(event)), + Err(TryRecvError::Empty) => { + if self.inner.closed.load(Ordering::Relaxed) { + Poll::Ready(Err(RecvError)) + } else { + Poll::Pending + } + } + Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), + } + } + Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), + }) + .await + } + + pub fn try_recv(&self) -> Result, RecvError> { + match self.receiver.try_recv() { + Ok(value) => Ok(Some(value)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(RecvError), + } + } +} + +impl Clone for AsyncReceiver { + fn clone(&self) -> Self { + Self { + receiver: Rc::clone(&self.receiver), + inner: Arc::clone(&self.inner), + } + } +} + +struct Inner { + closed: AtomicBool, + waker: AtomicWaker, +} diff --git a/src/platform_impl/web/async/dispatcher.rs b/src/platform_impl/web/async/dispatcher.rs new file mode 100644 index 0000000000..daa7702558 --- /dev/null +++ b/src/platform_impl/web/async/dispatcher.rs @@ -0,0 +1,113 @@ +use super::{channel, AsyncReceiver, AsyncSender, Wrapper}; +use std::{ + cell::Ref, + sync::{Arc, Condvar, Mutex}, +}; + +pub struct Dispatcher(Wrapper>, Closure>); + +struct Closure(Box); + +impl Dispatcher { + #[track_caller] + pub fn new(value: T) -> Option<(Self, DispatchRunner)> { + let (sender, receiver) = channel::>(); + + Wrapper::new( + value, + |value, Closure(closure)| { + // SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything + // funny with it here. See `Self::queue()`. + closure(value.borrow().as_ref().unwrap()) + }, + { + let receiver = receiver.clone(); + move |value| async move { + while let Ok(Closure(closure)) = receiver.next().await { + // SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything + // funny with it here. See `Self::queue()`. + closure(value.borrow().as_ref().unwrap()) + } + } + }, + sender, + |sender, closure| { + // SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything + // funny with it here. See `Self::queue()`. + sender.send(closure).unwrap() + }, + ) + .map(|wrapper| (Self(wrapper.clone()), DispatchRunner { wrapper, receiver })) + } + + pub fn value(&self) -> Option> { + self.0.value() + } + + pub fn dispatch(&self, f: impl 'static + FnOnce(&T) + Send) { + if let Some(value) = self.0.value() { + f(&value) + } else { + self.0.send(Closure(Box::new(f))) + } + } + + pub fn queue(&self, f: impl FnOnce(&T) -> R + Send) -> R { + if let Some(value) = self.0.value() { + f(&value) + } else { + let pair = Arc::new((Mutex::new(None), Condvar::new())); + let closure = Box::new({ + let pair = pair.clone(); + move |value: &T| { + *pair.0.lock().unwrap() = Some(f(value)); + pair.1.notify_one(); + } + }) as Box; + // SAFETY: The `transmute` is necessary because `Closure` requires `'static`. This is + // safe because this function won't return until `f` has finished executing. See + // `Self::new()`. + let closure = Closure(unsafe { std::mem::transmute(closure) }); + + self.0.send(closure); + + let mut started = pair.0.lock().unwrap(); + + while started.is_none() { + started = pair.1.wait(started).unwrap(); + } + + started.take().unwrap() + } + } +} + +impl Drop for Dispatcher { + fn drop(&mut self) { + self.0.with_sender_data(|sender| sender.close()) + } +} + +pub struct DispatchRunner { + wrapper: Wrapper>, Closure>, + receiver: AsyncReceiver>, +} + +impl DispatchRunner { + pub fn run(&self) { + while let Some(Closure(closure)) = self + .receiver + .try_recv() + .expect("should only be closed when `Dispatcher` is dropped") + { + // SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything + // funny with it here. See `Self::queue()`. + closure( + &self + .wrapper + .value() + .expect("don't call this outside the main thread"), + ) + } + } +} diff --git a/src/platform_impl/web/async/mod.rs b/src/platform_impl/web/async/mod.rs new file mode 100644 index 0000000000..f1317f6e39 --- /dev/null +++ b/src/platform_impl/web/async/mod.rs @@ -0,0 +1,9 @@ +mod channel; +mod dispatcher; +mod waker; +mod wrapper; + +use self::channel::{channel, AsyncReceiver, AsyncSender}; +pub use self::dispatcher::{DispatchRunner, Dispatcher}; +pub use self::waker::{Waker, WakerSpawner}; +use self::wrapper::Wrapper; diff --git a/src/platform_impl/web/async/waker.rs b/src/platform_impl/web/async/waker.rs new file mode 100644 index 0000000000..2c27c474c9 --- /dev/null +++ b/src/platform_impl/web/async/waker.rs @@ -0,0 +1,123 @@ +use super::Wrapper; +use atomic_waker::AtomicWaker; +use std::future; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::Poll; + +pub struct WakerSpawner(Wrapper, Sender, usize>); + +pub struct Waker(Wrapper, Sender, usize>); + +struct Handler { + value: T, + handler: fn(&T, usize), +} + +#[derive(Clone)] +struct Sender(Arc); + +impl WakerSpawner { + #[track_caller] + pub fn new(value: T, handler: fn(&T, usize)) -> Option { + let inner = Arc::new(Inner { + counter: AtomicUsize::new(0), + waker: AtomicWaker::new(), + closed: AtomicBool::new(false), + }); + + let handler = Handler { value, handler }; + + let sender = Sender(Arc::clone(&inner)); + + let wrapper = Wrapper::new( + handler, + |handler, count| { + let handler = handler.borrow(); + let handler = handler.as_ref().unwrap(); + (handler.handler)(&handler.value, count); + }, + { + let inner = Arc::clone(&inner); + + move |handler| async move { + while let Some(count) = future::poll_fn(|cx| { + let count = inner.counter.swap(0, Ordering::Relaxed); + + if count > 0 { + Poll::Ready(Some(count)) + } else { + inner.waker.register(cx.waker()); + + let count = inner.counter.swap(0, Ordering::Relaxed); + + if count > 0 { + Poll::Ready(Some(count)) + } else { + if inner.closed.load(Ordering::Relaxed) { + return Poll::Ready(None); + } + + Poll::Pending + } + } + }) + .await + { + let handler = handler.borrow(); + let handler = handler.as_ref().unwrap(); + (handler.handler)(&handler.value, count); + } + } + }, + sender, + |inner, _| { + inner.0.counter.fetch_add(1, Ordering::Relaxed); + inner.0.waker.wake(); + }, + )?; + + Some(Self(wrapper)) + } + + pub fn waker(&self) -> Waker { + Waker(self.0.clone()) + } + + pub fn fetch(&self) -> usize { + debug_assert!( + self.0.is_main_thread(), + "this should only be called from the main thread" + ); + + self.0 + .with_sender_data(|inner| inner.0.counter.swap(0, Ordering::Relaxed)) + } +} + +impl Drop for WakerSpawner { + fn drop(&mut self) { + self.0.with_sender_data(|inner| { + inner.0.closed.store(true, Ordering::Relaxed); + inner.0.waker.wake(); + }); + } +} + +impl Waker { + pub fn wake(&self) { + self.0.send(1) + } +} + +impl Clone for Waker { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +struct Inner { + counter: AtomicUsize, + waker: AtomicWaker, + closed: AtomicBool, +} diff --git a/src/platform_impl/web/async/wrapper.rs b/src/platform_impl/web/async/wrapper.rs new file mode 100644 index 0000000000..22088ef1fc --- /dev/null +++ b/src/platform_impl/web/async/wrapper.rs @@ -0,0 +1,131 @@ +use std::cell::{Ref, RefCell}; +use std::future::Future; +use std::marker::PhantomData; +use std::sync::Arc; +use wasm_bindgen::prelude::wasm_bindgen; +use wasm_bindgen::{JsCast, JsValue}; + +// Unsafe wrapper type that allows us to use `T` when it's not `Send` from other threads. +// `value` **must** only be accessed on the main thread. +pub struct Wrapper { + value: Value, + handler: fn(&RefCell>, E), + sender_data: S, + sender_handler: fn(&S, E), +} + +struct Value { + // SAFETY: + // This value must not be accessed if not on the main thread. + // + // - We wrap this in an `Arc` to allow it to be safely cloned without + // accessing the value. + // - The `RefCell` lets us mutably access in the main thread but is safe to + // drop in any thread because it has no `Drop` behavior. + // - The `Option` lets us safely drop `T` only in the main thread. + value: Arc>>, + // Prevent's `Send` or `Sync` to be automatically implemented. + local: PhantomData<*const ()>, +} + +// SAFETY: See `Self::value`. +unsafe impl Send for Value {} +// SAFETY: See `Self::value`. +unsafe impl Sync for Value {} + +impl Wrapper { + thread_local! { + static MAIN_THREAD: bool = { + #[wasm_bindgen] + extern "C" { + #[derive(Clone)] + type Global; + + #[wasm_bindgen(method, getter, js_name = Window)] + fn window(this: &Global) -> JsValue; + } + + let global: Global = js_sys::global().unchecked_into(); + !global.window().is_undefined() + }; + } + + #[track_caller] + pub fn new>( + value: V, + handler: fn(&RefCell>, E), + receiver: impl 'static + FnOnce(Arc>>) -> R, + sender_data: S, + sender_handler: fn(&S, E), + ) -> Option { + Self::MAIN_THREAD.with(|safe| { + if !safe { + panic!("only callable from inside the `Window`") + } + }); + + let value = Arc::new(RefCell::new(Some(value))); + + wasm_bindgen_futures::spawn_local({ + let value = Arc::clone(&value); + async move { + receiver(Arc::clone(&value)).await; + drop(value.borrow_mut().take().unwrap()); + } + }); + + Some(Self { + value: Value { + value, + local: PhantomData, + }, + handler, + sender_data, + sender_handler, + }) + } + + pub fn send(&self, event: E) { + Self::MAIN_THREAD.with(|is_main_thread| { + if *is_main_thread { + (self.handler)(&self.value.value, event) + } else { + (self.sender_handler)(&self.sender_data, event) + } + }) + } + + pub fn is_main_thread(&self) -> bool { + Self::MAIN_THREAD.with(|is_main_thread| *is_main_thread) + } + + pub fn value(&self) -> Option> { + Self::MAIN_THREAD.with(|is_main_thread| { + if *is_main_thread { + Some(Ref::map(self.value.value.borrow(), |value| { + value.as_ref().unwrap() + })) + } else { + None + } + }) + } + + pub fn with_sender_data(&self, f: impl FnOnce(&S) -> T) -> T { + f(&self.sender_data) + } +} + +impl Clone for Wrapper { + fn clone(&self) -> Self { + Self { + value: Value { + value: self.value.value.clone(), + local: PhantomData, + }, + handler: self.handler, + sender_data: self.sender_data.clone(), + sender_handler: self.sender_handler, + } + } +} diff --git a/src/platform_impl/web/event_loop/mod.rs b/src/platform_impl/web/event_loop/mod.rs index a93d160c8d..d0b7f5d37b 100644 --- a/src/platform_impl/web/event_loop/mod.rs +++ b/src/platform_impl/web/event_loop/mod.rs @@ -27,11 +27,12 @@ pub(crate) struct PlatformSpecificEventLoopAttributes {} impl EventLoop { pub(crate) fn new(_: &PlatformSpecificEventLoopAttributes) -> Result { let (user_event_sender, user_event_receiver) = mpsc::channel(); + let elw = RootEventLoopWindowTarget { + p: EventLoopWindowTarget::new(), + _marker: PhantomData, + }; Ok(EventLoop { - elw: RootEventLoopWindowTarget { - p: EventLoopWindowTarget::new(), - _marker: PhantomData, - }, + elw, user_event_sender, user_event_receiver, }) @@ -101,7 +102,7 @@ impl EventLoop { } pub fn create_proxy(&self) -> EventLoopProxy { - EventLoopProxy::new(self.elw.p.runner.clone(), self.user_event_sender.clone()) + EventLoopProxy::new(self.elw.p.waker(), self.user_event_sender.clone()) } pub fn window_target(&self) -> &RootEventLoopWindowTarget { diff --git a/src/platform_impl/web/event_loop/proxy.rs b/src/platform_impl/web/event_loop/proxy.rs index bb1ea207cf..691efa32b8 100644 --- a/src/platform_impl/web/event_loop/proxy.rs +++ b/src/platform_impl/web/event_loop/proxy.rs @@ -1,30 +1,25 @@ -use std::sync::mpsc::Sender; +use std::rc::Weak; +use std::sync::mpsc::{SendError, Sender}; -use super::runner; -use crate::event::Event; +use super::runner::Execution; use crate::event_loop::EventLoopClosed; -use crate::platform_impl::platform::r#async::Channel; +use crate::platform_impl::platform::r#async::Waker; pub struct EventLoopProxy { - // used to wake the event loop handler, not to actually pass data - runner: Channel, + runner: Waker>, sender: Sender, } impl EventLoopProxy { - pub fn new(runner: runner::Shared, sender: Sender) -> Self { - Self { - runner: Channel::new(runner, |runner, event| { - runner.send_event(Event::UserEvent(event)) - }) - .unwrap(), - sender, - } + pub fn new(runner: Waker>, sender: Sender) -> Self { + Self { runner, sender } } pub fn send_event(&self, event: T) -> Result<(), EventLoopClosed> { - self.sender.send(event).unwrap(); - self.runner.send(()); + self.sender + .send(event) + .map_err(|SendError(event)| EventLoopClosed(event))?; + self.runner.wake(); Ok(()) } } diff --git a/src/platform_impl/web/event_loop/runner.rs b/src/platform_impl/web/event_loop/runner.rs index d82eafbb1a..068d2893be 100644 --- a/src/platform_impl/web/event_loop/runner.rs +++ b/src/platform_impl/web/event_loop/runner.rs @@ -8,9 +8,10 @@ use crate::event::{ use crate::event_loop::{ControlFlow, DeviceEvents}; use crate::platform::web::PollStrategy; use crate::platform_impl::platform::backend::EventListenerHandle; +use crate::platform_impl::platform::r#async::{DispatchRunner, Waker, WakerSpawner}; +use crate::platform_impl::platform::window::Inner; use crate::window::WindowId; -use std::sync::atomic::Ordering; use std::{ cell::{Cell, RefCell}, clone::Clone, @@ -36,6 +37,7 @@ impl Clone for Shared { type OnEventHandle = RefCell>>; pub struct Execution { + proxy_spawner: WakerSpawner>, control_flow: Cell, poll_strategy: Cell, exit: Cell, @@ -46,7 +48,14 @@ pub struct Execution { id: RefCell, window: web_sys::Window, document: Document, - all_canvases: RefCell>)>>, + #[allow(clippy::type_complexity)] + all_canvases: RefCell< + Vec<( + WindowId, + Weak>, + DispatchRunner, + )>, + >, redraw_pending: RefCell>, destroy_pending: RefCell>, page_transition_event_handle: RefCell>, @@ -140,30 +149,40 @@ impl Shared { #[allow(clippy::disallowed_methods)] let document = window.document().expect("Failed to obtain document"); - Shared(Rc::new(Execution { - control_flow: Cell::new(ControlFlow::default()), - poll_strategy: Cell::new(PollStrategy::default()), - exit: Cell::new(false), - runner: RefCell::new(RunnerEnum::Pending), - suspended: Cell::new(false), - event_loop_recreation: Cell::new(false), - events: RefCell::new(VecDeque::new()), - window, - document, - id: RefCell::new(0), - all_canvases: RefCell::new(Vec::new()), - redraw_pending: RefCell::new(HashSet::new()), - destroy_pending: RefCell::new(VecDeque::new()), - page_transition_event_handle: RefCell::new(None), - device_events: Cell::default(), - on_mouse_move: RefCell::new(None), - on_wheel: RefCell::new(None), - on_mouse_press: RefCell::new(None), - on_mouse_release: RefCell::new(None), - on_key_press: RefCell::new(None), - on_key_release: RefCell::new(None), - on_visibility_change: RefCell::new(None), - on_touch_end: RefCell::new(None), + Shared(Rc::::new_cyclic(|weak| { + let proxy_spawner = WakerSpawner::new(weak.clone(), |runner, count| { + if let Some(runner) = runner.upgrade() { + Shared(runner).send_events(iter::repeat(Event::UserEvent(())).take(count)) + } + }) + .expect("`EventLoop` has to be created in the main thread"); + + Execution { + proxy_spawner, + control_flow: Cell::new(ControlFlow::default()), + poll_strategy: Cell::new(PollStrategy::default()), + exit: Cell::new(false), + runner: RefCell::new(RunnerEnum::Pending), + suspended: Cell::new(false), + event_loop_recreation: Cell::new(false), + events: RefCell::new(VecDeque::new()), + window, + document, + id: RefCell::new(0), + all_canvases: RefCell::new(Vec::new()), + redraw_pending: RefCell::new(HashSet::new()), + destroy_pending: RefCell::new(VecDeque::new()), + page_transition_event_handle: RefCell::new(None), + device_events: Cell::default(), + on_mouse_move: RefCell::new(None), + on_wheel: RefCell::new(None), + on_mouse_press: RefCell::new(None), + on_mouse_release: RefCell::new(None), + on_key_press: RefCell::new(None), + on_key_release: RefCell::new(None), + on_visibility_change: RefCell::new(None), + on_touch_end: RefCell::new(None), + } })) } @@ -175,11 +194,13 @@ impl Shared { &self.0.document } - pub fn add_canvas(&self, id: WindowId, canvas: &Rc>) { - self.0 - .all_canvases - .borrow_mut() - .push((id, Rc::downgrade(canvas))); + pub fn add_canvas( + &self, + id: WindowId, + canvas: Weak>, + runner: DispatchRunner, + ) { + self.0.all_canvases.borrow_mut().push((id, canvas, runner)); } pub fn notify_destroy_window(&self, id: WindowId) { @@ -411,7 +432,7 @@ impl Shared { "visibilitychange", Closure::new(move |_| { if !runner.0.suspended.get() { - for (id, canvas) in &*runner.0.all_canvases.borrow() { + for (id, canvas, _) in &*runner.0.all_canvases.borrow() { if let Some(canvas) = canvas.upgrade() { let is_visible = backend::is_visible(runner.document()); // only fire if: @@ -549,7 +570,7 @@ impl Shared { self.0 .all_canvases .borrow_mut() - .retain(|&(item_id, _)| item_id != id); + .retain(|&(item_id, _, _)| item_id != id); self.handle_event(Event::WindowEvent { window_id: id, event: crate::event::WindowEvent::Destroyed, @@ -618,9 +639,29 @@ impl Shared { // Don't take events out of the queue if the loop is closed or the runner doesn't exist // If the runner doesn't exist and this method recurses, it will recurse infinitely if !is_closed && self.0.runner.borrow().maybe_runner().is_some() { + // Pre-fetch window commands to avoid having to wait until the next event loop cycle + // and potentially block other threads in the meantime. + for (_, window, runner) in self.0.all_canvases.borrow().iter() { + if let Some(window) = window.upgrade() { + runner.run(); + drop(window) + } + } + // Take an event out of the queue and handle it // Make sure not to let the borrow_mut live during the next handle_event - let event = { self.0.events.borrow_mut().pop_front() }; + let event = { + let mut events = self.0.events.borrow_mut(); + + // Pre-fetch `UserEvent`s to avoid having to wait until the next event loop cycle. + events.extend( + iter::repeat(Event::UserEvent(())) + .take(self.0.proxy_spawner.fetch()) + .map(EventWrapper::from), + ); + + events.pop_front() + }; if let Some(event) = event { self.handle_event(event); } @@ -690,7 +731,7 @@ impl Shared { // Dropping the `Runner` drops the event handler closure, which will in // turn drop all `Window`s moved into the closure. *self.0.runner.borrow_mut() = RunnerEnum::Destroyed; - for (_, canvas) in all_canvases { + for (_, canvas, _) in all_canvases { // In case any remaining `Window`s are still not dropped, we will need // to explicitly remove the event handlers associated with their canvases. if let Some(canvas) = canvas.upgrade() { @@ -734,23 +775,29 @@ impl Shared { fn device_events(&self) -> bool { match self.0.device_events.get() { DeviceEvents::Always => true, - DeviceEvents::WhenFocused => self.0.all_canvases.borrow().iter().any(|(_, canvas)| { - if let Some(canvas) = canvas.upgrade() { - canvas.borrow().has_focus.load(Ordering::Relaxed) - } else { - false - } - }), + DeviceEvents::WhenFocused => { + self.0.all_canvases.borrow().iter().any(|(_, canvas, _)| { + if let Some(canvas) = canvas.upgrade() { + canvas.borrow().has_focus.get() + } else { + false + } + }) + } DeviceEvents::Never => false, } } fn transient_activation(&self) { - self.0.all_canvases.borrow().iter().for_each(|(_, canvas)| { - if let Some(canvas) = canvas.upgrade() { - canvas.borrow().transient_activation(); - } - }); + self.0 + .all_canvases + .borrow() + .iter() + .for_each(|(_, canvas, _)| { + if let Some(canvas) = canvas.upgrade() { + canvas.borrow().transient_activation(); + } + }); } pub fn event_loop_recreation(&self, allow: bool) { @@ -780,6 +827,10 @@ impl Shared { pub(crate) fn poll_strategy(&self) -> PollStrategy { self.0.poll_strategy.get() } + + pub(crate) fn waker(&self) -> Waker> { + self.0.proxy_spawner.waker() + } } pub(crate) enum EventWrapper { diff --git a/src/platform_impl/web/event_loop/window_target.rs b/src/platform_impl/web/event_loop/window_target.rs index ba1868532d..09a5cc8b6b 100644 --- a/src/platform_impl/web/event_loop/window_target.rs +++ b/src/platform_impl/web/event_loop/window_target.rs @@ -3,10 +3,9 @@ use std::clone::Clone; use std::collections::{vec_deque::IntoIter as VecDequeIter, VecDeque}; use std::iter; use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::atomic::Ordering; +use std::rc::{Rc, Weak}; -use super::runner::EventWrapper; +use super::runner::{EventWrapper, Execution}; use super::{ super::{monitor::MonitorHandle, KeyEventExtra}, backend, @@ -20,6 +19,7 @@ use crate::event::{ use crate::event_loop::{ControlFlow, DeviceEvents}; use crate::keyboard::ModifiersState; use crate::platform::web::PollStrategy; +use crate::platform_impl::platform::r#async::Waker; use crate::window::{Theme, WindowId as RootWindowId}; #[derive(Default)] @@ -81,7 +81,6 @@ impl EventLoopWindowTarget { id: WindowId, prevent_default: bool, ) { - self.runner.add_canvas(RootWindowId(id), canvas); let canvas_clone = canvas.clone(); let mut canvas = canvas.borrow_mut(); canvas.set_attribute("data-raw-handle", &id.0.to_string()); @@ -92,7 +91,7 @@ impl EventLoopWindowTarget { let has_focus = canvas.has_focus.clone(); let modifiers = self.modifiers.clone(); canvas.on_blur(move || { - has_focus.store(false, Ordering::Relaxed); + has_focus.set(false); let clear_modifiers = (!modifiers.get().is_empty()).then(|| { modifiers.set(ModifiersState::empty()); @@ -115,7 +114,7 @@ impl EventLoopWindowTarget { let runner = self.runner.clone(); let has_focus = canvas.has_focus.clone(); canvas.on_focus(move || { - if !has_focus.swap(true, Ordering::Relaxed) { + if !has_focus.replace(true) { runner.send_event(Event::WindowEvent { window_id: RootWindowId(id), event: WindowEvent::Focused(true), @@ -204,15 +203,13 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers, pointer_id| { - let focus = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { - modifiers.set(active_modifiers); - Event::WindowEvent { - window_id: RootWindowId(id), - event: WindowEvent::ModifiersChanged(active_modifiers.into()), - } - }); + let focus = (has_focus.get() && modifiers.get() != active_modifiers).then(|| { + modifiers.set(active_modifiers); + Event::WindowEvent { + window_id: RootWindowId(id), + event: WindowEvent::ModifiersChanged(active_modifiers.into()), + } + }); let pointer = pointer_id.map(|pointer_id| Event::WindowEvent { window_id: RootWindowId(id), @@ -233,15 +230,13 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers, pointer_id| { - let focus = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { - modifiers.set(active_modifiers); - Event::WindowEvent { - window_id: RootWindowId(id), - event: WindowEvent::ModifiersChanged(active_modifiers.into()), - } - }); + let focus = (has_focus.get() && modifiers.get() != active_modifiers).then(|| { + modifiers.set(active_modifiers); + Event::WindowEvent { + window_id: RootWindowId(id), + event: WindowEvent::ModifiersChanged(active_modifiers.into()), + } + }); let pointer = pointer_id.map(|pointer_id| Event::WindowEvent { window_id: RootWindowId(id), @@ -263,7 +258,7 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers| { - if has_focus.load(Ordering::Relaxed) && modifiers.get() != active_modifiers { + if has_focus.get() && modifiers.get() != active_modifiers { modifiers.set(active_modifiers); runner.send_event(Event::WindowEvent { window_id: RootWindowId(id), @@ -278,9 +273,8 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers, pointer_id, events| { - let modifiers = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { + let modifiers = + (has_focus.get() && modifiers.get() != active_modifiers).then(|| { modifiers.set(active_modifiers); Event::WindowEvent { window_id: RootWindowId(id), @@ -307,9 +301,8 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers, device_id, events| { - let modifiers = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { + let modifiers = + (has_focus.get() && modifiers.get() != active_modifiers).then(|| { modifiers.set(active_modifiers); Event::WindowEvent { window_id: RootWindowId(id), @@ -341,9 +334,8 @@ impl EventLoopWindowTarget { position: crate::dpi::PhysicalPosition, buttons, button| { - let modifiers = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { + let modifiers = + (has_focus.get() && modifiers.get() != active_modifiers).then(|| { modifiers.set(active_modifiers); Event::WindowEvent { window_id: RootWindowId(id), @@ -473,7 +465,7 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers| { - if has_focus.load(Ordering::Relaxed) && modifiers.get() != active_modifiers { + if has_focus.get() && modifiers.get() != active_modifiers { modifiers.set(active_modifiers); runner.send_event(Event::WindowEvent { window_id: RootWindowId(id), @@ -488,9 +480,8 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers, pointer_id, position, button| { - let modifiers = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { + let modifiers = + (has_focus.get() && modifiers.get() != active_modifiers).then(|| { modifiers.set(active_modifiers); Event::WindowEvent { window_id: RootWindowId(id), @@ -528,9 +519,8 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); move |active_modifiers, device_id, location, force| { - let modifiers = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) - .then(|| { + let modifiers = + (has_focus.get() && modifiers.get() != active_modifiers).then(|| { modifiers.set(active_modifiers); Event::WindowEvent { window_id: RootWindowId(id), @@ -558,8 +548,7 @@ impl EventLoopWindowTarget { let modifiers = self.modifiers.clone(); canvas.on_mouse_wheel( move |pointer_id, delta, active_modifiers| { - let modifiers_changed = (has_focus.load(Ordering::Relaxed) - && modifiers.get() != active_modifiers) + let modifiers_changed = (has_focus.get() && modifiers.get() != active_modifiers) .then(|| { modifiers.set(active_modifiers); Event::WindowEvent { @@ -713,4 +702,8 @@ impl EventLoopWindowTarget { pub(crate) fn poll_strategy(&self) -> PollStrategy { self.runner.poll_strategy() } + + pub(crate) fn waker(&self) -> Waker> { + self.runner.waker() + } } diff --git a/src/platform_impl/web/web_sys/canvas.rs b/src/platform_impl/web/web_sys/canvas.rs index 049c89162f..1196ae8dc7 100644 --- a/src/platform_impl/web/web_sys/canvas.rs +++ b/src/platform_impl/web/web_sys/canvas.rs @@ -1,6 +1,5 @@ use std::cell::Cell; use std::rc::{Rc, Weak}; -use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; use smol_str::SmolStr; @@ -29,7 +28,7 @@ use super::{event, ButtonsState, ResizeScaleHandle}; pub struct Canvas { common: Common, id: WindowId, - pub has_focus: Arc, + pub has_focus: Rc>, pub is_intersecting: Option, on_touch_start: Option>, on_focus: Option>, @@ -139,7 +138,7 @@ impl Canvas { Ok(Canvas { common, id, - has_focus: Arc::new(AtomicBool::new(false)), + has_focus: Rc::new(Cell::new(false)), is_intersecting: None, on_touch_start: None, on_blur: None, diff --git a/src/platform_impl/web/window.rs b/src/platform_impl/web/window.rs index fd6fad4e29..6d456247b3 100644 --- a/src/platform_impl/web/window.rs +++ b/src/platform_impl/web/window.rs @@ -14,8 +14,6 @@ use super::{backend, monitor::MonitorHandle, EventLoopWindowTarget, Fullscreen}; use std::cell::RefCell; use std::collections::VecDeque; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; pub struct Window { inner: Dispatcher, @@ -27,7 +25,6 @@ pub struct Inner { canvas: Rc>, previous_pointer: RefCell<&'static str>, destroy_fn: Option>, - has_focus: Arc, } impl Window { @@ -51,14 +48,12 @@ impl Window { let runner = target.runner.clone(); let destroy_fn = Box::new(move || runner.notify_destroy_window(RootWI(id))); - let has_focus = canvas.borrow().has_focus.clone(); let inner = Inner { id, window: window.clone(), canvas, previous_pointer: RefCell::new("auto"), destroy_fn: Some(destroy_fn), - has_focus, }; inner.set_title(&attr.title); @@ -66,9 +61,11 @@ impl Window { inner.set_visible(attr.visible); inner.set_window_icon(attr.window_icon); - Ok(Window { - inner: Dispatcher::new(inner).unwrap(), - }) + let canvas = Rc::downgrade(&inner.canvas); + let (dispatcher, runner) = Dispatcher::new(inner).unwrap(); + target.runner.add_canvas(RootWI(id), canvas, runner); + + Ok(Window { inner: dispatcher }) } pub(crate) fn maybe_queue_on_main(&self, f: impl FnOnce(&Inner) + Send + 'static) { @@ -80,7 +77,9 @@ impl Window { } pub fn canvas(&self) -> Option { - self.inner.with(|inner| inner.canvas.borrow().raw().clone()) + self.inner + .value() + .map(|inner| inner.canvas.borrow().raw().clone()) } } @@ -414,7 +413,7 @@ impl Inner { #[inline] pub fn has_focus(&self) -> bool { - self.has_focus.load(Ordering::Relaxed) + self.canvas.borrow().has_focus.get() } pub fn title(&self) -> String {