Skip to content

Commit

Permalink
feat(vm): Allow extern functions to return futures
Browse files Browse the repository at this point in the history
This integrates the futures crate which provides the functinality for
running asynchronous computations which is something that was almost
possible already thanks to the vm's ability to yield.

To make it explicit which functions are (potentially) asynchronous the
`Error::Yield` variant was replaced with `futures::Async`.
`AsyncPushable` were added to represent a subset of pushable values
which can be asynchronous. This is then used to only allow the direct
return value of a function to be asynchronous (`() -> Future<A>` works
but `() -> { x: Future<A> }` will not compile).
  • Loading branch information
Marwes committed Jan 28, 2017
1 parent 81f6d95 commit 120f9d1
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ gluon_vm = { path = "vm", version = "0.2.2" }
clap = "2.2.5"
log = "0.3.6"
quick-error = "1.0.0"
futures = "0.1.0"

env_logger = { version = "0.3.4", optional = true }
lazy_static = { version = "0.2.0", optional = true }
Expand Down
6 changes: 4 additions & 2 deletions src/compiler_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

use std::borrow::{Borrow, BorrowMut};

use futures::Future;

use base::ast::SpannedExpr;
use base::error::InFile;
use base::types::ArcType;
Expand Down Expand Up @@ -300,7 +302,7 @@ impl<E> Executable<()> for CompileValue<E>
let CompileValue { expr, typ, mut function } = self;
function.id = Symbol::from(name);
let closure = vm.global_env().new_global_thunk(function)?;
let value = vm.call_thunk(closure)?;
let value = vm.call_thunk(closure).wait()?;
Ok(ExecuteValue {
expr: expr,
typ: typ,
Expand All @@ -319,7 +321,7 @@ impl<E> Executable<()> for CompileValue<E>
let CompileValue { mut expr, typ, function } = self;
let metadata = metadata::metadata(&*vm.get_env(), expr.borrow_mut());
let closure = vm.global_env().new_global_thunk(function)?;
let value = vm.call_thunk(closure)?;
let value = vm.call_thunk(closure).wait()?;
vm.set_global(closure.function.name.clone(), typ, metadata, value)?;
info!("Loaded module `{}` filename", filename);
Ok(())
Expand Down
11 changes: 10 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ extern crate log;
#[macro_use]
extern crate quick_error;

extern crate futures;

#[macro_use]
pub extern crate gluon_vm as vm;
pub extern crate gluon_base as base;
Expand All @@ -27,6 +29,8 @@ use std::result::Result as StdResult;
use std::string::String as StdString;
use std::env;

use futures::Async;

use base::ast::{self, SpannedExpr};
use base::error::{Errors, InFile};
use base::metadata::Metadata;
Expand Down Expand Up @@ -290,7 +294,12 @@ impl Compiler {
.unwrap_or(false)
};
let value = if is_io {
vm.execute_io(*value)?
match vm.execute_io(*value)? {
Async::Ready(value) => value,
Async::NotReady => {
return Err(VmError::Message("Unhandled asynchronous execution".into()).into())
}
}
} else {
*value
};
Expand Down
67 changes: 66 additions & 1 deletion tests/api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
extern crate env_logger;
extern crate futures;

#[macro_use]
extern crate gluon_vm;
extern crate gluon;

use futures::{BoxFuture, Future, IntoFuture};
use futures::future::lazy;

use gluon::base::types::Type;
use gluon::vm::api::{VmType, FunctionRef, Userdata};
use gluon::vm::Error;
use gluon::vm::api::{FutureResult, VmType, FunctionRef, Userdata};
use gluon::vm::thread::{RootedThread, Thread, Traverseable, Root, RootStr};
use gluon::vm::types::VmInt;
use gluon::Compiler;
Expand Down Expand Up @@ -118,3 +124,62 @@ sum_bytes [100b, 42b, 3b, 15b]

assert_eq!(result, expected);
}

#[test]
fn return_finished_future() {
let _ = ::env_logger::init();

fn add(x: i32, y: i32) -> FutureResult<BoxFuture<i32, Error>> {
FutureResult(Ok(x + y).into_future().boxed())
}

let expr = r#"
add 1 2
"#;

let vm = make_vm();
vm.define_global("add", primitive!(2 add)).unwrap();

let result =
Compiler::new().run_expr::<i32>(&vm, "<top>", expr).unwrap_or_else(|err| panic!("{}", err));
let expected = (3, Type::int());

assert_eq!(result, expected);
}

#[test]
fn return_delayed_future() {
let _ = ::env_logger::init();

fn poll_n(i: i32) -> FutureResult<BoxFuture<i32, Error>> {
use std::thread::spawn;
use futures::sync::oneshot::channel;

let (ping_c, ping_p) = channel();
let (pong_c, pong_p) = channel();
spawn(move || {
ping_p.wait().unwrap();
pong_c.complete(i);
});
FutureResult(lazy(move || {
ping_c.complete(());
Ok(())
})
.and_then(|_| pong_p.map_err(|err| Error::Message(format!("{}", err))))
.boxed())
}

let expr = r#"
poll_n 3
"#;

let vm = make_vm();
vm.define_global("poll_n", primitive!(1 poll_n)).unwrap();

let result = Compiler::new()
.run_expr::<i32>(&vm, "<top>", expr)
.unwrap_or_else(|err| panic!("{}", err));
let expected = (3, Type::int());

assert_eq!(result, expected);
}
2 changes: 2 additions & 0 deletions vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ collect-mac = "0.1.0"
pretty = "0.2.0"
bitflags = "0.7.0"
itertools = "0.5.6"
futures = "0.1.0"

gluon_base = { path = "../base", version = "0.2.2" }
gluon_check = { path = "../check", version = "0.2.2" }

Expand Down
129 changes: 108 additions & 21 deletions vm/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use base::symbol::Symbol;
use stack::{State, StackFrame};
use vm::{self, Thread, Status, RootStr, RootedValue, Root};
use value::{ArrayRepr, Cloner, DataStruct, ExternFunction, GcStr, Value, ValueArray, Def};
use thread::{self, Context, RootedThread};
use thread::{self, Context, OwnedContext, RootedThread};
use thread::ThreadInternal;
use base::types::{self, ArcType, Type};
use types::{VmIndex, VmTag, VmInt};

use std::any::Any;
use std::cell::Ref;
use std::cmp::Ordering;
Expand All @@ -17,6 +18,8 @@ use std::marker::PhantomData;
use std::ops::Deref;
use std::result::Result as StdResult;

use futures::{Async, BoxFuture, Future};

pub use value::Userdata;

macro_rules! count {
Expand Down Expand Up @@ -304,19 +307,15 @@ pub trait VmType {
}
}


/// Trait which allows a rust value to be pushed to the virtual machine
pub trait Pushable<'vm> {
/// Pushes `self` to `stack`. If the call is successful a single element should have been added
/// to the stack and `Ok(())` should be returned. If the call is unsuccessful `Status:Error`
/// should be returned and the stack should be left intact
fn push(self, vm: &'vm Thread, context: &mut Context) -> Result<()>;
pub trait AsyncPushable<'vm> {
fn async_push(self, vm: &'vm Thread, context: &mut Context) -> Result<Async<()>>;

fn status_push(self, vm: &'vm Thread, context: &mut Context) -> Status
where Self: Sized,
{
match self.push(vm, context) {
Ok(()) => Status::Ok,
match self.async_push(vm, context) {
Ok(Async::Ready(())) => Status::Ok,
Ok(Async::NotReady) => Status::Yield,
Err(err) => {
let msg = unsafe {
GcStr::from_utf8_unchecked(context.alloc_ignore_limit(format!("{}", err).as_bytes()))
Expand All @@ -328,6 +327,22 @@ pub trait Pushable<'vm> {
}
}

impl<'vm, T> AsyncPushable<'vm> for T
where T: Pushable<'vm>,
{
fn async_push(self, vm: &'vm Thread, context: &mut Context) -> Result<Async<()>> {
self.push(vm, context).map(Async::Ready)
}
}

/// Trait which allows a rust value to be pushed to the virtual machine
pub trait Pushable<'vm>: AsyncPushable<'vm> {
/// Pushes `self` to `stack`. If the call is successful a single element should have been added
/// to the stack and `Ok(())` should be returned. If the call is unsuccessful `Status:Error`
/// should be returned and the stack should be left intact
fn push(self, vm: &'vm Thread, context: &mut Context) -> Result<()>;
}

/// Trait which allows rust values to be retrieved from the virtual machine
pub trait Getable<'vm>: Sized {
/// unsafe version of from_value which allows references to the internal of GcPtr's to be
Expand Down Expand Up @@ -847,6 +862,33 @@ impl<'vm, T: Getable<'vm>, E: Getable<'vm>> Getable<'vm> for StdResult<T, E> {
}
}

pub struct FutureResult<F>(pub F);

impl<F> VmType for FutureResult<F>
where F: Future,
F::Item: VmType,
{
type Type = <F::Item as VmType>::Type;
fn make_type(vm: &Thread) -> ArcType {
<F::Item>::make_type(vm)
}
}
impl<'vm, F> AsyncPushable<'vm> for FutureResult<F>
where F: Future<Error = Error> + Send + 'static,
F::Item: Pushable<'vm>,
{
fn async_push(mut self, vm: &'vm Thread, context: &mut Context) -> Result<Async<()>> {
match self.0.poll() {
Ok(Async::Ready(value)) => value.push(vm, context).map(Async::Ready),
Ok(Async::NotReady) => unsafe {
context.return_future(self.0);
Ok(Async::NotReady)
},
Err(err) => Err(err),
}
}
}

pub enum RuntimeResult<T, E> {
Return(T),
Panic(E),
Expand Down Expand Up @@ -1701,7 +1743,7 @@ impl <'vm, $($args,)* R: VmType> FunctionType for fn ($($args),*) -> R {

impl <'vm, $($args,)* R> VmFunction<'vm> for fn ($($args),*) -> R
where $($args: Getable<'vm> + 'vm,)*
R: Pushable<'vm> + VmType + 'vm
R: AsyncPushable<'vm> + VmType + 'vm
{
#[allow(non_snake_case, unused_mut, unused_assignments, unused_variables, unused_unsafe)]
fn unpack_and_call(&self, vm: &'vm Thread) -> Status {
Expand Down Expand Up @@ -1749,7 +1791,7 @@ impl <'s, $($args: VmType,)* R: VmType> VmType for Fn($($args),*) -> R + 's {

impl <'vm, $($args,)* R> VmFunction<'vm> for Fn($($args),*) -> R + 'vm
where $($args: Getable<'vm> + 'vm,)*
R: Pushable<'vm> + VmType + 'vm
R: AsyncPushable<'vm> + VmType + 'vm
{
#[allow(non_snake_case, unused_mut, unused_assignments, unused_variables, unused_unsafe)]
fn unpack_and_call(&self, vm: &'vm Thread) -> Status {
Expand Down Expand Up @@ -1782,28 +1824,73 @@ where $($args: Getable<'vm> + 'vm,)*
impl<'vm, T, $($args,)* R> Function<T, fn($($args),*) -> R>
where $($args: Pushable<'vm>,)*
T: Deref<Target = Thread>,
R: VmType + Getable<'vm>
R: VmType + for<'x> Getable<'x>,
{
#[allow(non_snake_case)]
pub fn call(&'vm mut self $(, $args: $args)*) -> Result<R> {
match self.call_first($($args),*)? {
Async::Ready(context) => {
let value = context.unwrap().stack.pop();
R::from_value(self.value.vm(), Variants(&value))
.ok_or_else(|| {
error!("Wrong type {:?}", value);
Error::Message("Wrong type".to_string())
})
}
Async::NotReady => Err(Error::Message("Unexpected async".into())),
}
}

#[allow(non_snake_case)]
fn call_first(&'vm self $(, $args: $args)*) -> Result<Async<Option<OwnedContext<'vm>>>> {
let vm = self.value.vm();
let mut context = vm.context();
StackFrame::current(&mut context.stack).enter_scope(0, State::Unknown);
context.stack.push(*self.value);
$(
$args.push(vm, &mut context)?;
$args.push(&vm, &mut context)?;
)*
for _ in 0..R::extra_args() {
0.push(vm, &mut context).unwrap();
0.push(&vm, &mut context).unwrap();
}
let args = count!($($args),*) + R::extra_args();
let mut context = vm.call_function(context, args)?.unwrap();
let result = context.stack.pop();
R::from_value(vm, Variants(&result))
.ok_or_else(|| {
error!("Wrong type {:?}", result);
Error::Message("Wrong type".to_string())
vm.call_function(context, args)
}
}

impl<'vm, T, $($args,)* R> Function<T, fn($($args),*) -> R>
where $($args: Pushable<'vm>,)*
T: Deref<Target = Thread> + Clone + Send + 'static,
R: VmType + for<'x> Getable<'x> + Send + 'static,
{
#[allow(non_snake_case)]
pub fn call_async(&'vm mut self $(, $args: $args)*) -> BoxFuture<R, Error> {
use futures::{failed, finished};
use futures::future::Either;
use thread::Execute;

let future = match self.call_first($($args),*) {
Ok(ok) => {
Either::A(match ok {
Async::Ready(context) => {
Either::A(finished(context.unwrap().stack.pop()))
}
Async::NotReady => Either::B(Execute::new(self.value.clone_vm()))
})
}
Err(err) => {
Either::B(failed(err))
}
};
let vm = self.value.clone_vm();
future.and_then(move |value| {
R::from_value(&vm, Variants(&value))
.ok_or_else(|| {
error!("Wrong type {:?}", value);
Error::Message("Wrong type".to_string())
})
})
.boxed()
}
}
)
Expand Down
5 changes: 2 additions & 3 deletions vm/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::collections::VecDeque;
use base::types::{ArcType, Type};

use {Error, Result as VmResult};
use api::{Generic, VmType, primitive, WithVM, Function, Pushable, RuntimeResult};
use api::{Generic, VmType, primitive, WithVM, Function, AsyncPushable, Pushable, RuntimeResult};
use api::generic::A;
use gc::{Traverseable, Gc, GcPtr};
use vm::{Thread, RootedThread, Status};
Expand Down Expand Up @@ -132,8 +132,7 @@ extern "C" fn resume(vm: &Thread) -> Status {
context = vm.context();
context.stack.release_lock(lock);
match result {
Ok(()) |
Err(Error::Yield) => {
Ok(_) => {
let value: Result<(), &str> = Ok(());
value.status_push(vm, &mut context)
}
Expand Down
Loading

0 comments on commit 120f9d1

Please sign in to comment.