Skip to content

Commit

Permalink
Convert async iterators to streams (#2401)
Browse files Browse the repository at this point in the history
* Add stream module to futures crate

The stream module exposes the JsStream type used to convert
JS objects implementing the AsyncIterator interface to be used
as Rust streams.

* Add stream feature to futures crate
  • Loading branch information
olanod authored Jan 5, 2021
1 parent 9f725e7 commit b2fe5d1
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 0 deletions.
5 changes: 5 additions & 0 deletions crates/futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ edition = "2018"
cfg-if = "1.0.0"
js-sys = { path = "../js-sys", version = '0.3.46' }
wasm-bindgen = { path = "../..", version = '0.2.69' }
futures-core = { version = '0.3.8', default-features = false, optional = true }

[features]
futures-core-03-stream = ['futures-core']

[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
path = "../web-sys"
Expand All @@ -26,3 +30,4 @@ features = [
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = { path = '../test', version = '0.3.19' }
futures-channel-preview = { version = "0.3.0-alpha.18" }
futures-lite = { version = "1.11.3", default-features = false }
3 changes: 3 additions & 0 deletions crates/futures/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ This crate bridges the gap between a Rust `Future` and a JavaScript
1. From a JavaScript `Promise` into a Rust `Future`.
2. From a Rust `Future` into a JavaScript `Promise`.

Additionally under the feature flag `futures-core-03-stream` there is experimental
support for `AsyncIterator` to `Stream` conversion.

See the [API documentation][docs] for more info.

[docs]: https://rustwasm.github.io/wasm-bindgen/api/wasm_bindgen_futures/
2 changes: 2 additions & 0 deletions crates/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ use std::task::{Context, Poll, Waker};
use wasm_bindgen::prelude::*;

mod queue;
#[cfg(feature = "futures-core-03-stream")]
pub mod stream;

mod task {
use cfg_if::cfg_if;
Expand Down
81 changes: 81 additions & 0 deletions crates/futures/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s.
//!
//! Analogous to the promise to future convertion, this module allows the
//! turing objects implementing the async iterator protocol into `Stream`s
//! that produce values that can be awaited from.
//!

use crate::JsFuture;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::stream::Stream;
use js_sys::{AsyncIterator, IteratorNext};
use wasm_bindgen::{prelude::*, JsCast};

/// A `Stream` that yields values from an underlying `AsyncIterator`.
pub struct JsStream {
iter: AsyncIterator,
next: Option<JsFuture>,
done: bool,
}

impl JsStream {
fn next_future(&self) -> Result<JsFuture, JsValue> {
self.iter.next().map(JsFuture::from)
}
}

impl From<AsyncIterator> for JsStream {
fn from(iter: AsyncIterator) -> Self {
JsStream {
iter,
next: None,
done: false,
}
}
}

impl Stream for JsStream {
type Item = Result<JsValue, JsValue>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}

let future = match self.next.as_mut() {
Some(val) => val,
None => match self.next_future() {
Ok(val) => {
self.next = Some(val);
self.next.as_mut().unwrap()
}
Err(e) => {
self.done = true;
return Poll::Ready(Some(Err(e)));
}
},
};

match Pin::new(future).poll(cx) {
Poll::Ready(res) => match res {
Ok(iter_next) => {
let next = iter_next.unchecked_into::<IteratorNext>();
if next.done() {
self.done = true;
Poll::Ready(None)
} else {
self.next.take();
Poll::Ready(Some(Ok(next.value())))
}
}
Err(e) => {
self.done = true;
Poll::Ready(Some(Err(e)))
}
},
Poll::Pending => Poll::Pending,
}
}
}
23 changes: 23 additions & 0 deletions crates/futures/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,26 @@ async fn can_create_multiple_futures_from_same_promise() {
a.await.unwrap();
b.await.unwrap();
}

#[cfg(feature = "futures-core-03-stream")]
#[wasm_bindgen_test]
async fn can_use_an_async_iterable_as_stream() {
use futures_lite::stream::StreamExt;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::stream::JsStream;

let async_iter = js_sys::Function::new_no_args(
"return async function*() {
yield 42;
yield 24;
}()",
)
.call0(&JsValue::undefined())
.unwrap()
.unchecked_into::<js_sys::AsyncIterator>();

let mut stream = JsStream::from(async_iter);
assert_eq!(stream.next().await, Some(Ok(JsValue::from(42))));
assert_eq!(stream.next().await, Some(Ok(JsValue::from(24))));
assert_eq!(stream.next().await, None);
}

0 comments on commit b2fe5d1

Please sign in to comment.