diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 5d6875fb0ed52e..bc7fcd990fa138 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -148,7 +148,7 @@ fn op_fetch_source_files( Ok(v.into()) }); - Ok(JsonOp::Async(future)) + Ok(JsonOp::Async(future, true)) } #[derive(Deserialize, Debug)] @@ -166,13 +166,16 @@ fn op_compile( _zero_copy: Option, ) -> Result { let args: CompileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async(runtime_compile_async( - state.global_state.clone(), - &args.root_name, - &args.sources, - args.bundle, - &args.options, - ))) + Ok(JsonOp::Async( + runtime_compile_async( + state.global_state.clone(), + &args.root_name, + &args.sources, + args.bundle, + &args.options, + ), + true, + )) } #[derive(Deserialize, Debug)] @@ -187,9 +190,12 @@ fn op_transpile( _zero_copy: Option, ) -> Result { let args: TranspileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async(runtime_transpile_async( - state.global_state.clone(), - &args.sources, - &args.options, - ))) + Ok(JsonOp::Async( + runtime_transpile_async( + state.global_state.clone(), + &args.sources, + &args.options, + ), + true, + )) } diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 7f53a3d8004ff4..011385ed08e383 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -13,7 +13,8 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), - Async(AsyncJsonOp), + /** The 2nd element is true when the op blocks exiting, false otherwise. */ + Async(AsyncJsonOp, bool), } fn json_err(err: ErrBox) -> Value { @@ -70,19 +71,19 @@ where assert!(promise_id.is_none()); CoreOp::Sync(serialize_result(promise_id, Ok(sync_value))) } - Ok(JsonOp::Async(fut)) => { + Ok(JsonOp::Async(fut, blocks_exit)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { futures::future::ok(serialize_result(promise_id, result)) }); - CoreOp::Async(fut2.boxed()) + CoreOp::Async(fut2.boxed(), blocks_exit) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { CoreOp::Sync(buf) } else { - CoreOp::Async(futures::future::ok(buf).boxed()) + CoreOp::Async(futures::future::ok(buf).boxed(), true) } } } @@ -101,6 +102,6 @@ where let handle = pool .spawn_with_handle(futures::future::lazy(move |_cx| f())) .unwrap(); - Ok(JsonOp::Async(handle.boxed())) + Ok(JsonOp::Async(handle.boxed(), true)) } } diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 2d5618d65c2bb7..9f2206fd11d023 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -164,7 +164,7 @@ where // works since they're simple polling futures. Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed()) + Op::Async(fut.boxed(), true) } } } diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index e084fdeffb13f8..a8496a969fdf33 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -81,5 +81,5 @@ pub fn op_fetch( Ok(json_res) }; - Ok(JsonOp::Async(future.boxed())) + Ok(JsonOp::Async(future.boxed(), true)) } diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 8bb3c8acb905e2..2fc24d9b37bb82 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -100,7 +100,7 @@ fn op_open( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed())) + Ok(JsonOp::Async(fut.boxed(), true)) } } @@ -172,6 +172,6 @@ fn op_seek( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed())) + Ok(JsonOp::Async(fut.boxed(), true)) } } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 836ec2e8d39153..457f1d376188ec 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -130,7 +130,7 @@ fn op_accept( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } #[derive(Deserialize)] @@ -173,7 +173,7 @@ fn op_connect( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } #[derive(Deserialize)] diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 30de5a7356079e..6a0e48e27b986b 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -244,7 +244,7 @@ fn op_run_status( let pool = futures::executor::ThreadPool::new().unwrap(); let handle = pool.spawn_with_handle(future).unwrap(); - Ok(JsonOp::Async(handle.boxed())) + Ok(JsonOp::Async(handle.boxed(), true)) } #[derive(Deserialize)] diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 8bec10f70d1f82..c07e44256786f9 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -51,7 +51,7 @@ fn op_global_timer( .new_timeout(deadline) .then(move |_| futures::future::ok(json!({}))); - Ok(JsonOp::Async(f.boxed())) + Ok(JsonOp::Async(f.boxed(), true)) } // Returns a milliseconds and nanoseconds subsec diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 87a067a9e1d271..ca211b9722f1b3 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -116,7 +116,7 @@ pub fn op_connect_tls( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } fn load_certs(path: &str) -> Result, ErrBox> { @@ -397,5 +397,5 @@ fn op_accept_tls( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index eeffb39305c2d2..dab8022b728945 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -97,7 +97,7 @@ fn op_worker_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } /// Post message to host as guest worker @@ -258,7 +258,7 @@ fn op_host_get_worker_loaded( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } fn op_host_poll_worker( @@ -286,7 +286,7 @@ fn op_host_poll_worker( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } fn op_host_close_worker( @@ -348,7 +348,7 @@ fn op_host_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed(), true)) } #[derive(Deserialize)] diff --git a/cli/state.rs b/cli/state.rs index acd661f251a795..f7fb2aabeeb3c3 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -103,13 +103,13 @@ impl ThreadSafeState { state.metrics_op_completed(buf.len()); Op::Sync(buf) } - Op::Async(fut) => { + Op::Async(fut, blocks_exit) => { let state = state.clone(); let result_fut = fut.map_ok(move |buf: Buf| { state.metrics_op_completed(buf.len()); buf }); - Op::Async(result_fut.boxed()) + Op::Async(result_fut.boxed(), blocks_exit) } } } diff --git a/core/es_isolate.rs b/core/es_isolate.rs index a3231a90c21717..1063a8f7f145e1 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -650,7 +650,7 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) }; isolate.register_op("test", dispatcher); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 5a5b43c5128270..a3e591d0b628ab 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -133,7 +133,7 @@ fn http_op( if is_sync { Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed()) + Op::Async(fut.boxed(), true) } } } diff --git a/core/isolate.rs b/core/isolate.rs index 5617caa8609339..e5558b1317e93a 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -511,9 +511,11 @@ impl Isolate { let op_id = 0; Some((op_id, buf)) } - Op::Async(fut) => { + Op::Async(fut, blocks_exit) => { let fut2 = fut.map_ok(move |buf| (op_id, buf)); - self.pending_ops.push(fut2.boxed()); + self + .pending_ops + .push(Pin::new(Box::new(PendingOp(fut2.boxed(), blocks_exit)))); self.have_unpolled_ops = true; None } @@ -742,8 +744,11 @@ impl Future for Isolate { inner.check_promise_errors(); inner.check_last_exception()?; - // We're idle if pending_ops is empty. - if inner.pending_ops.is_empty() { + // We're idle if all pending_ops have blocks_exit flag false. + // TODO(kt3k): This might affect the performance of the event loop when + // the user created thousands of optional ops. See the discussion at + // https://github.com/denoland/deno/pull/3715/files#r368270169 + if inner.pending_ops.iter().all(|op| !op.1) { Poll::Ready(Ok(())) } else { if inner.have_unpolled_ops { @@ -814,6 +819,7 @@ pub mod tests { pub enum Mode { Async, + AsyncOptional, OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -834,7 +840,18 @@ pub mod tests { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) + } + Mode::AsyncOptional => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let fut = async { + // This future never finish. + futures::future::pending::<()>().await; + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); + Ok(buf) + }; + Op::Async(fut.boxed(), false) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); @@ -853,7 +870,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -862,7 +879,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ok(buf).boxed(), true) } } }; @@ -953,6 +970,31 @@ pub mod tests { }); } + #[test] + fn test_poll_async_optional_ops() { + run_in_task(|cx| { + let (mut isolate, dispatch_count) = setup(Mode::AsyncOptional); + js_check(isolate.execute( + "check1.js", + r#" + Deno.core.setAsyncHandler(1, (buf) => { + // This handler will never be called + assert(false); + }); + let control = new Uint8Array([42]); + Deno.core.send(1, control); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + // The above op never finish, but isolate can finish + // because the op is an optional async op (blocks_exit flag == false). + assert!(match isolate.poll_unpin(cx) { + Poll::Ready(Ok(_)) => true, + _ => false, + }); + }) + } + #[test] fn terminate_execution() { let (tx, rx) = std::sync::mpsc::channel::(); diff --git a/core/ops.rs b/core/ops.rs index 7ed14268280a7a..83fca539603ae4 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; +use std::task::{Context, Poll}; pub type OpId = u32; @@ -13,14 +14,26 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture = Pin> + Send>>; -pub(crate) type PendingOpFuture = +pub(crate) type PendingOpInnerFuture = Pin> + Send>>; +pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool); + +impl Future for PendingOp { + type Output = Result<(OpId, Buf), CoreError>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.as_mut().0.as_mut().poll(cx) + } +} + +pub(crate) type PendingOpFuture = Pin>; + pub type OpResult = Result, E>; pub enum Op { Sync(Buf), - Async(OpAsyncFuture), + /** The 2nd element is true when the op blocks exiting, false otherwise. */ + Async(OpAsyncFuture, bool), } pub type CoreError = (); diff --git a/test_plugin/src/lib.rs b/test_plugin/src/lib.rs index 95cd6e9ca4e82f..8cc39148cced33 100644 --- a/test_plugin/src/lib.rs +++ b/test_plugin/src/lib.rs @@ -49,5 +49,5 @@ pub fn op_test_async(data: &[u8], zero_copy: Option) -> CoreOp { Ok(result_box) }; - Op::Async(fut.boxed()) + Op::Async(fut.boxed(), true) }