diff --git a/Cargo.toml b/Cargo.toml index 78a7a9a399..2176fe8bf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ gluon_vm = { path = "vm", version = "0.2.2" } clap = "2.2.5" log = "0.3.6" quick-error = "1.0.0" +futures = "0.1.0" env_logger = { version = "0.3.4", optional = true } lazy_static = { version = "0.2.0", optional = true } diff --git a/src/compiler_pipeline.rs b/src/compiler_pipeline.rs index 9fed882e66..edb90d1b90 100644 --- a/src/compiler_pipeline.rs +++ b/src/compiler_pipeline.rs @@ -10,6 +10,8 @@ use std::borrow::{Borrow, BorrowMut}; +use futures::Future; + use base::ast::SpannedExpr; use base::error::InFile; use base::types::ArcType; @@ -300,7 +302,7 @@ impl Executable<()> for CompileValue let CompileValue { expr, typ, mut function } = self; function.id = Symbol::from(name); let closure = vm.global_env().new_global_thunk(function)?; - let value = vm.call_thunk(closure)?; + let value = vm.call_thunk(closure).wait()?; Ok(ExecuteValue { expr: expr, typ: typ, @@ -319,7 +321,7 @@ impl Executable<()> for CompileValue let CompileValue { mut expr, typ, function } = self; let metadata = metadata::metadata(&*vm.get_env(), expr.borrow_mut()); let closure = vm.global_env().new_global_thunk(function)?; - let value = vm.call_thunk(closure)?; + let value = vm.call_thunk(closure).wait()?; vm.set_global(closure.function.name.clone(), typ, metadata, value)?; info!("Loaded module `{}` filename", filename); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 6b2607139d..0ef631f108 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,8 @@ extern crate log; #[macro_use] extern crate quick_error; +extern crate futures; + #[macro_use] pub extern crate gluon_vm as vm; pub extern crate gluon_base as base; @@ -27,6 +29,8 @@ use std::result::Result as StdResult; use std::string::String as StdString; use std::env; +use futures::Async; + use base::ast::{self, SpannedExpr}; use base::error::{Errors, InFile}; use base::metadata::Metadata; @@ -290,7 +294,12 @@ impl Compiler { .unwrap_or(false) }; let value = if is_io { - vm.execute_io(*value)? + match vm.execute_io(*value)? { + Async::Ready(value) => value, + Async::NotReady => { + return Err(VmError::Message("Unhandled asynchronous execution".into()).into()) + } + } } else { *value }; diff --git a/tests/api.rs b/tests/api.rs index 100e93a00a..c54fe6ded1 100644 --- a/tests/api.rs +++ b/tests/api.rs @@ -1,10 +1,16 @@ extern crate env_logger; +extern crate futures; + #[macro_use] extern crate gluon_vm; extern crate gluon; +use futures::{BoxFuture, Future, IntoFuture}; +use futures::future::lazy; + use gluon::base::types::Type; -use gluon::vm::api::{VmType, FunctionRef, Userdata}; +use gluon::vm::Error; +use gluon::vm::api::{FutureResult, VmType, FunctionRef, Userdata}; use gluon::vm::thread::{RootedThread, Thread, Traverseable, Root, RootStr}; use gluon::vm::types::VmInt; use gluon::Compiler; @@ -118,3 +124,62 @@ sum_bytes [100b, 42b, 3b, 15b] assert_eq!(result, expected); } + +#[test] +fn return_finished_future() { + let _ = ::env_logger::init(); + + fn add(x: i32, y: i32) -> FutureResult> { + FutureResult(Ok(x + y).into_future().boxed()) + } + + let expr = r#" + add 1 2 +"#; + + let vm = make_vm(); + vm.define_global("add", primitive!(2 add)).unwrap(); + + let result = + Compiler::new().run_expr::(&vm, "", expr).unwrap_or_else(|err| panic!("{}", err)); + let expected = (3, Type::int()); + + assert_eq!(result, expected); +} + +#[test] +fn return_delayed_future() { + let _ = ::env_logger::init(); + + fn poll_n(i: i32) -> FutureResult> { + use std::thread::spawn; + use futures::sync::oneshot::channel; + + let (ping_c, ping_p) = channel(); + let (pong_c, pong_p) = channel(); + spawn(move || { + ping_p.wait().unwrap(); + pong_c.complete(i); + }); + FutureResult(lazy(move || { + ping_c.complete(()); + Ok(()) + }) + .and_then(|_| pong_p.map_err(|err| Error::Message(format!("{}", err)))) + .boxed()) + } + + let expr = r#" + poll_n 3 +"#; + + let vm = make_vm(); + vm.define_global("poll_n", primitive!(1 poll_n)).unwrap(); + + let result = Compiler::new() + .run_expr::(&vm, "", expr) + .unwrap_or_else(|err| panic!("{}", err)); + let expected = (3, Type::int()); + + assert_eq!(result, expected); +} diff --git a/vm/Cargo.toml b/vm/Cargo.toml index f29a67a757..28ae097ccf 100644 --- a/vm/Cargo.toml +++ b/vm/Cargo.toml @@ -19,6 +19,8 @@ collect-mac = "0.1.0" pretty = "0.2.0" bitflags = "0.7.0" itertools = "0.5.6" +futures = "0.1.0" + gluon_base = { path = "../base", version = "0.2.2" } gluon_check = { path = "../check", version = "0.2.2" } diff --git a/vm/src/api.rs b/vm/src/api.rs index 1eb0f5f5dd..020341338c 100644 --- a/vm/src/api.rs +++ b/vm/src/api.rs @@ -5,10 +5,11 @@ use base::symbol::Symbol; use stack::{State, StackFrame}; use vm::{self, Thread, Status, RootStr, RootedValue, Root}; use value::{ArrayRepr, Cloner, DataStruct, ExternFunction, GcStr, Value, ValueArray, Def}; -use thread::{self, Context, RootedThread}; +use thread::{self, Context, OwnedContext, RootedThread}; use thread::ThreadInternal; use base::types::{self, ArcType, Type}; use types::{VmIndex, VmTag, VmInt}; + use std::any::Any; use std::cell::Ref; use std::cmp::Ordering; @@ -17,6 +18,8 @@ use std::marker::PhantomData; use std::ops::Deref; use std::result::Result as StdResult; +use futures::{Async, BoxFuture, Future}; + pub use value::Userdata; macro_rules! count { @@ -304,19 +307,15 @@ pub trait VmType { } } - -/// Trait which allows a rust value to be pushed to the virtual machine -pub trait Pushable<'vm> { - /// Pushes `self` to `stack`. If the call is successful a single element should have been added - /// to the stack and `Ok(())` should be returned. If the call is unsuccessful `Status:Error` - /// should be returned and the stack should be left intact - fn push(self, vm: &'vm Thread, context: &mut Context) -> Result<()>; +pub trait AsyncPushable<'vm> { + fn async_push(self, vm: &'vm Thread, context: &mut Context) -> Result>; fn status_push(self, vm: &'vm Thread, context: &mut Context) -> Status where Self: Sized, { - match self.push(vm, context) { - Ok(()) => Status::Ok, + match self.async_push(vm, context) { + Ok(Async::Ready(())) => Status::Ok, + Ok(Async::NotReady) => Status::Yield, Err(err) => { let msg = unsafe { GcStr::from_utf8_unchecked(context.alloc_ignore_limit(format!("{}", err).as_bytes())) @@ -328,6 +327,22 @@ pub trait Pushable<'vm> { } } +impl<'vm, T> AsyncPushable<'vm> for T + where T: Pushable<'vm>, +{ + fn async_push(self, vm: &'vm Thread, context: &mut Context) -> Result> { + self.push(vm, context).map(Async::Ready) + } +} + +/// Trait which allows a rust value to be pushed to the virtual machine +pub trait Pushable<'vm>: AsyncPushable<'vm> { + /// Pushes `self` to `stack`. If the call is successful a single element should have been added + /// to the stack and `Ok(())` should be returned. If the call is unsuccessful `Status:Error` + /// should be returned and the stack should be left intact + fn push(self, vm: &'vm Thread, context: &mut Context) -> Result<()>; +} + /// Trait which allows rust values to be retrieved from the virtual machine pub trait Getable<'vm>: Sized { /// unsafe version of from_value which allows references to the internal of GcPtr's to be @@ -847,6 +862,33 @@ impl<'vm, T: Getable<'vm>, E: Getable<'vm>> Getable<'vm> for StdResult { } } +pub struct FutureResult(pub F); + +impl VmType for FutureResult + where F: Future, + F::Item: VmType, +{ + type Type = ::Type; + fn make_type(vm: &Thread) -> ArcType { + ::make_type(vm) + } +} +impl<'vm, F> AsyncPushable<'vm> for FutureResult + where F: Future + Send + 'static, + F::Item: Pushable<'vm>, +{ + fn async_push(mut self, vm: &'vm Thread, context: &mut Context) -> Result> { + match self.0.poll() { + Ok(Async::Ready(value)) => value.push(vm, context).map(Async::Ready), + Ok(Async::NotReady) => unsafe { + context.return_future(self.0); + Ok(Async::NotReady) + }, + Err(err) => Err(err), + } + } +} + pub enum RuntimeResult { Return(T), Panic(E), @@ -1701,7 +1743,7 @@ impl <'vm, $($args,)* R: VmType> FunctionType for fn ($($args),*) -> R { impl <'vm, $($args,)* R> VmFunction<'vm> for fn ($($args),*) -> R where $($args: Getable<'vm> + 'vm,)* - R: Pushable<'vm> + VmType + 'vm + R: AsyncPushable<'vm> + VmType + 'vm { #[allow(non_snake_case, unused_mut, unused_assignments, unused_variables, unused_unsafe)] fn unpack_and_call(&self, vm: &'vm Thread) -> Status { @@ -1749,7 +1791,7 @@ impl <'s, $($args: VmType,)* R: VmType> VmType for Fn($($args),*) -> R + 's { impl <'vm, $($args,)* R> VmFunction<'vm> for Fn($($args),*) -> R + 'vm where $($args: Getable<'vm> + 'vm,)* - R: Pushable<'vm> + VmType + 'vm + R: AsyncPushable<'vm> + VmType + 'vm { #[allow(non_snake_case, unused_mut, unused_assignments, unused_variables, unused_unsafe)] fn unpack_and_call(&self, vm: &'vm Thread) -> Status { @@ -1782,28 +1824,73 @@ where $($args: Getable<'vm> + 'vm,)* impl<'vm, T, $($args,)* R> Function R> where $($args: Pushable<'vm>,)* T: Deref, - R: VmType + Getable<'vm> + R: VmType + for<'x> Getable<'x>, { #[allow(non_snake_case)] pub fn call(&'vm mut self $(, $args: $args)*) -> Result { + match self.call_first($($args),*)? { + Async::Ready(context) => { + let value = context.unwrap().stack.pop(); + R::from_value(self.value.vm(), Variants(&value)) + .ok_or_else(|| { + error!("Wrong type {:?}", value); + Error::Message("Wrong type".to_string()) + }) + } + Async::NotReady => Err(Error::Message("Unexpected async".into())), + } + } + + #[allow(non_snake_case)] + fn call_first(&'vm self $(, $args: $args)*) -> Result>>> { let vm = self.value.vm(); let mut context = vm.context(); StackFrame::current(&mut context.stack).enter_scope(0, State::Unknown); context.stack.push(*self.value); $( - $args.push(vm, &mut context)?; + $args.push(&vm, &mut context)?; )* for _ in 0..R::extra_args() { - 0.push(vm, &mut context).unwrap(); + 0.push(&vm, &mut context).unwrap(); } let args = count!($($args),*) + R::extra_args(); - let mut context = vm.call_function(context, args)?.unwrap(); - let result = context.stack.pop(); - R::from_value(vm, Variants(&result)) - .ok_or_else(|| { - error!("Wrong type {:?}", result); - Error::Message("Wrong type".to_string()) + vm.call_function(context, args) + } +} + +impl<'vm, T, $($args,)* R> Function R> + where $($args: Pushable<'vm>,)* + T: Deref + Clone + Send + 'static, + R: VmType + for<'x> Getable<'x> + Send + 'static, +{ + #[allow(non_snake_case)] + pub fn call_async(&'vm mut self $(, $args: $args)*) -> BoxFuture { + use futures::{failed, finished}; + use futures::future::Either; + use thread::Execute; + + let future = match self.call_first($($args),*) { + Ok(ok) => { + Either::A(match ok { + Async::Ready(context) => { + Either::A(finished(context.unwrap().stack.pop())) + } + Async::NotReady => Either::B(Execute::new(self.value.clone_vm())) + }) + } + Err(err) => { + Either::B(failed(err)) + } + }; + let vm = self.value.clone_vm(); + future.and_then(move |value| { + R::from_value(&vm, Variants(&value)) + .ok_or_else(|| { + error!("Wrong type {:?}", value); + Error::Message("Wrong type".to_string()) + }) }) + .boxed() } } ) diff --git a/vm/src/channel.rs b/vm/src/channel.rs index cbb36a7c0f..35a99fae64 100644 --- a/vm/src/channel.rs +++ b/vm/src/channel.rs @@ -6,7 +6,7 @@ use std::collections::VecDeque; use base::types::{ArcType, Type}; use {Error, Result as VmResult}; -use api::{Generic, VmType, primitive, WithVM, Function, Pushable, RuntimeResult}; +use api::{Generic, VmType, primitive, WithVM, Function, AsyncPushable, Pushable, RuntimeResult}; use api::generic::A; use gc::{Traverseable, Gc, GcPtr}; use vm::{Thread, RootedThread, Status}; @@ -132,8 +132,7 @@ extern "C" fn resume(vm: &Thread) -> Status { context = vm.context(); context.stack.release_lock(lock); match result { - Ok(()) | - Err(Error::Yield) => { + Ok(_) => { let value: Result<(), &str> = Ok(()); value.status_push(vm, &mut context) } diff --git a/vm/src/lazy.rs b/vm/src/lazy.rs index d5a3d99a3a..9990e1788e 100644 --- a/vm/src/lazy.rs +++ b/vm/src/lazy.rs @@ -3,6 +3,8 @@ use std::fmt; use std::marker::PhantomData; use std::sync::Mutex; +use futures::Async; + use base::types; use base::types::{Type, ArcType}; use gc::{Gc, GcPtr, Move, Traverseable}; @@ -92,8 +94,8 @@ extern "C" fn force(vm: &Thread) -> Status { *lazy.value.lock().unwrap() = Lazy_::Blackhole; let result = vm.call_function(context, 1); match result { - Ok(None) => panic!("Expected stack"), - Ok(Some(mut context)) => { + Ok(Async::Ready(None)) => panic!("Expected stack"), + Ok(Async::Ready(Some(mut context))) => { let mut stack = StackFrame::current(&mut context.stack); let value = stack.pop(); while stack.len() > 1 { @@ -103,6 +105,14 @@ extern "C" fn force(vm: &Thread) -> Status { stack.push(value); Status::Ok } + Ok(Async::NotReady) => { + let mut context = vm.context(); + let err = "Evaluating a lazy value cannot be done asynchronously at \ + the moment"; + let result = Value::String(context.alloc_ignore_limit(&err[..])); + context.stack.push(result); + Status::Error + } Err(err) => { let mut context = vm.context(); let result: RuntimeResult<(), _> = RuntimeResult::Panic(err); diff --git a/vm/src/lib.rs b/vm/src/lib.rs index a11945cd70..a4a3963b93 100644 --- a/vm/src/lib.rs +++ b/vm/src/lib.rs @@ -14,6 +14,8 @@ extern crate collect_mac; extern crate bitflags; extern crate itertools; extern crate pretty; +#[macro_use] +extern crate futures; #[macro_use] extern crate gluon_base as base; @@ -70,8 +72,6 @@ quick_error! { /// Representation of all possible errors that can occur when interacting with the `vm` crate #[derive(Debug, PartialEq)] pub enum Error { - Yield { - } Dead { } UndefinedBinding(symbol: String) { diff --git a/vm/src/thread.rs b/vm/src/thread.rs index 702ba03267..99f4c0d3e6 100644 --- a/vm/src/thread.rs +++ b/vm/src/thread.rs @@ -10,6 +10,8 @@ use std::result::Result as StdResult; use std::sync::Arc; use std::usize; +use futures::{Async, Future, Poll}; + use base::metadata::Metadata; use base::pos::Line; use base::symbol::Symbol; @@ -34,6 +36,28 @@ use value::Value::{Int, Float, String, Data, Function, PartialApplication, Closu pub use gc::Traverseable; +pub struct Execute { + thread: T, +} + +impl Execute + where T: Deref, +{ + pub fn new(thread: T) -> Execute { + Execute { thread: thread } + } +} + +impl Future for Execute + where T: Deref, +{ + type Item = Value; + type Error = Error; + fn poll(&mut self) -> Poll { + self.thread.resume().map(|async| async.map(|mut context| context.stack.pop())) + } +} + /// Enum signaling a successful or unsuccess ful call to an extern function. /// If an error occured the error message is expected to be on the top of the stack. #[derive(Eq, PartialEq)] @@ -85,6 +109,12 @@ impl RootedValue pub fn vm(&self) -> &Thread { &self.vm } + + pub fn clone_vm(&self) -> T + where T: Clone, + { + self.vm.clone() + } } impl<'vm> RootedValue<&'vm Thread> { @@ -379,10 +409,8 @@ impl Thread { /// Runs a garbage collection. pub fn collect(&self) { let mut context = self.current_context(); - self.with_roots(&mut context, |gc, roots| { - unsafe { - gc.collect(roots); - } + self.with_roots(&mut context, |gc, roots| unsafe { + gc.collect(roots); }) } @@ -443,14 +471,6 @@ impl Thread { }; f(&mut context.gc, roots) } - - fn call_context<'b>(&'b self, - mut context: OwnedContext<'b>, - args: VmIndex) - -> Result>> { - context.borrow_mut().do_call(args)?; - context.execute() - } } /// Internal functions for interacting with threads. These functions should be considered both @@ -479,10 +499,10 @@ pub trait ThreadInternal { -> Result<()>; /// Evaluates a zero argument function (a thunk) - fn call_thunk(&self, closure: GcPtr) -> Result; + fn call_thunk(&self, closure: GcPtr) -> Execute<&Self>; /// Executes an `IO` action - fn execute_io(&self, value: Value) -> Result; + fn execute_io(&self, value: Value) -> Result>; /// Calls a function on the stack. /// When this function is called it is expected that the function exists at @@ -490,9 +510,9 @@ pub trait ThreadInternal { fn call_function<'b>(&'b self, stack: OwnedContext<'b>, args: VmIndex) - -> Result>>; + -> Result>>>; - fn resume(&self) -> Result<()>; + fn resume(&self) -> Result>; fn global_env(&self) -> &Arc; @@ -509,7 +529,6 @@ pub trait ThreadInternal { fn can_share_values_with(&self, gc: &mut Gc, other: &Thread) -> bool; } - impl ThreadInternal for Thread { fn context(&self) -> OwnedContext { OwnedContext { @@ -570,16 +589,15 @@ impl ThreadInternal for Thread { Ok(()) } - fn call_thunk(&self, closure: GcPtr) -> Result { + fn call_thunk(&self, closure: GcPtr) -> Execute<&Thread> { let mut context = self.current_context(); context.stack.push(Closure(closure)); context.borrow_mut().enter_scope(0, State::Closure(closure)); - context.execute()?; - Ok(self.current_context().stack.pop()) + Execute { thread: self } } /// Calls a module, allowed to to run IO expressions - fn execute_io(&self, value: Value) -> Result { + fn execute_io(&self, value: Value) -> Result> { debug!("Run IO {:?}", value); let mut context = OwnedContext { thread: self, @@ -592,7 +610,7 @@ impl ThreadInternal for Thread { context.stack.push(Int(0)); context.borrow_mut().enter_scope(2, State::Unknown); - context = self.call_context(context, 1)? + context = try_ready!(self.call_function(context, 1)) .expect("call_module to have the stack remaining"); let result = context.stack.pop(); { @@ -602,27 +620,28 @@ impl ThreadInternal for Thread { } } let _ = context.exit_scope(); - Ok(result) + Ok(Async::Ready(result)) } /// Calls a function on the stack. /// When this function is called it is expected that the function exists at /// `stack.len() - args - 1` and that the arguments are of the correct type fn call_function<'b>(&'b self, - context: OwnedContext<'b>, + mut context: OwnedContext<'b>, args: VmIndex) - -> Result>> { - self.call_context(context, args) + -> Result>>> { + context.borrow_mut().do_call(args)?; + context.execute() } - fn resume(&self) -> Result<()> { - let context = self.current_context(); + fn resume(&self) -> Result> { + let mut context = self.current_context(); if context.stack.get_frames().len() == 1 { // Only the top level frame left means that the thread has finished return Err(Error::Dead); } - context.execute() - .map(|_| ()) + context = try_ready!(context.execute()).unwrap(); + Ok(Async::Ready(context)) } fn global_env(&self) -> &Arc { @@ -645,7 +664,7 @@ impl ThreadInternal for Thread { let mut cloner = ::value::Cloner::new(self, &mut context.gc); if full_clone { cloner.force_full_clone(); - } + } cloner.deep_clone(value) } @@ -802,6 +821,8 @@ pub struct Context { record_map: FieldMap, hook: Hook, max_stack_size: VmIndex, + + poll_fn: Option Result> + Send>>, } impl Context { @@ -816,6 +837,7 @@ impl Context { previous_instruction_index: usize::max_value(), }, max_stack_size: VmIndex::max_value(), + poll_fn: None, } } @@ -853,6 +875,18 @@ impl Context { pub fn set_max_stack_size(&mut self, limit: VmIndex) { self.max_stack_size = limit; } + + pub unsafe fn return_future<'vm, F>(&mut self, mut future: F) + where F: Future + Send + 'static, + F::Item: Pushable<'vm>, + { + use std::mem::transmute; + self.poll_fn = Some(Box::new(move |vm, context| { + let vm = transmute::<&Thread, &'vm Thread>(vm); + let value = try_ready!(future.poll()); + value.push(vm, context).map(Async::Ready) + })); + } } impl<'b> OwnedContext<'b> { @@ -910,7 +944,7 @@ impl<'b> OwnedContext<'b> { if exists { Ok(self) } else { Err(()) } } - fn execute(self) -> Result>> { + fn execute(self) -> Result>>> { let mut maybe_context = Some(self); while let Some(mut context) = maybe_context { debug!("STACK\n{:?}", context.stack.get_frames()); @@ -936,16 +970,28 @@ impl<'b> OwnedContext<'b> { } maybe_context = match state { - State::Lock | State::Unknown => return Ok(Some(context)), + State::Lock | State::Unknown => return Ok(Async::Ready(Some(context))), State::Excess => context.exit_scope().ok(), State::Extern(ext) => { let instruction_index = context.borrow_mut().stack.frame.instruction_index; if instruction_index != 0 { // This function was already called - return Ok(Some(context)); + return Ok(Async::Ready(Some(context))); } else { + let thread = context.thread; context.borrow_mut().stack.frame.instruction_index = 1; - Some(context.execute_function(&ext)?) + let result = context.execute_function(&ext); + match result { + Ok(Async::Ready(context)) => Some(context), + Ok(Async::NotReady) => { + let mut context = thread.current_context(); + if context.poll_fn.is_some() { + context.borrow_mut().stack.frame.instruction_index = 0; + } + return Ok(Async::NotReady); + } + Err(err) => return Err(err), + } } } State::Closure(closure) => { @@ -978,8 +1024,7 @@ impl<'b> OwnedContext<'b> { closure.function.instructions.len()); let new_context = context.execute_(instruction_index, - &closure.function - .instructions, + &closure.function.instructions, &closure.function)?; if new_context.is_some() { State::Exists @@ -991,22 +1036,34 @@ impl<'b> OwnedContext<'b> { match state { State::Exists => Some(context), State::DoesNotExist => None, - State::ReturnContext => return Ok(Some(context)), + State::ReturnContext => return Ok(Async::Ready(Some(context))), } } }; } - Ok(maybe_context) + Ok(Async::Ready(maybe_context)) } - fn execute_function(mut self, function: &ExternFunction) -> Result> { + fn execute_function(mut self, function: &ExternFunction) -> Result>> { debug!("CALL EXTERN {} {:?}", function.id, self.stack); - // Make sure that the stack is not borrowed during the external function call - // Necessary since we do not know what will happen during the function call - let thread = self.thread; - drop(self); - let status = (function.function)(thread); - self = thread.current_context(); + let status = if let Some(mut poll_fn) = self.poll_fn.take() { + let result = poll_fn(self.thread, &mut self); + self.poll_fn = Some(poll_fn); + try_ready!(result); + self.poll_fn = None; + Status::Ok + } else { + // Make sure that the stack is not borrowed during the external function call + // Necessary since we do not know what will happen during the function call + let thread = self.thread; + drop(self); + let status = (function.function)(thread); + self = thread.current_context(); + if self.poll_fn.is_some() && status == Status::Yield { + return Ok(Async::NotReady); + } + status + }; let result = self.stack.pop(); { let mut stack = self.stack.current_frame(); @@ -1024,8 +1081,8 @@ impl<'b> OwnedContext<'b> { self.stack.push(result); match status { - Status::Ok => Ok(self), - Status::Yield => Err(Error::Yield), + Status::Ok => Ok(Async::Ready(self)), + Status::Yield => Ok(Async::NotReady), Status::Error => { match self.stack.pop() { String(s) => Err(Error::Panic(s.to_string())),