Skip to content

Commit

Permalink
Web: queue EventLoopProxy::send_event() to microtask
Browse files Browse the repository at this point in the history
  • Loading branch information
daxpedda authored and kchibisov committed Jun 15, 2024
1 parent 437747b commit b14d5c0
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 34 deletions.
16 changes: 16 additions & 0 deletions src/changelog/unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,19 @@ The migration guide could reference other migration examples in the current
changelog entry.

## Unreleased

### Changed

- On Web, let events wake up event loop immediately when using
`ControlFlow::Poll`.

### Fixed

- On Web, fix `EventLoopProxy::send_event()` triggering event loop immediately
when not called from inside the event loop. Now queues a microtask instead.

### Removed

- Remove `EventLoop::run`.
- Remove `EventLoopExtRunOnDemand::run_on_demand`.
- Remove `EventLoopExtPumpEvents::pump_events`.
47 changes: 27 additions & 20 deletions src/platform_impl/web/async/waker.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use std::future;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;

use super::super::main_thread::MainThreadMarker;
use super::{AtomicWaker, Wrapper};

pub struct WakerSpawner<T: 'static>(Wrapper<Handler<T>, Sender, usize>);
pub struct WakerSpawner<T: 'static>(Wrapper<Handler<T>, Sender, NonZeroUsize>);

pub struct Waker<T: 'static>(Wrapper<Handler<T>, Sender, usize>);
pub struct Waker<T: 'static>(Wrapper<Handler<T>, Sender, NonZeroUsize>);

struct Handler<T> {
value: T,
handler: fn(&T, usize),
handler: fn(&T, NonZeroUsize, bool),
}

#[derive(Clone)]
struct Sender(Arc<Inner>);

impl<T> WakerSpawner<T> {
#[track_caller]
pub fn new(main_thread: MainThreadMarker, value: T, handler: fn(&T, usize)) -> Option<Self> {
pub fn new(
main_thread: MainThreadMarker,
value: T,
handler: fn(&T, NonZeroUsize, bool),
) -> Option<Self> {
let inner = Arc::new(Inner {
counter: AtomicUsize::new(0),
waker: AtomicWaker::new(),
Expand All @@ -37,7 +42,7 @@ impl<T> WakerSpawner<T> {
|handler, count| {
let handler = handler.borrow();
let handler = handler.as_ref().unwrap();
(handler.handler)(&handler.value, count);
(handler.handler)(&handler.value, count, true);
},
{
let inner = Arc::clone(&inner);
Expand All @@ -46,29 +51,31 @@ impl<T> WakerSpawner<T> {
while let Some(count) = future::poll_fn(|cx| {
let count = inner.counter.swap(0, Ordering::Relaxed);

if count > 0 {
Poll::Ready(Some(count))
} else {
inner.waker.register(cx.waker());
match NonZeroUsize::new(count) {
Some(count) => Poll::Ready(Some(count)),
None => {
inner.waker.register(cx.waker());

let count = inner.counter.swap(0, Ordering::Relaxed);
let count = inner.counter.swap(0, Ordering::Relaxed);

if count > 0 {
Poll::Ready(Some(count))
} else {
if inner.closed.load(Ordering::Relaxed) {
return Poll::Ready(None);
}
match NonZeroUsize::new(count) {
Some(count) => Poll::Ready(Some(count)),
None => {
if inner.closed.load(Ordering::Relaxed) {
return Poll::Ready(None);
}

Poll::Pending
}
Poll::Pending
},
}
},
}
})
.await
{
let handler = handler.borrow();
let handler = handler.as_ref().unwrap();
(handler.handler)(&handler.value, count);
(handler.handler)(&handler.value, count, false);
}
}
},
Expand Down Expand Up @@ -107,7 +114,7 @@ impl<T> Drop for WakerSpawner<T> {

impl<T> Waker<T> {
pub fn wake(&self) {
self.0.send(1)
self.0.send(NonZeroUsize::MIN)
}
}

Expand Down
69 changes: 55 additions & 14 deletions src/platform_impl/web/event_loop/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ use crate::platform_impl::platform::r#async::{DispatchRunner, Waker, WakerSpawne
use crate::platform_impl::platform::window::Inner;
use crate::window::WindowId;

use js_sys::Function;
use std::cell::{Cell, RefCell};
use std::collections::{HashSet, VecDeque};
use std::iter;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::rc::{Rc, Weak};
use wasm_bindgen::prelude::Closure;
use wasm_bindgen::prelude::{wasm_bindgen, Closure};
use wasm_bindgen::JsCast;
use web_sys::{Document, KeyboardEvent, PageTransitionEvent, PointerEvent, WheelEvent};
use web_time::{Duration, Instant};

Expand Down Expand Up @@ -133,12 +136,13 @@ impl Shared {
let document = window.document().expect("Failed to obtain document");

Shared(Rc::<Execution>::new_cyclic(|weak| {
let proxy_spawner = WakerSpawner::new(main_thread, weak.clone(), |runner, count| {
if let Some(runner) = runner.upgrade() {
Shared(runner).send_events(iter::repeat(Event::UserEvent(())).take(count))
}
})
.expect("`EventLoop` has to be created in the main thread");
let proxy_spawner =
WakerSpawner::new(main_thread, weak.clone(), |runner, count, local| {
if let Some(runner) = runner.upgrade() {
Shared(runner).send_user_events(count, local)
}
})
.expect("`EventLoop` has to be created in the main thread");

Execution {
main_thread,
Expand Down Expand Up @@ -460,6 +464,48 @@ impl Shared {
self.send_events(iter::once(event));
}

// Add a series of user events to the event loop runner
//
// This will schedule the event loop to wake up instead of waking it up immediately if its not
// running.
pub(crate) fn send_user_events(&self, count: NonZeroUsize, local: bool) {
// If the event loop is closed, it should discard any new events
if self.is_closed() {
return;
}

if local {
// If the loop is not running and triggered locally, queue on next microtick.
if let Ok(RunnerEnum::Running(_)) =
self.0.runner.try_borrow().as_ref().map(Deref::deref)
{
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_name = queueMicrotask)]
fn queue_microtask(task: Function);
}

queue_microtask(
Closure::once_into_js({
let this = Rc::downgrade(&self.0);
move || {
if let Some(shared) = this.upgrade() {
Shared(shared).send_events(
iter::repeat(Event::UserEvent(())).take(count.get()),
)
}
}
})
.unchecked_into(),
);

return;
}
}

self.send_events(iter::repeat(Event::UserEvent(())).take(count.get()))
}

// Add a series of events to the event loop runner
//
// It will determine if the event should be immediately sent to the user or buffered for later
Expand All @@ -471,13 +517,8 @@ impl Shared {
// If we can run the event processing right now, or need to queue this and wait for later
let mut process_immediately = true;
match self.0.runner.try_borrow().as_ref().map(Deref::deref) {
Ok(RunnerEnum::Running(ref runner)) => {
// If we're currently polling, queue this and wait for the poll() method to be
// called
if let State::Poll { .. } = runner.state {
process_immediately = false;
}
},
// If the runner is attached but not running, we always wake it up.
Ok(RunnerEnum::Running(_)) => (),
Ok(RunnerEnum::Pending) => {
// The runner still hasn't been attached: queue this event and wait for it to be
process_immediately = false;
Expand Down

0 comments on commit b14d5c0

Please sign in to comment.