Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optional async ops #3715

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions cli/ops/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fn op_fetch_source_files(
Ok(v.into())
});

Ok(JsonOp::Async(future))
Ok(JsonOp::Async(future, true))
}

#[derive(Deserialize, Debug)]
Expand All @@ -166,13 +166,16 @@ fn op_compile(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CompileArgs = serde_json::from_value(args)?;
Ok(JsonOp::Async(runtime_compile_async(
state.global_state.clone(),
&args.root_name,
&args.sources,
args.bundle,
&args.options,
)))
Ok(JsonOp::Async(
runtime_compile_async(
state.global_state.clone(),
&args.root_name,
&args.sources,
args.bundle,
&args.options,
),
true,
))
}

#[derive(Deserialize, Debug)]
Expand All @@ -187,9 +190,12 @@ fn op_transpile(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: TranspileArgs = serde_json::from_value(args)?;
Ok(JsonOp::Async(runtime_transpile_async(
state.global_state.clone(),
&args.sources,
&args.options,
)))
Ok(JsonOp::Async(
runtime_transpile_async(
state.global_state.clone(),
&args.sources,
&args.options,
),
true,
))
}
11 changes: 6 additions & 5 deletions cli/ops/dispatch_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub type AsyncJsonOp =

pub enum JsonOp {
Sync(Value),
Async(AsyncJsonOp),
/** The 2nd element is true when the op blocks exiting, false otherwise. */
Async(AsyncJsonOp, bool),
}

fn json_err(err: ErrBox) -> Value {
Expand Down Expand Up @@ -70,19 +71,19 @@ where
assert!(promise_id.is_none());
CoreOp::Sync(serialize_result(promise_id, Ok(sync_value)))
}
Ok(JsonOp::Async(fut)) => {
Ok(JsonOp::Async(fut, blocks_exit)) => {
assert!(promise_id.is_some());
let fut2 = fut.then(move |result| {
futures::future::ok(serialize_result(promise_id, result))
});
CoreOp::Async(fut2.boxed())
CoreOp::Async(fut2.boxed(), blocks_exit)
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
CoreOp::Sync(buf)
} else {
CoreOp::Async(futures::future::ok(buf).boxed())
CoreOp::Async(futures::future::ok(buf).boxed(), true)
}
}
}
Expand All @@ -101,6 +102,6 @@ where
let handle = pool
.spawn_with_handle(futures::future::lazy(move |_cx| f()))
.unwrap();
Ok(JsonOp::Async(handle.boxed()))
Ok(JsonOp::Async(handle.boxed(), true))
}
}
2 changes: 1 addition & 1 deletion cli/ops/dispatch_minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ where
// works since they're simple polling futures.
Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
Op::Async(fut.boxed())
Op::Async(fut.boxed(), true)
}
}
}
2 changes: 1 addition & 1 deletion cli/ops/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,5 @@ pub fn op_fetch(
Ok(json_res)
};

Ok(JsonOp::Async(future.boxed()))
Ok(JsonOp::Async(future.boxed(), true))
}
4 changes: 2 additions & 2 deletions cli/ops/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn op_open(
let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
Ok(JsonOp::Async(fut.boxed()))
Ok(JsonOp::Async(fut.boxed(), true))
}
}

Expand Down Expand Up @@ -171,6 +171,6 @@ fn op_seek(
let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
Ok(JsonOp::Async(fut.boxed()))
Ok(JsonOp::Async(fut.boxed(), true))
}
}
4 changes: 2 additions & 2 deletions cli/ops/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn op_accept(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -157,7 +157,7 @@ fn op_connect(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

#[derive(Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion cli/ops/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ fn op_run_status(
let pool = futures::executor::ThreadPool::new().unwrap();
let handle = pool.spawn_with_handle(future).unwrap();

Ok(JsonOp::Async(handle.boxed()))
Ok(JsonOp::Async(handle.boxed(), true))
}

#[derive(Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion cli/ops/timers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn op_global_timer(
.new_timeout(deadline)
.then(move |_| futures::future::ok(json!({})));

Ok(JsonOp::Async(f.boxed()))
Ok(JsonOp::Async(f.boxed(), true))
}

// Returns a milliseconds and nanoseconds subsec
Expand Down
4 changes: 2 additions & 2 deletions cli/ops/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub fn op_connect_tls(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> {
Expand Down Expand Up @@ -376,5 +376,5 @@ fn op_accept_tls(
}))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}
8 changes: 4 additions & 4 deletions cli/ops/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn op_worker_get_message(
Ok(json!({ "data": maybe_buf }))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

/// Post message to host as guest worker
Expand Down Expand Up @@ -258,7 +258,7 @@ fn op_host_get_worker_loaded(
Ok(serialize_worker_result(result))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

fn op_host_poll_worker(
Expand Down Expand Up @@ -286,7 +286,7 @@ fn op_host_poll_worker(

Ok(serialize_worker_result(result))
};
Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

fn op_host_close_worker(
Expand Down Expand Up @@ -348,7 +348,7 @@ fn op_host_get_message(
Ok(json!({ "data": maybe_buf }))
};

Ok(JsonOp::Async(op.boxed()))
Ok(JsonOp::Async(op.boxed(), true))
}

#[derive(Deserialize)]
Expand Down
4 changes: 2 additions & 2 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ impl ThreadSafeState {
state.metrics_op_completed(buf.len());
Op::Sync(buf)
}
Op::Async(fut) => {
Op::Async(fut, blocks_exit) => {
let state = state.clone();
let result_fut = fut.map_ok(move |buf: Buf| {
state.metrics_op_completed(buf.len());
buf
});
Op::Async(result_fut.boxed())
Op::Async(result_fut.boxed(), blocks_exit)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/es_isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
};

isolate.register_op("test", dispatcher);
Expand Down
2 changes: 1 addition & 1 deletion core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ fn http_op(
if is_sync {
Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
Op::Async(fut.boxed())
Op::Async(fut.boxed(), true)
}
}
}
Expand Down
56 changes: 49 additions & 7 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,11 @@ impl Isolate {
let op_id = 0;
Some((op_id, buf))
}
Op::Async(fut) => {
Op::Async(fut, blocks_exit) => {
let fut2 = fut.map_ok(move |buf| (op_id, buf));
self.pending_ops.push(fut2.boxed());
self
.pending_ops
.push(Pin::new(Box::new(PendingOp(fut2.boxed(), blocks_exit))));
self.have_unpolled_ops = true;
Comment on lines -514 to 519
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend using Op::Async and Op::AsyncOptional.

Op::AsyncOptional should be kept in Isolate.pending_optional_ops - which is the same type as Isolate.pending_ops.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created another version of this at #3721, which uses AsyncOptional instead of adding flag to Async. I'd like to avoid having Isolate.pending_optional_ops because if we have 2 FuturesUnordered list in isolate, it seems to me that it's difficult to poll these 2 futures list correctly.

None
}
Expand Down Expand Up @@ -742,8 +744,11 @@ impl Future for Isolate {
inner.check_promise_errors();
inner.check_last_exception()?;

// We're idle if pending_ops is empty.
if inner.pending_ops.is_empty() {
// We're idle if all pending_ops have blocks_exit flag false.
// TODO(kt3k): This might affect the performance of the event loop when
// the user created thousands of optional ops. See the discussion at
// https://github.com/denoland/deno/pull/3715/files#r368270169
if inner.pending_ops.iter().all(|op| !op.1) {
Copy link
Contributor

@kevinkassimo kevinkassimo Jan 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not so sure if this would have some impact under large amount of concurrent async op (iterating potentially thousands of pending ops for each poll).

When I did my impl in #2735 , I used a simple counter: whenever optional op added, increment; when done, decrement; when length of pending ops is same as optional ops counter amount, we know all the remaining are optional ones and can safely exit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a todo comment for it.

I agree that this could cause perf issue in future, but I think it's not a huge problem for a moment because there is only a few optional async ops (0 at this moment, and 1 if #3610 landed).

Do you think that we need to address it before landing this?

Poll::Ready(Ok(()))
} else {
if inner.have_unpolled_ops {
Expand Down Expand Up @@ -814,6 +819,7 @@ pub mod tests {

pub enum Mode {
Async,
AsyncOptional,
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
Expand All @@ -834,7 +840,18 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
}
Mode::AsyncOptional => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Ok(buf)
};
Op::Async(fut.boxed(), false)
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
Expand All @@ -853,7 +870,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
Expand All @@ -862,7 +879,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ok(buf).boxed())
Op::Async(futures::future::ok(buf).boxed(), true)
}
}
};
Expand Down Expand Up @@ -953,6 +970,31 @@ pub mod tests {
});
}

#[test]
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncOptional);
js_check(isolate.execute(
"check1.js",
r#"
Deno.core.setAsyncHandler(1, (buf) => {
// This handler will never be called
assert(false);
});
let control = new Uint8Array([42]);
Deno.core.send(1, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
// The above op never finish, but isolate can finish
// because the op is an optional async op (blocks_exit flag == false).
assert!(match isolate.poll_unpin(cx) {
Poll::Ready(Ok(_)) => true,
_ => false,
});
})
}

#[test]
fn terminate_execution() {
let (tx, rx) = std::sync::mpsc::channel::<bool>();
Expand Down
17 changes: 15 additions & 2 deletions core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::RwLock;
use std::task::{Context, Poll};

pub type OpId = u32;

Expand All @@ -13,14 +14,26 @@ pub type Buf = Box<[u8]>;
pub type OpAsyncFuture<E> =
Pin<Box<dyn Future<Output = Result<Buf, E>> + Send>>;

pub(crate) type PendingOpFuture =
pub(crate) type PendingOpInnerFuture =
Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>> + Send>>;

pub(crate) struct PendingOp(pub PendingOpInnerFuture, pub bool);

impl Future for PendingOp {
type Output = Result<(OpId, Buf), CoreError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.as_mut().0.as_mut().poll(cx)
}
}

pub(crate) type PendingOpFuture = Pin<Box<PendingOp>>;

pub type OpResult<E> = Result<Op<E>, E>;

pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
/** The 2nd element is true when the op blocks exiting, false otherwise. */
Async(OpAsyncFuture<E>, bool),
}

pub type CoreError = ();
Expand Down
2 changes: 1 addition & 1 deletion test_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ pub fn op_test_async(data: &[u8], zero_copy: Option<PinnedBuf>) -> CoreOp {
Ok(result_box)
};

Op::Async(fut.boxed())
Op::Async(fut.boxed(), true)
}
Loading