From 1f217771d8d913b4d1930dd5500e89abef39323b Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 7 Jan 2025 16:33:12 -0800 Subject: [PATCH] refactor(turbo-tasks): Convert local Vcs to non-local Vcs when returning from task functions --- turbopack/crates/turbo-tasks/src/manager.rs | 10 +- turbopack/crates/turbo-tasks/src/raw_vc.rs | 30 +++ .../crates/turbo-tasks/src/task/function.rs | 194 +++++++----------- .../turbo-tasks/src/task/task_output.rs | 4 +- 4 files changed, 112 insertions(+), 126 deletions(-) diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index c208548c678ce..caf0f9f93063f 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -511,7 +511,10 @@ impl TurboTasks { let id = self.backend.create_transient_task( TransientTaskType::Root(Box::new(move || { let functor = functor.clone(); - Box::pin(async move { Ok(functor().await?.node) }) + Box::pin(async move { + let raw_vc = functor().await?.node; + raw_vc.to_non_local().await + }) })), self, ); @@ -533,7 +536,10 @@ impl TurboTasks { Fut: Future>> + Send + 'static, { let id = self.backend.create_transient_task( - TransientTaskType::Once(Box::pin(async move { Ok(future.await?.node) })), + TransientTaskType::Once(Box::pin(async move { + let raw_vc = future.await?.node; + raw_vc.to_non_local().await + })), self, ); self.schedule(id); diff --git a/turbopack/crates/turbo-tasks/src/raw_vc.rs b/turbopack/crates/turbo-tasks/src/raw_vc.rs index 2cebcf96ae6b0..12988e30b8871 100644 --- a/turbopack/crates/turbo-tasks/src/raw_vc.rs +++ b/turbopack/crates/turbo-tasks/src/raw_vc.rs @@ -244,6 +244,36 @@ impl RawVc { } } + /// Convert a potentially local `RawVc` into a non-local `RawVc`. This is a subset of resolution + /// resolution, because the returned `RawVc` can be a `TaskOutput`. + pub(crate) async fn to_non_local(self) -> Result { + let tt = turbo_tasks(); + let mut current = self; + let mut notified = false; + let mut lazily_notify = || { + if !notified { + tt.notify_scheduled_tasks(); + notified = true; + } + }; + loop { + match current { + RawVc::LocalOutput(task_id, local_cell_id) => { + lazily_notify(); + current = + read_local_output(&*tt, task_id, local_cell_id, ReadConsistency::Eventual) + .await?; + } + RawVc::LocalCell(execution_id, local_cell_id) => { + let shared_reference = read_local_cell(execution_id, local_cell_id); + let value_type = get_value_type(shared_reference.0); + return Ok((value_type.raw_cell)(shared_reference)); + } + non_local => return Ok(non_local), + } + } + } + pub(crate) fn connect(&self) { let tt = turbo_tasks(); tt.connect_task(self.get_task_id()); diff --git a/turbopack/crates/turbo-tasks/src/task/function.rs b/turbopack/crates/turbo-tasks/src/task/function.rs index b0aba77077780..efa86a51a7857 100644 --- a/turbopack/crates/turbo-tasks/src/task/function.rs +++ b/turbopack/crates/turbo-tasks/src/task/function.rs @@ -26,7 +26,7 @@ use std::{future::Future, marker::PhantomData, pin::Pin}; use anyhow::Result; use super::{TaskInput, TaskOutput}; -use crate::{magic_any::MagicAny, OperationVc, RawVc, Vc, VcRead, VcValueType}; +use crate::{magic_any::MagicAny, RawVc, Vc, VcRead, VcValueType}; pub type NativeTaskFuture = Pin> + Send>>; @@ -151,12 +151,6 @@ impl TaskFnMode for FunctionMode {} pub struct AsyncFunctionMode; impl TaskFnMode for AsyncFunctionMode {} -pub struct OperationMode; -impl TaskFnMode for OperationMode {} - -pub struct AsyncOperationMode; -impl TaskFnMode for AsyncOperationMode {} - pub struct MethodMode; impl TaskFnMode for MethodMode {} @@ -172,22 +166,47 @@ macro_rules! task_inputs_impl { } } -fn get_args(arg: &dyn MagicAny) -> Result<&T> { - let value = arg.downcast_ref::(); - #[cfg(debug_assertions)] - return anyhow::Context::with_context(value, || { - format!( - "Invalid argument type, expected {} got {}", - std::any::type_name::(), - (*arg).magic_type_name() - ) - }); - #[cfg(not(debug_assertions))] - return anyhow::Context::context(value, "Invalid argument type"); +// Helper function for `task_fn_impl!()` to reduce the amount of code inside the macro, and to give +// the compiler more chances to dedupe monomorphized code across small functions with less typevars +#[cfg(debug_assertions)] +fn get_debug_downcast_error_msg(arg: &dyn MagicAny) -> String { + format!( + "Invalid argument type, expected {} got {}", + std::any::type_name::(), + (*arg).magic_type_name() + ) +} + +// Helper function for `task_fn_impl!()` +async fn output_try_into_non_local_raw_vc(output: impl TaskOutput) -> Result { + // TODO: Potential future optimization: If we know we're inside a local task, we can avoid + // calling `to_non_local()` here, which might let us avoid constructing a non-local cell for the + // local task's return value. Flattening chains of `RawVc::LocalOutput` may still be useful to + // reduce traversal later. + output.try_into_raw_vc()?.to_non_local().await } macro_rules! task_fn_impl { - ( $async_fn_trait:ident $arg_len:literal $( $arg:ident )* ) => { + ( $async_fn_trait:ident $get_args_fn:ident $arg_len:literal $( $arg:ident )* ) => { + /// Downcast, unpack, and clone all the arguments in the singular `arg` tuple. + // + // This helper is generic over the arguments, but not the function, potentially allowing two + // functions with the same argument types to potentially share monomorphized code. + #[allow(non_snake_case)] + fn $get_args_fn<$($arg: TaskInput + 'static,)*>(arg: &dyn MagicAny) -> Result<($($arg,)*)> { + match arg.downcast_ref::<($($arg,)*)>() { + Some(($($arg,)*)) => Ok(($($arg.clone(),)*)), + None => { + #[cfg(debug_assertions)] + let context_str = get_debug_downcast_error_msg::<($($arg,)*)>(arg); + #[cfg(not(debug_assertions))] + let context_str = "Invalid argument type"; + + anyhow::bail!(context_str); + } + } + } + impl TaskFnInputFunction for F where $($arg: TaskInput + 'static,)* @@ -197,14 +216,10 @@ macro_rules! task_fn_impl { #[allow(non_snake_case)] fn functor(&self, arg: &dyn MagicAny) -> Result { let task_fn = self.clone(); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - + let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?; Ok(Box::pin(async move { - Output::try_into_raw_vc((task_fn)($($arg,)*)) + let output = (task_fn)($($arg,)*); + output_try_into_non_local_raw_vc(output).await })) } } @@ -213,20 +228,16 @@ macro_rules! task_fn_impl { where $($arg: TaskInput + 'static,)* F: Fn($($arg,)*) -> FutureOutput + Send + Sync + Clone + 'static, - FutureOutput: Future + Send, + FutureOutput: Future + Send + 'static, Output: TaskOutput + 'static, { #[allow(non_snake_case)] fn functor(&self, arg: &dyn MagicAny) -> Result { let task_fn = self.clone(); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - + let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?; Ok(Box::pin(async move { - Output::try_into_raw_vc((task_fn)($($arg,)*).await) + let output = (task_fn)($($arg,)*).await; + output_try_into_non_local_raw_vc(output).await })) } } @@ -242,16 +253,12 @@ macro_rules! task_fn_impl { fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result { let task_fn = self.clone(); let recv = Vc::::from(this); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - + let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?; Ok(Box::pin(async move { let recv = recv.await?; let recv = >::target_to_value_ref(&*recv); - Output::try_into_raw_vc((task_fn)(recv, $($arg,)*)) + let output = (task_fn)(recv, $($arg,)*); + output_try_into_non_local_raw_vc(output).await })) } } @@ -267,37 +274,10 @@ macro_rules! task_fn_impl { fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result { let task_fn = self.clone(); let recv = Vc::::from(this); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - - Ok(Box::pin(async move { - Output::try_into_raw_vc((task_fn)(recv, $($arg,)*)) - })) - } - } - - impl TaskFnInputFunctionWithThis for F - where - Recv: Sync + Send + 'static, - $($arg: TaskInput + 'static,)* - F: Fn(OperationVc, $($arg,)*) -> Output + Send + Sync + Clone + 'static, - Output: TaskOutput + 'static, - { - #[allow(non_snake_case)] - fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result { - let task_fn = self.clone(); - let recv = OperationVc::::from(this); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - + let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?; Ok(Box::pin(async move { - Output::try_into_raw_vc((task_fn)(recv, $($arg,)*)) + let output = (task_fn)(recv, $($arg,)*); + output_try_into_non_local_raw_vc(output).await })) } } @@ -311,7 +291,7 @@ macro_rules! task_fn_impl { where F: Fn(A0, $($arg,)*) -> Fut, Fut: Future + Send, - Fut::Output: TaskOutput + Fut::Output: TaskOutput + 'static { type OutputFuture = Fut; type Output = Fut::Output; @@ -327,16 +307,12 @@ macro_rules! task_fn_impl { fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result { let task_fn = self.clone(); let recv = Vc::::from(this); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - + let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?; Ok(Box::pin(async move { let recv = recv.await?; let recv = >::target_to_value_ref(&*recv); - >::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await) + let output = (task_fn)(recv, $($arg,)*).await; + output_try_into_non_local_raw_vc(output).await })) } } @@ -351,55 +327,29 @@ macro_rules! task_fn_impl { fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result { let task_fn = self.clone(); let recv = Vc::::from(this); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - - Ok(Box::pin(async move { - , $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await) - })) - } - } - - impl TaskFnInputFunctionWithThis for F - where - Recv: Sync + Send + 'static, - $($arg: TaskInput + 'static,)* - F: $async_fn_trait, $($arg,)*> + Clone + Send + Sync + 'static, - { - #[allow(non_snake_case)] - fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result { - let task_fn = self.clone(); - let recv = OperationVc::::from(this); - - let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?; - $( - let $arg = $arg.clone(); - )* - + let ($($arg,)*) = $get_args_fn::<$($arg,)*>(arg)?; Ok(Box::pin(async move { - , $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await) + let output = (task_fn)(recv, $($arg,)*).await; + output_try_into_non_local_raw_vc(output).await })) } } }; } -task_fn_impl! { AsyncFn0 0 } -task_fn_impl! { AsyncFn1 1 A1 } -task_fn_impl! { AsyncFn2 2 A1 A2 } -task_fn_impl! { AsyncFn3 3 A1 A2 A3 } -task_fn_impl! { AsyncFn4 4 A1 A2 A3 A4 } -task_fn_impl! { AsyncFn5 5 A1 A2 A3 A4 A5 } -task_fn_impl! { AsyncFn6 6 A1 A2 A3 A4 A5 A6 } -task_fn_impl! { AsyncFn7 7 A1 A2 A3 A4 A5 A6 A7 } -task_fn_impl! { AsyncFn8 8 A1 A2 A3 A4 A5 A6 A7 A8 } -task_fn_impl! { AsyncFn9 9 A1 A2 A3 A4 A5 A6 A7 A8 A9 } -task_fn_impl! { AsyncFn10 10 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 } -task_fn_impl! { AsyncFn11 11 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 } -task_fn_impl! { AsyncFn12 12 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12 } +task_fn_impl! { AsyncFn0 get_args_0 0 } +task_fn_impl! { AsyncFn1 get_args_1 1 A1 } +task_fn_impl! { AsyncFn2 get_args_2 2 A1 A2 } +task_fn_impl! { AsyncFn3 get_args_3 3 A1 A2 A3 } +task_fn_impl! { AsyncFn4 get_args_4 4 A1 A2 A3 A4 } +task_fn_impl! { AsyncFn5 get_args_5 5 A1 A2 A3 A4 A5 } +task_fn_impl! { AsyncFn6 get_args_6 6 A1 A2 A3 A4 A5 A6 } +task_fn_impl! { AsyncFn7 get_args_7 7 A1 A2 A3 A4 A5 A6 A7 } +task_fn_impl! { AsyncFn8 get_args_8 8 A1 A2 A3 A4 A5 A6 A7 A8 } +task_fn_impl! { AsyncFn9 get_args_9 9 A1 A2 A3 A4 A5 A6 A7 A8 A9 } +task_fn_impl! { AsyncFn10 get_args_10 10 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 } +task_fn_impl! { AsyncFn11 get_args_11 11 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 } +task_fn_impl! { AsyncFn12 get_args_12 12 A1 A2 A3 A4 A5 A6 A7 A8 A9 A10 A11 A12 } // There needs to be one more implementation than task_fn_impl to account for // the receiver. diff --git a/turbopack/crates/turbo-tasks/src/task/task_output.rs b/turbopack/crates/turbo-tasks/src/task/task_output.rs index 41d36c2826f72..15c1699aae167 100644 --- a/turbopack/crates/turbo-tasks/src/task/task_output.rs +++ b/turbopack/crates/turbo-tasks/src/task/task_output.rs @@ -6,7 +6,7 @@ use crate::{RawVc, Vc}; /// Trait to implement in order for a type to be accepted as a /// `turbo_tasks::function` return type. -pub trait TaskOutput { +pub trait TaskOutput: Send { type Return; fn try_from_raw_vc(raw_vc: RawVc) -> Self::Return; @@ -15,7 +15,7 @@ pub trait TaskOutput { impl TaskOutput for Vc where - T: ?Sized, + T: Send + ?Sized, { type Return = Vc;