Skip to content

Commit

Permalink
refactor(turbo-tasks): Convert local Vcs to non-local Vcs when return…
Browse files Browse the repository at this point in the history
…ing from task functions
  • Loading branch information
bgw committed Jan 9, 2025
1 parent d2c1eab commit 110001f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 126 deletions.
10 changes: 8 additions & 2 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,10 @@ impl<B: Backend + 'static> TurboTasks<B> {
let id = self.backend.create_transient_task(
TransientTaskType::Root(Box::new(move || {
let functor = functor.clone();
Box::pin(async move { Ok(functor().await?.node) })
Box::pin(async move {
let raw_vc = functor().await?.node;
raw_vc.to_non_local().await
})
})),
self,
);
Expand All @@ -533,7 +536,10 @@ impl<B: Backend + 'static> TurboTasks<B> {
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
{
let id = self.backend.create_transient_task(
TransientTaskType::Once(Box::pin(async move { Ok(future.await?.node) })),
TransientTaskType::Once(Box::pin(async move {
let raw_vc = future.await?.node;
raw_vc.to_non_local().await
})),
self,
);
self.schedule(id);
Expand Down
30 changes: 30 additions & 0 deletions turbopack/crates/turbo-tasks/src/raw_vc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,36 @@ impl RawVc {
}
}

/// Convert a potentially local `RawVc` into a non-local `RawVc`. This is a subset of resolution
/// resolution, because the returned `RawVc` can be a `TaskOutput`.
pub(crate) async fn to_non_local(self) -> Result<RawVc> {
let tt = turbo_tasks();
let mut current = self;
let mut notified = false;
let mut lazily_notify = || {
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
};
loop {
match current {
RawVc::LocalOutput(task_id, local_cell_id) => {
lazily_notify();
current =
read_local_output(&*tt, task_id, local_cell_id, ReadConsistency::Eventual)
.await?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
let shared_reference = read_local_cell(execution_id, local_cell_id);
let value_type = get_value_type(shared_reference.0);
return Ok((value_type.raw_cell)(shared_reference));
}
non_local => return Ok(non_local),
}
}
}

pub(crate) fn connect(&self) {
let tt = turbo_tasks();
tt.connect_task(self.get_task_id());
Expand Down
194 changes: 72 additions & 122 deletions turbopack/crates/turbo-tasks/src/task/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{future::Future, marker::PhantomData, pin::Pin};
use anyhow::Result;

use super::{TaskInput, TaskOutput};
use crate::{magic_any::MagicAny, OperationVc, RawVc, Vc, VcRead, VcValueType};
use crate::{magic_any::MagicAny, RawVc, Vc, VcRead, VcValueType};

pub type NativeTaskFuture = Pin<Box<dyn Future<Output = Result<RawVc>> + Send>>;

Expand Down Expand Up @@ -151,12 +151,6 @@ impl TaskFnMode for FunctionMode {}
pub struct AsyncFunctionMode;
impl TaskFnMode for AsyncFunctionMode {}

pub struct OperationMode;
impl TaskFnMode for OperationMode {}

pub struct AsyncOperationMode;
impl TaskFnMode for AsyncOperationMode {}

pub struct MethodMode;
impl TaskFnMode for MethodMode {}

Expand All @@ -172,22 +166,47 @@ macro_rules! task_inputs_impl {
}
}

fn get_args<T: MagicAny>(arg: &dyn MagicAny) -> Result<&T> {
let value = arg.downcast_ref::<T>();
#[cfg(debug_assertions)]
return anyhow::Context::with_context(value, || {
format!(
"Invalid argument type, expected {} got {}",
std::any::type_name::<T>(),
(*arg).magic_type_name()
)
});
#[cfg(not(debug_assertions))]
return anyhow::Context::context(value, "Invalid argument type");
// Helper function for `task_fn_impl!()` to reduce the amount of code inside the macro, and to give
// the compiler more chances to dedupe monomorphized code across small functions with less typevars
#[cfg(debug_assertions)]
fn get_debug_downcast_error_msg<T: MagicAny>(arg: &dyn MagicAny) -> String {
format!(
"Invalid argument type, expected {} got {}",
std::any::type_name::<T>(),
(*arg).magic_type_name()
)
}

// Helper function for `task_fn_impl!()`
async fn output_try_into_non_local_raw_vc(output: impl TaskOutput) -> Result<RawVc> {
// TODO: Potential future optimization: If we know we're inside a local task, we can avoid
// calling `to_non_local()` here, which might let us avoid constructing a non-local cell for the
// local task's return value. Flattening chains of `RawVc::LocalOutput` may still be useful to
// reduce traversal later.
output.try_into_raw_vc()?.to_non_local().await
}

macro_rules! task_fn_impl {
( $async_fn_trait:ident $arg_len:literal $( $arg:ident )* ) => {
( $async_fn_trait:ident $get_args_fn:ident $arg_len:literal $( $arg:ident )* ) => {
/// Downcast, unpack, and clone all the arguments in the singular `arg` tuple.
//
// This helper is generic over the arguments, but not the function, potentially allowing two
// functions with the same argument types to potentially share monomorphized code.
#[allow(non_snake_case)]
fn $get_args_fn<$($arg: TaskInput + 'static,)*>(arg: &dyn MagicAny) -> Result<($($arg,)*)> {
match arg.downcast_ref::<($($arg,)*)>() {
Some(($($arg,)*)) => Ok(($($arg.clone(),)*)),
None => {
#[cfg(debug_assertions)]
let context_str = get_debug_downcast_error_msg::<($($arg,)*)>(arg);
#[cfg(not(debug_assertions))]
let context_str = "Invalid argument type";

anyhow::bail!(context_str);
}
}
}

impl<F, Output, $($arg,)*> TaskFnInputFunction<FunctionMode, ($($arg,)*)> for F
where
$($arg: TaskInput + 'static,)*
Expand All @@ -197,14 +216,10 @@ macro_rules! task_fn_impl {
#[allow(non_snake_case)]
fn functor(&self, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?;
Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)($($arg,)*))
let output = (task_fn)($($arg,)*);
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -213,20 +228,16 @@ macro_rules! task_fn_impl {
where
$($arg: TaskInput + 'static,)*
F: Fn($($arg,)*) -> FutureOutput + Send + Sync + Clone + 'static,
FutureOutput: Future<Output = Output> + Send,
FutureOutput: Future<Output = Output> + Send + 'static,
Output: TaskOutput + 'static,
{
#[allow(non_snake_case)]
fn functor(&self, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?;
Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)($($arg,)*).await)
let output = (task_fn)($($arg,)*).await;
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -242,16 +253,12 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?;
Ok(Box::pin(async move {
let recv = recv.await?;
let recv = <Recv::Read as VcRead<Recv>>::target_to_value_ref(&*recv);
Output::try_into_raw_vc((task_fn)(recv, $($arg,)*))
let output = (task_fn)(recv, $($arg,)*);
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -267,37 +274,10 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)(recv, $($arg,)*))
}))
}
}

impl<F, Output, Recv, $($arg,)*> TaskFnInputFunctionWithThis<OperationMode, Recv, ($($arg,)*)> for F
where
Recv: Sync + Send + 'static,
$($arg: TaskInput + 'static,)*
F: Fn(OperationVc<Recv>, $($arg,)*) -> Output + Send + Sync + Clone + 'static,
Output: TaskOutput + 'static,
{
#[allow(non_snake_case)]
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = OperationVc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?;
Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)(recv, $($arg,)*))
let output = (task_fn)(recv, $($arg,)*);
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -311,7 +291,7 @@ macro_rules! task_fn_impl {
where
F: Fn(A0, $($arg,)*) -> Fut,
Fut: Future + Send,
Fut::Output: TaskOutput
Fut::Output: TaskOutput + 'static
{
type OutputFuture = Fut;
type Output = Fut::Output;
Expand All @@ -327,16 +307,12 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?;
Ok(Box::pin(async move {
let recv = recv.await?;
let recv = <Recv::Read as VcRead<Recv>>::target_to_value_ref(&*recv);
<F as $async_fn_trait<&Recv, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await)
let output = (task_fn)(recv, $($arg,)*).await;
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -351,55 +327,29 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
<F as $async_fn_trait<Vc<Recv>, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await)
}))
}
}

impl<F, Recv, $($arg,)*> TaskFnInputFunctionWithThis<AsyncOperationMode, Recv, ($($arg,)*)> for F
where
Recv: Sync + Send + 'static,
$($arg: TaskInput + 'static,)*
F: $async_fn_trait<OperationVc<Recv>, $($arg,)*> + Clone + Send + Sync + 'static,
{
#[allow(non_snake_case)]
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = OperationVc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?;
Ok(Box::pin(async move {
<F as $async_fn_trait<OperationVc<Recv>, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await)
let output = (task_fn)(recv, $($arg,)*).await;
output_try_into_non_local_raw_vc(output).await
}))
}
}
};
}

task_fn_impl! { AsyncFn0 0 }
task_fn_impl! { AsyncFn1 1 A1 }
task_fn_impl! { AsyncFn2 2 A1 A2 }
task_fn_impl! { AsyncFn3 3 A1 A2 A3 }
task_fn_impl! { AsyncFn4 4 A1 A2 A3 A4 }
task_fn_impl! { AsyncFn5 5 A1 A2 A3 A4 A5 }
task_fn_impl! { AsyncFn6 6 A1 A2 A3 A4 A5 A6 }
task_fn_impl! { AsyncFn7 7 A1 A2 A3 A4 A5 A6 A7 }
task_fn_impl! { AsyncFn8 8 A1 A2 A3 A4 A5 A6 A7 A8 }
task_fn_impl! { AsyncFn9 9 A1 A2 A3 A4 A5 A6 A7 A8 A9 }
task_fn_impl! { AsyncFn10 10 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 }
task_fn_impl! { AsyncFn11 11 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 }
task_fn_impl! { AsyncFn12 12 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12 }
task_fn_impl! { AsyncFn0 get_args_0 0 }
task_fn_impl! { AsyncFn1 get_args_1 1 A1 }
task_fn_impl! { AsyncFn2 get_args_2 2 A1 A2 }
task_fn_impl! { AsyncFn3 get_args_3 3 A1 A2 A3 }
task_fn_impl! { AsyncFn4 get_args_4 4 A1 A2 A3 A4 }
task_fn_impl! { AsyncFn5 get_args_5 5 A1 A2 A3 A4 A5 }
task_fn_impl! { AsyncFn6 get_args_6 6 A1 A2 A3 A4 A5 A6 }
task_fn_impl! { AsyncFn7 get_args_7 7 A1 A2 A3 A4 A5 A6 A7 }
task_fn_impl! { AsyncFn8 get_args_8 8 A1 A2 A3 A4 A5 A6 A7 A8 }
task_fn_impl! { AsyncFn9 get_args_9 9 A1 A2 A3 A4 A5 A6 A7 A8 A9 }
task_fn_impl! { AsyncFn10 get_args_10 10 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 }
task_fn_impl! { AsyncFn11 get_args_11 11 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 }
task_fn_impl! { AsyncFn12 get_args_12 12 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12 }

// There needs to be one more implementation than task_fn_impl to account for
// the receiver.
Expand Down
4 changes: 2 additions & 2 deletions turbopack/crates/turbo-tasks/src/task/task_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{RawVc, Vc};

/// Trait to implement in order for a type to be accepted as a
/// `turbo_tasks::function` return type.
pub trait TaskOutput {
pub trait TaskOutput: Send {
type Return;

fn try_from_raw_vc(raw_vc: RawVc) -> Self::Return;
Expand All @@ -15,7 +15,7 @@ pub trait TaskOutput {

impl<T> TaskOutput for Vc<T>
where
T: ?Sized,
T: Send + ?Sized,
{
type Return = Vc<T>;

Expand Down

0 comments on commit 110001f

Please sign in to comment.