From 87e6df79cf19432fda0bebb4d7531f6bbd66c5d7 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 18 Apr 2019 17:11:49 +0200 Subject: [PATCH] Initial commit --- .gitignore | 3 + Cargo.toml | 17 ++ LICENSE | 18 ++ README.md | 6 + src/lib.rs | 27 +++ src/wasm.rs | 513 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 584 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 src/lib.rs create mode 100644 src/wasm.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f88dba --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +**/*.rs.bk +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..4925be8 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "wasm-timer" +edition = "2018" +description = "Abstraction over std::time::Instant and tokio_timer that works on WASM" +version = "0.1.0" +authors = ["Pierre Krieger "] +license = "MIT" +repository = "https://github.com/tomaka/wasm-timer" + +[target.'cfg(any(target_arch = "wasm32"))'.dependencies] +futures = "0.1" +send_wrapper = "0.2" +wasm-bindgen = "0.2.37" +web-sys = { version = "0.3.14", features = ["Performance", "Window"] } + +[target.'cfg(not(any(target_arch = "wasm32")))'.dependencies] +tokio-timer = "0.2" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8fefffe --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +Copyright 2019 Pierre Krieger + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS +OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f45b8c9 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# Wasm-timer + +Exports the `Instant`, `Delay`, `Interval` and `Timeout` structs. + +On non-WASM targets, this re-exports the types from `tokio-timer`. +On WASM targets, this uses `web-sys` to implement their functionalities. diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..062ebc2 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,27 @@ +// Copyright 2019 Pierre Krieger +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#[cfg(not(target_arch = "wasm32"))] +pub use tokio_timer::*; +#[cfg(target_arch = "wasm32")] +pub use wasm::*; + +#[cfg(target_arch = "wasm32")] +mod wasm; diff --git a/src/wasm.rs b/src/wasm.rs new file mode 100644 index 0000000..e1b1f14 --- /dev/null +++ b/src/wasm.rs @@ -0,0 +1,513 @@ +// Copyright 2019 Pierre Krieger +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg(target_arch = "wasm32")] + +use futures::{prelude::*, sync::oneshot, try_ready}; +use std::{error, fmt}; +use std::cmp::{Eq, PartialEq, Ord, PartialOrd, Ordering}; +use std::ops::{Add, Sub}; +use std::time::Duration; +use wasm_bindgen::{prelude::*, JsCast}; + +pub use self::timeout::Timeout; + +#[derive(Debug, Copy, Clone)] +pub struct Instant { + /// Unit is milliseconds. + inner: f64, +} + +impl PartialEq for Instant { + fn eq(&self, other: &Instant) -> bool { + // Note that this will most likely only compare equal if we clone an `Instant`, + // but that's ok. + self.inner == other.inner + } +} + +impl Eq for Instant {} + +impl PartialOrd for Instant { + fn partial_cmp(&self, other: &Instant) -> Option { + self.inner.partial_cmp(&other.inner) + } +} + +impl Ord for Instant { + fn cmp(&self, other: &Self) -> Ordering { + self.inner.partial_cmp(&other.inner).unwrap() + } +} + +impl Instant { + pub fn now() -> Instant { + let val = web_sys::window() + .expect("not in a browser") + .performance() + .expect("performance object not available") + .now(); + Instant { inner: val } + } + + pub fn duration_since(&self, earlier: Instant) -> Duration { + *self - earlier + } + + pub fn elapsed(&self) -> Duration { + Instant::now() - *self + } +} + +impl Add for Instant { + type Output = Instant; + + fn add(self, other: Duration) -> Instant { + let new_val = self.inner + other.as_millis() as f64; + Instant { inner: new_val as f64 } + } +} + +impl Sub for Instant { + type Output = Instant; + + fn sub(self, other: Duration) -> Instant { + let new_val = self.inner - other.as_millis() as f64; + Instant { inner: new_val as f64 } + } +} + +impl Sub for Instant { + type Output = Duration; + + fn sub(self, other: Instant) -> Duration { + let ms = self.inner - other.inner; + assert!(ms >= 0.0); + Duration::from_millis(ms as u64) + } +} + +#[derive(Debug)] +pub struct Error; + +impl error::Error for Error { +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Timer error") + } +} + +pub struct Delay { + handle: i32, + deadline: Instant, + triggered_rx: oneshot::Receiver<()>, + _cb: send_wrapper::SendWrapper>, +} + +// TODO: +unsafe impl Sync for Delay {} + +impl Delay { + pub fn new(deadline: Instant) -> Delay { + let now = Instant::now(); + if deadline > now { + let dur = deadline - now; + Delay::new_timeout(deadline, dur) + } else { + Delay::new_timeout(deadline, Duration::new(0, 0)) + } + } + + pub fn deadline(&self) -> Instant { + self.deadline + } + + fn new_timeout(deadline: Instant, duration: Duration) -> Delay { + let (tx, rx) = oneshot::channel(); + let mut tx = Some(tx); + + let cb = Closure::wrap(Box::new(move || { + let _ = tx.take().unwrap().send(()); + }) as Box); + + let handle = web_sys::window() + .expect("not in a browser") + .set_timeout_with_callback_and_timeout_and_arguments_0(cb.as_ref().unchecked_ref(), duration.as_millis() as i32) + .expect("failed to call set_timeout"); + + Delay { handle, triggered_rx: rx, deadline, _cb: send_wrapper::SendWrapper::new(cb) } + } + + fn reset_timeout(&mut self) { + // TODO: what does that do? + } + + pub fn reset(&mut self, deadline: Instant) { + *self = Delay::new(deadline); + } +} + +impl Drop for Delay { + fn drop(&mut self) { + web_sys::window().unwrap().clear_timeout_with_handle(self.handle); + } +} + +impl fmt::Debug for Delay { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("Delay").field(&self.deadline).finish() + } +} + +impl Future for Delay { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.triggered_rx.poll().map_err(|_| unreachable!()) + } +} + +/// A stream representing notifications at fixed interval +#[derive(Debug)] +pub struct Interval { + /// Future that completes the next time the `Interval` yields a value. + delay: Delay, + + /// The duration between values yielded by `Interval`. + duration: Duration, +} + +impl Interval { + /// Create a new `Interval` that starts at `at` and yields every `duration` + /// interval after that. + /// + /// Note that when it starts, it produces item too. + /// + /// The `duration` argument must be a non-zero duration. + /// + /// # Panics + /// + /// This function panics if `duration` is zero. + pub fn new(at: Instant, duration: Duration) -> Interval { + assert!( + duration > Duration::new(0, 0), + "`duration` must be non-zero." + ); + + Interval::new_with_delay(Delay::new(at), duration) + } + + /// Creates new `Interval` that yields with interval of `duration`. + /// + /// The function is shortcut for `Interval::new(Instant::now() + duration, duration)`. + /// + /// The `duration` argument must be a non-zero duration. + /// + /// # Panics + /// + /// This function panics if `duration` is zero. + pub fn new_interval(duration: Duration) -> Interval { + Interval::new(Instant::now() + duration, duration) + } + + pub(crate) fn new_with_delay(delay: Delay, duration: Duration) -> Interval { + Interval { delay, duration } + } +} + +impl Stream for Interval { + type Item = Instant; + type Error = crate::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // Wait for the delay to be done + let _ = try_ready!(self.delay.poll()); + + // Get the `now` by looking at the `delay` deadline + let now = self.delay.deadline(); + + // The next interval value is `duration` after the one that just + // yielded. + self.delay.reset(now + self.duration); + + // Return the current instant + Ok(Some(now).into()) + } +} + +pub mod timeout { + use super::{Delay, Instant}; + use futures::prelude::*; + use std::{error, fmt, time::Duration}; + + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + pub struct Timeout { + value: T, + delay: Delay, + } + + /// Error returned by `Timeout`. + #[derive(Debug)] + pub struct Error(Kind); + + /// Timeout error variants + #[derive(Debug)] + enum Kind { + /// Inner value returned an error + Inner(T), + + /// The timeout elapsed. + Elapsed, + + /// Timer returned an error. + Timer(crate::Error), + } + + impl Timeout { + /// Create a new `Timeout` that allows `value` to execute for a duration of + /// at most `timeout`. + /// + /// The exact behavior depends on if `value` is a `Future` or a `Stream`. + /// + /// See [type] level documentation for more details. + /// + /// [type]: # + /// + /// # Examples + /// + /// Create a new `Timeout` set to expire in 10 milliseconds. + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio; + /// use tokio::timer::Timeout; + /// use futures::Future; + /// use futures::sync::oneshot; + /// use std::time::Duration; + /// + /// # fn main() { + /// let (tx, rx) = oneshot::channel(); + /// # tx.send(()).unwrap(); + /// + /// # tokio::runtime::current_thread::block_on_all( + /// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. + /// Timeout::new(rx, Duration::from_millis(10)) + /// # ).unwrap(); + /// # } + /// ``` + pub fn new(value: T, timeout: Duration) -> Timeout { + let delay = Delay::new_timeout(Instant::now() + timeout, timeout); + + Timeout { + value, + delay, + } + } + + /// Gets a reference to the underlying value in this timeout. + pub fn get_ref(&self) -> &T { + &self.value + } + + /// Gets a mutable reference to the underlying value in this timeout. + pub fn get_mut(&mut self) -> &mut T { + &mut self.value + } + + /// Consumes this timeout, returning the underlying value. + pub fn into_inner(self) -> T { + self.value + } + } + + impl Timeout { + /// Create a new `Timeout` that completes when `future` completes or when + /// `deadline` is reached. + /// + /// This function differs from `new` in that: + /// + /// * It only accepts `Future` arguments. + /// * It sets an explicit `Instant` at which the timeout expires. + pub fn new_at(future: T, deadline: Instant) -> Timeout { + let delay = Delay::new(deadline); + + Timeout { + value: future, + delay, + } + } + } + + impl Future for Timeout + where T: Future, + { + type Item = T::Item; + type Error = Error; + + fn poll(&mut self) -> Poll { + // First, try polling the future + match self.value.poll() { + Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), + Ok(Async::NotReady) => {} + Err(e) => return Err(Error::inner(e)), + } + + // Now check the timer + match self.delay.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + Err(Error::elapsed()) + }, + Err(e) => Err(Error::timer(e)), + } + } + } + + impl Stream for Timeout + where T: Stream, + { + type Item = T::Item; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // First, try polling the future + match self.value.poll() { + Ok(Async::Ready(v)) => { + if v.is_some() { + self.delay.reset_timeout(); + } + return Ok(Async::Ready(v)) + } + Ok(Async::NotReady) => {} + Err(e) => return Err(Error::inner(e)), + } + + // Now check the timer + match self.delay.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(_)) => { + self.delay.reset_timeout(); + Err(Error::elapsed()) + }, + Err(e) => Err(Error::timer(e)), + } + } + } + + impl Error { + /// Create a new `Error` representing the inner value completing with `Err`. + pub fn inner(err: T) -> Error { + Error(Kind::Inner(err)) + } + + /// Returns `true` if the error was caused by the inner value completing + /// with `Err`. + pub fn is_inner(&self) -> bool { + match self.0 { + Kind::Inner(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the inner future error. + pub fn into_inner(self) -> Option { + match self.0 { + Kind::Inner(err) => Some(err), + _ => None, + } + } + + /// Create a new `Error` representing the inner value not completing before + /// the deadline is reached. + pub fn elapsed() -> Error { + Error(Kind::Elapsed) + } + + /// Returns `true` if the error was caused by the inner value not completing + /// before the deadline is reached. + pub fn is_elapsed(&self) -> bool { + match self.0 { + Kind::Elapsed => true, + _ => false, + } + } + + /// Creates a new `Error` representing an error encountered by the timer + /// implementation + pub fn timer(err: crate::Error) -> Error { + Error(Kind::Timer(err)) + } + + /// Returns `true` if the error was caused by the timer. + pub fn is_timer(&self) -> bool { + match self.0 { + Kind::Timer(_) => true, + _ => false, + } + } + + /// Consumes `self`, returning the error raised by the timer implementation. + pub fn into_timer(self) -> Option { + match self.0 { + Kind::Timer(err) => Some(err), + _ => None, + } + } + } + + impl error::Error for Error { + fn description(&self) -> &str { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.description(), + Elapsed => "deadline has elapsed", + Timer(ref e) => e.description(), + } + } + } + + impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use self::Kind::*; + + match self.0 { + Inner(ref e) => e.fmt(fmt), + Elapsed => "deadline has elapsed".fmt(fmt), + Timer(ref e) => e.fmt(fmt), + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::Delay; + + #[test] + fn test_send_sync() { + fn req() {} + req::(); + } +}