Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optional async ops 2 #3721

Merged
merged 7 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub type AsyncJsonOp =
pub enum JsonOp {
Sync(Value),
Async(AsyncJsonOp),
/// AsyncUnref is the variation of Async, which doesn't block the program
Copy link
Member Author

@kt3k kt3k Jan 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. I misread your comment!

/// exiting.
AsyncUnref(AsyncJsonOp),
}

fn json_err(err: ErrBox) -> Value {
Expand Down Expand Up @@ -77,6 +80,13 @@ where
});
CoreOp::Async(fut2.boxed())
}
Ok(JsonOp::AsyncUnref(fut)) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ok(serialize_result(promise_id, result))
});
CoreOp::AsyncUnref(fut2.boxed())
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
Expand Down
8 changes: 8 additions & 0 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ impl ThreadSafeState {
});
Op::Async(result_fut.boxed())
}
Op::AsyncUnref(fut) => {
let state = state.clone();
let result_fut = fut.map_ok(move |buf: Buf| {
state.metrics_op_completed(buf.len());
buf
});
Op::AsyncUnref(result_fut.boxed())
}
}
}
}
Expand Down
50 changes: 49 additions & 1 deletion core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::select;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
Expand Down Expand Up @@ -178,6 +179,7 @@ pub struct Isolate {
needs_init: bool,
pub(crate) shared: SharedQueue,
pending_ops: FuturesUnordered<PendingOpFuture>,
pending_unref_ops: FuturesUnordered<PendingOpFuture>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
pub op_registry: Arc<OpRegistry>,
Expand Down Expand Up @@ -340,6 +342,7 @@ impl Isolate {
shared,
needs_init,
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
have_unpolled_ops: false,
startup_script,
op_registry: Arc::new(OpRegistry::new()),
Expand Down Expand Up @@ -517,6 +520,12 @@ impl Isolate {
self.have_unpolled_ops = true;
None
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map_ok(move |buf| (op_id, buf));
self.pending_unref_ops.push(fut2.boxed());
self.have_unpolled_ops = true;
None
}
}
}

Expand Down Expand Up @@ -711,7 +720,9 @@ impl Future for Isolate {
// Now handle actual ops.
inner.have_unpolled_ops = false;
#[allow(clippy::match_wild_err_arm)]
match inner.pending_ops.poll_next_unpin(cx) {
match select(&mut inner.pending_ops, &mut inner.pending_unref_ops)
.poll_next_unpin(cx)
{
Comment on lines +723 to +725
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very elegant

Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
Poll::Ready(None) => break,
Poll::Pending => break,
Expand Down Expand Up @@ -814,6 +825,7 @@ pub mod tests {

pub enum Mode {
Async,
AsyncUnref,
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
Expand All @@ -836,6 +848,17 @@ pub mod tests {
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
}
Mode::AsyncUnref => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Ok(buf)
};
Op::AsyncUnref(fut.boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Expand Down Expand Up @@ -953,6 +976,31 @@ pub mod tests {
});
}

#[test]
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncUnref);
js_check(isolate.execute(
"check1.js",
r#"
Deno.core.setAsyncHandler(1, (buf) => {
// This handler will never be called
assert(false);
});
let control = new Uint8Array([42]);
Deno.core.send(1, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
// The above op never finish, but isolate can finish
// because the op is an unreffed async op.
assert!(match isolate.poll_unpin(cx) {
Poll::Ready(Ok(_)) => true,
_ => false,
});
})
}

#[test]
fn terminate_execution() {
let (tx, rx) = std::sync::mpsc::channel::<bool>();
Expand Down
5 changes: 5 additions & 0 deletions core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ pub type OpResult<E> = Result<Op<E>, E>;
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
/**
* AsyncUnref is the variation of Async, which
* doesn't block the program exiting.
*/
AsyncUnref(OpAsyncFuture<E>),
}

pub type CoreError = ();
Expand Down