Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(turbo-tasks): Convert local Vcs to non-local Vcs when returning from task functions #74714

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,10 @@ impl<B: Backend + 'static> TurboTasks<B> {
let id = self.backend.create_transient_task(
TransientTaskType::Root(Box::new(move || {
let functor = functor.clone();
Box::pin(async move { Ok(functor().await?.node) })
Box::pin(async move {
let raw_vc = functor().await?.node;
raw_vc.to_non_local().await
})
})),
self,
);
Expand All @@ -533,7 +536,10 @@ impl<B: Backend + 'static> TurboTasks<B> {
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
{
let id = self.backend.create_transient_task(
TransientTaskType::Once(Box::pin(async move { Ok(future.await?.node) })),
TransientTaskType::Once(Box::pin(async move {
let raw_vc = future.await?.node;
raw_vc.to_non_local().await
})),
self,
);
self.schedule(id);
Expand Down
30 changes: 30 additions & 0 deletions turbopack/crates/turbo-tasks/src/raw_vc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,36 @@ impl RawVc {
}
}

/// Convert a potentially local `RawVc` into a non-local `RawVc`. This is a subset of resolution
/// resolution, because the returned `RawVc` can be a `TaskOutput`.
pub(crate) async fn to_non_local(self) -> Result<RawVc> {
let tt = turbo_tasks();
let mut current = self;
let mut notified = false;
let mut lazily_notify = || {
if !notified {
tt.notify_scheduled_tasks();
notified = true;
}
};
loop {
match current {
RawVc::LocalOutput(task_id, local_cell_id) => {
lazily_notify();
current =
read_local_output(&*tt, task_id, local_cell_id, ReadConsistency::Eventual)
.await?;
}
RawVc::LocalCell(execution_id, local_cell_id) => {
let shared_reference = read_local_cell(execution_id, local_cell_id);
let value_type = get_value_type(shared_reference.0);
return Ok((value_type.raw_cell)(shared_reference));
}
non_local => return Ok(non_local),
}
}
}

pub(crate) fn connect(&self) {
let tt = turbo_tasks();
tt.connect_task(self.get_task_id());
Expand Down
138 changes: 41 additions & 97 deletions turbopack/crates/turbo-tasks/src/task/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::{future::Future, marker::PhantomData, pin::Pin};
use anyhow::Result;

use super::{TaskInput, TaskOutput};
use crate::{magic_any::MagicAny, OperationVc, RawVc, Vc, VcRead, VcValueType};
use crate::{magic_any::MagicAny, RawVc, Vc, VcRead, VcValueType};

pub type NativeTaskFuture = Pin<Box<dyn Future<Output = Result<RawVc>> + Send>>;

Expand Down Expand Up @@ -151,12 +151,6 @@ impl TaskFnMode for FunctionMode {}
pub struct AsyncFunctionMode;
impl TaskFnMode for AsyncFunctionMode {}

pub struct OperationMode;
impl TaskFnMode for OperationMode {}

pub struct AsyncOperationMode;
impl TaskFnMode for AsyncOperationMode {}

pub struct MethodMode;
impl TaskFnMode for MethodMode {}

Expand All @@ -172,20 +166,39 @@ macro_rules! task_inputs_impl {
}
}

fn get_args<T: MagicAny>(arg: &dyn MagicAny) -> Result<&T> {
let value = arg.downcast_ref::<T>();
bgw marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(debug_assertions)]
#[inline(never)]
fn get_debug_downcast_error_msg(expected: &str, actual: &dyn MagicAny) -> String {
format!(
"Invalid argument type, expected {expected} got {}",
(*actual).magic_type_name()
)
}

/// Downcast, and clone all the arguments in the singular `arg` tuple.
///
/// This helper function for `task_fn_impl!()` reduces the amount of code inside the macro, and
/// gives the compiler more chances to dedupe monomorphized code across small functions with less
/// typevars.
fn get_args<T: MagicAny + Clone>(arg: &dyn MagicAny) -> Result<T> {
let value = arg.downcast_ref::<T>().cloned();
#[cfg(debug_assertions)]
return anyhow::Context::with_context(value, || {
format!(
"Invalid argument type, expected {} got {}",
std::any::type_name::<T>(),
(*arg).magic_type_name()
)
get_debug_downcast_error_msg(std::any::type_name::<T>(), arg)
});
#[cfg(not(debug_assertions))]
return anyhow::Context::context(value, "Invalid argument type");
}

// Helper function for `task_fn_impl!()`
async fn output_try_into_non_local_raw_vc(output: impl TaskOutput) -> Result<RawVc> {
// TODO: Potential future optimization: If we know we're inside a local task, we can avoid
// calling `to_non_local()` here, which might let us avoid constructing a non-local cell for the
// local task's return value. Flattening chains of `RawVc::LocalOutput` may still be useful to
// reduce traversal later.
output.try_into_raw_vc()?.to_non_local().await
}

macro_rules! task_fn_impl {
( $async_fn_trait:ident $arg_len:literal $( $arg:ident )* ) => {
impl<F, Output, $($arg,)*> TaskFnInputFunction<FunctionMode, ($($arg,)*)> for F
Expand All @@ -197,14 +210,10 @@ macro_rules! task_fn_impl {
#[allow(non_snake_case)]
fn functor(&self, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)($($arg,)*))
let output = (task_fn)($($arg,)*);
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -213,20 +222,16 @@ macro_rules! task_fn_impl {
where
$($arg: TaskInput + 'static,)*
F: Fn($($arg,)*) -> FutureOutput + Send + Sync + Clone + 'static,
FutureOutput: Future<Output = Output> + Send,
FutureOutput: Future<Output = Output> + Send + 'static,
Output: TaskOutput + 'static,
{
#[allow(non_snake_case)]
fn functor(&self, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)($($arg,)*).await)
let output = (task_fn)($($arg,)*).await;
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -242,16 +247,12 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
let recv = recv.await?;
let recv = <Recv::Read as VcRead<Recv>>::target_to_value_ref(&*recv);
Output::try_into_raw_vc((task_fn)(recv, $($arg,)*))
let output = (task_fn)(recv, $($arg,)*);
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -267,37 +268,10 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)(recv, $($arg,)*))
}))
}
}

impl<F, Output, Recv, $($arg,)*> TaskFnInputFunctionWithThis<OperationMode, Recv, ($($arg,)*)> for F
where
Recv: Sync + Send + 'static,
$($arg: TaskInput + 'static,)*
F: Fn(OperationVc<Recv>, $($arg,)*) -> Output + Send + Sync + Clone + 'static,
Output: TaskOutput + 'static,
{
#[allow(non_snake_case)]
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = OperationVc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
Output::try_into_raw_vc((task_fn)(recv, $($arg,)*))
let output = (task_fn)(recv, $($arg,)*);
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -311,7 +285,7 @@ macro_rules! task_fn_impl {
where
F: Fn(A0, $($arg,)*) -> Fut,
Fut: Future + Send,
Fut::Output: TaskOutput
Fut::Output: TaskOutput + 'static
{
type OutputFuture = Fut;
type Output = Fut::Output;
Expand All @@ -327,16 +301,12 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
let recv = recv.await?;
let recv = <Recv::Read as VcRead<Recv>>::target_to_value_ref(&*recv);
<F as $async_fn_trait<&Recv, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await)
let output = (task_fn)(recv, $($arg,)*).await;
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand All @@ -351,36 +321,10 @@ macro_rules! task_fn_impl {
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = Vc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
<F as $async_fn_trait<Vc<Recv>, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await)
}))
}
}

impl<F, Recv, $($arg,)*> TaskFnInputFunctionWithThis<AsyncOperationMode, Recv, ($($arg,)*)> for F
where
Recv: Sync + Send + 'static,
$($arg: TaskInput + 'static,)*
F: $async_fn_trait<OperationVc<Recv>, $($arg,)*> + Clone + Send + Sync + 'static,
{
#[allow(non_snake_case)]
fn functor(&self, this: RawVc, arg: &dyn MagicAny) -> Result<NativeTaskFuture> {
let task_fn = self.clone();
let recv = OperationVc::<Recv>::from(this);

let ($($arg,)*) = get_args::<($($arg,)*)>(arg)?;
$(
let $arg = $arg.clone();
)*

Ok(Box::pin(async move {
<F as $async_fn_trait<OperationVc<Recv>, $($arg,)*>>::Output::try_into_raw_vc((task_fn)(recv, $($arg,)*).await)
let output = (task_fn)(recv, $($arg,)*).await;
output_try_into_non_local_raw_vc(output).await
}))
}
}
Expand Down
4 changes: 2 additions & 2 deletions turbopack/crates/turbo-tasks/src/task/task_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{RawVc, Vc};

/// Trait to implement in order for a type to be accepted as a
/// `turbo_tasks::function` return type.
pub trait TaskOutput {
pub trait TaskOutput: Send {
type Return;

fn try_from_raw_vc(raw_vc: RawVc) -> Self::Return;
Expand All @@ -15,7 +15,7 @@ pub trait TaskOutput {

impl<T> TaskOutput for Vc<T>
where
T: ?Sized,
T: Send + ?Sized,
{
type Return = Vc<T>;

Expand Down
Loading