Skip to content

Commit

Permalink
Pre-fetch window commands when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
daxpedda committed Oct 16, 2023
1 parent 2529807 commit a563b81
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 36 deletions.
25 changes: 23 additions & 2 deletions src/platform_impl/web/async/channel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use atomic_waker::AtomicWaker;
use std::future;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
use std::sync::{Arc, Mutex};
Expand All @@ -21,7 +22,10 @@ pub fn channel<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
sender,
inner: Arc::clone(&inner),
};
let receiver = AsyncReceiver { receiver, inner };
let receiver = AsyncReceiver {
receiver: Rc::new(receiver),
inner,
};

(sender, receiver)
}
Expand Down Expand Up @@ -59,7 +63,7 @@ impl<T> Clone for AsyncSender<T> {
}

pub struct AsyncReceiver<T> {
receiver: Receiver<T>,
receiver: Rc<Receiver<T>>,
inner: Arc<Inner>,
}

Expand All @@ -86,6 +90,23 @@ impl<T> AsyncReceiver<T> {
})
.await
}

pub fn try_recv(&self) -> Result<Option<T>, RecvError> {
match self.receiver.try_recv() {
Ok(value) => Ok(Some(value)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(RecvError),
}
}
}

impl<T> Clone for AsyncReceiver<T> {
fn clone(&self) -> Self {
Self {
receiver: Rc::clone(&self.receiver),
inner: Arc::clone(&self.inner),
}
}
}

struct Inner {
Expand Down
42 changes: 34 additions & 8 deletions src/platform_impl/web/async/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{channel, AsyncSender, Wrapper};
use super::{channel, AsyncReceiver, AsyncSender, Wrapper};
use std::sync::{Arc, Condvar, Mutex};

pub struct Dispatcher<T: 'static>(Wrapper<true, T, AsyncSender<Closure<T>>, Closure<T>>);
Expand All @@ -7,7 +7,7 @@ struct Closure<T>(Box<dyn FnOnce(&T) + Send>);

impl<T> Dispatcher<T> {
#[track_caller]
pub fn new(value: T) -> Option<Self> {
pub fn new(value: T) -> Option<(Self, DispatchRunner<T>)> {
let (sender, receiver) = channel::<Closure<T>>();

Wrapper::new(
Expand All @@ -17,11 +17,14 @@ impl<T> Dispatcher<T> {
// funny with it here. See `Self::queue()`.
closure(value.read().unwrap().as_ref().unwrap())
},
move |value| async move {
while let Ok(Closure(closure)) = receiver.next().await {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything
// funny with it here. See `Self::queue()`.
closure(value.read().unwrap().as_ref().unwrap())
{
let receiver = receiver.clone();
move |value| async move {
while let Ok(Closure(closure)) = receiver.next().await {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything
// funny with it here. See `Self::queue()`.
closure(value.read().unwrap().as_ref().unwrap())
}
}
},
sender,
Expand All @@ -31,7 +34,7 @@ impl<T> Dispatcher<T> {
sender.send(closure).unwrap()
},
)
.map(Self)
.map(|wrapper| (Self(wrapper.clone()), DispatchRunner { wrapper, receiver }))
}

pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> Option<R> {
Expand Down Expand Up @@ -81,3 +84,26 @@ impl<T> Drop for Dispatcher<T> {
self.0.with_sender_data(|sender| sender.close())
}
}

pub struct DispatchRunner<T: 'static> {
wrapper: Wrapper<true, T, AsyncSender<Closure<T>>, Closure<T>>,
receiver: AsyncReceiver<Closure<T>>,
}

impl<T> DispatchRunner<T> {
pub fn run(&self) {
while let Some(Closure(closure)) = self
.receiver
.try_recv()
.expect("should only be closed when `Dispatcher` is dropped")
{
self.wrapper
.with(|value| {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything
// funny with it here. See `Self::queue()`.
closure(value)
})
.expect("don't call this outside the main thread")
}
}
}
4 changes: 2 additions & 2 deletions src/platform_impl/web/async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod dispatcher;
mod waker;
mod wrapper;

use self::channel::{channel, AsyncSender};
pub use self::dispatcher::Dispatcher;
use self::channel::{channel, AsyncReceiver, AsyncSender};
pub use self::dispatcher::{DispatchRunner, Dispatcher};
pub use self::waker::{Waker, WakerSpawner};
use self::wrapper::Wrapper;
69 changes: 47 additions & 22 deletions src/platform_impl/web/event_loop/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::event::{
use crate::event_loop::{ControlFlow, DeviceEvents};
use crate::platform::web::PollStrategy;
use crate::platform_impl::platform::backend::EventListenerHandle;
use crate::platform_impl::platform::r#async::{Waker, WakerSpawner};
use crate::platform_impl::platform::r#async::{DispatchRunner, Waker, WakerSpawner};
use crate::platform_impl::platform::window::Inner;
use crate::window::WindowId;

use std::{
Expand Down Expand Up @@ -47,7 +48,14 @@ pub struct Execution {
id: RefCell<u32>,
window: web_sys::Window,
document: Document,
all_canvases: RefCell<Vec<(WindowId, Weak<RefCell<backend::Canvas>>)>>,
#[allow(clippy::type_complexity)]
all_canvases: RefCell<
Vec<(
WindowId,
Weak<RefCell<backend::Canvas>>,
DispatchRunner<Inner>,
)>,
>,
redraw_pending: RefCell<HashSet<WindowId>>,
destroy_pending: RefCell<VecDeque<WindowId>>,
page_transition_event_handle: RefCell<Option<backend::PageTransitionEventHandle>>,
Expand Down Expand Up @@ -186,11 +194,13 @@ impl Shared {
&self.0.document
}

pub fn add_canvas(&self, id: WindowId, canvas: &Rc<RefCell<backend::Canvas>>) {
self.0
.all_canvases
.borrow_mut()
.push((id, Rc::downgrade(canvas)));
pub fn add_canvas(
&self,
id: WindowId,
canvas: Weak<RefCell<backend::Canvas>>,
runner: DispatchRunner<Inner>,
) {
self.0.all_canvases.borrow_mut().push((id, canvas, runner));
}

pub fn notify_destroy_window(&self, id: WindowId) {
Expand Down Expand Up @@ -422,7 +432,7 @@ impl Shared {
"visibilitychange",
Closure::new(move |_| {
if !runner.0.suspended.get() {
for (id, canvas) in &*runner.0.all_canvases.borrow() {
for (id, canvas, _) in &*runner.0.all_canvases.borrow() {
if let Some(canvas) = canvas.upgrade() {
let is_visible = backend::is_visible(runner.document());
// only fire if:
Expand Down Expand Up @@ -560,7 +570,7 @@ impl Shared {
self.0
.all_canvases
.borrow_mut()
.retain(|&(item_id, _)| item_id != id);
.retain(|&(item_id, _, _)| item_id != id);
self.handle_event(Event::WindowEvent {
window_id: id,
event: crate::event::WindowEvent::Destroyed,
Expand Down Expand Up @@ -629,6 +639,15 @@ impl Shared {
// Don't take events out of the queue if the loop is closed or the runner doesn't exist
// If the runner doesn't exist and this method recurses, it will recurse infinitely
if !is_closed && self.0.runner.borrow().maybe_runner().is_some() {
// Pre-fetch window commands to avoid having to wait until the next event loop cycle
// and potentially block other threads in the meantime.
for (_, window, runner) in self.0.all_canvases.borrow().iter() {
if let Some(window) = window.upgrade() {
runner.run();
drop(window)
}
}

// Take an event out of the queue and handle it
// Make sure not to let the borrow_mut live during the next handle_event
let event = {
Expand Down Expand Up @@ -712,7 +731,7 @@ impl Shared {
// Dropping the `Runner` drops the event handler closure, which will in
// turn drop all `Window`s moved into the closure.
*self.0.runner.borrow_mut() = RunnerEnum::Destroyed;
for (_, canvas) in all_canvases {
for (_, canvas, _) in all_canvases {
// In case any remaining `Window`s are still not dropped, we will need
// to explicitly remove the event handlers associated with their canvases.
if let Some(canvas) = canvas.upgrade() {
Expand Down Expand Up @@ -756,23 +775,29 @@ impl Shared {
fn device_events(&self) -> bool {
match self.0.device_events.get() {
DeviceEvents::Always => true,
DeviceEvents::WhenFocused => self.0.all_canvases.borrow().iter().any(|(_, canvas)| {
if let Some(canvas) = canvas.upgrade() {
canvas.borrow().has_focus.get()
} else {
false
}
}),
DeviceEvents::WhenFocused => {
self.0.all_canvases.borrow().iter().any(|(_, canvas, _)| {
if let Some(canvas) = canvas.upgrade() {
canvas.borrow().has_focus.get()
} else {
false
}
})
}
DeviceEvents::Never => false,
}
}

fn transient_activation(&self) {
self.0.all_canvases.borrow().iter().for_each(|(_, canvas)| {
if let Some(canvas) = canvas.upgrade() {
canvas.borrow().transient_activation();
}
});
self.0
.all_canvases
.borrow()
.iter()
.for_each(|(_, canvas, _)| {
if let Some(canvas) = canvas.upgrade() {
canvas.borrow().transient_activation();
}
});
}

pub fn event_loop_recreation(&self, allow: bool) {
Expand Down
1 change: 0 additions & 1 deletion src/platform_impl/web/event_loop/window_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl<T> EventLoopWindowTarget<T> {
id: WindowId,
prevent_default: bool,
) {
self.runner.add_canvas(RootWindowId(id), canvas);
let canvas_clone = canvas.clone();
let mut canvas = canvas.borrow_mut();
canvas.set_attribute("data-raw-handle", &id.0.to_string());
Expand Down
6 changes: 5 additions & 1 deletion src/platform_impl/web/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ impl Window {
inner.set_visible(attr.visible);
inner.set_window_icon(attr.window_icon);

let canvas = Rc::downgrade(&inner.canvas);
let (dispatcher, runner) = Dispatcher::new(inner).unwrap();
target.runner.add_canvas(RootWI(id), canvas, runner);

Ok(Window {
inner: Dispatcher::new(inner).unwrap(),
inner: dispatcher,
})
}

Expand Down

0 comments on commit a563b81

Please sign in to comment.