From 694aa77f09c072846e33de4498b98334cc064c35 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Tue, 20 Aug 2024 22:09:21 -0700 Subject: [PATCH] feat(turbo-tasks): Optionally schedule ResolveNative/ResolveTrait tasks as local tasks --- .../crates/turbo-tasks-memory/src/task.rs | 25 +- turbopack/crates/turbo-tasks/Cargo.toml | 1 + turbopack/crates/turbo-tasks/src/manager.rs | 258 +++++++++++++++--- .../crates/turbo-tasks/src/task/local_task.rs | 82 ++++++ turbopack/crates/turbo-tasks/src/task/mod.rs | 1 + 5 files changed, 315 insertions(+), 52 deletions(-) create mode 100644 turbopack/crates/turbo-tasks/src/task/local_task.rs diff --git a/turbopack/crates/turbo-tasks-memory/src/task.rs b/turbopack/crates/turbo-tasks-memory/src/task.rs index aacb6f138f66e..cf1a9bffe7f81 100644 --- a/turbopack/crates/turbo-tasks-memory/src/task.rs +++ b/turbopack/crates/turbo-tasks-memory/src/task.rs @@ -744,11 +744,11 @@ impl Task { ), TaskType::Persistent { ty, .. } | TaskType::Transient { ty, .. } => match &***ty { CachedTaskType::Native { - fn_type: native_fn, + fn_type: native_fn_id, this, arg, } => { - let func = registry::get_function(*native_fn); + let func = registry::get_function(*native_fn_id); let span = func.span(); let entered = span.enter(); let future = func.execute(*this, &**arg); @@ -756,21 +756,19 @@ impl Task { (future, span) } CachedTaskType::ResolveNative { - fn_type: ref native_fn_id, + fn_type: native_fn_id, this, arg, } => { - let native_fn_id = *native_fn_id; - let func = registry::get_function(native_fn_id); + let func = registry::get_function(*native_fn_id); let span = func.resolve_span(); let entered = span.enter(); - let turbo_tasks = turbo_tasks.pin(); let future = Box::pin(CachedTaskType::run_resolve_native( - native_fn_id, + *native_fn_id, *this, &**arg, self.id.persistence(), - turbo_tasks, + turbo_tasks.pin(), )); drop(entered); (future, span) @@ -781,19 +779,16 @@ impl Task { this, arg, } => { - let trait_type_id = *trait_type_id; - let 
trait_type = registry::get_trait(trait_type_id); + let trait_type = registry::get_trait(*trait_type_id); let span = trait_type.resolve_span(name); let entered = span.enter(); - let name = name.clone(); - let turbo_tasks = turbo_tasks.pin(); let future = Box::pin(CachedTaskType::run_resolve_trait( - trait_type_id, - name, + *trait_type_id, + name.clone(), *this, &**arg, self.id.persistence(), - turbo_tasks, + turbo_tasks.pin(), )); drop(entered); (future, span) diff --git a/turbopack/crates/turbo-tasks/Cargo.toml b/turbopack/crates/turbo-tasks/Cargo.toml index a22c8c7fd9cb9..162ff3cb5afdb 100644 --- a/turbopack/crates/turbo-tasks/Cargo.toml +++ b/turbopack/crates/turbo-tasks/Cargo.toml @@ -12,6 +12,7 @@ bench = false default = [] tokio_tracing = ["tokio/tracing"] hanging_detection = [] +local_resolution = [] [lints] workspace = true diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index ac4ce1c34ead3..3637719426856 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -39,13 +39,13 @@ use crate::{ magic_any::MagicAny, raw_vc::{CellId, RawVc}, registry::{self, get_function}, - task::shared_reference::TypedSharedReference, + task::{local_task::LocalTask, shared_reference::TypedSharedReference}, trace::TraceRawVcs, trait_helpers::get_trait_method, util::StaticOrArc, vc::ReadVcFuture, - Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, SharedReference, TaskId, - TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType, + Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, OutputContent, + SharedReference, TaskId, TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType, }; pub trait TurboTasksCallApi: Sync + Send { @@ -311,7 +311,7 @@ pub struct UpdateInfo { placeholder_for_future_fields: (), } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum TaskPersistence { /// Tasks that may be 
persisted across sessions using serialization. Persistent, @@ -400,8 +400,11 @@ struct CurrentGlobalTaskState { /// `CurrentGlobalTaskState`) when the task finishes executing. local_cells: Vec, + /// Local tasks created while this global task has been running. Indexed by `LocalTaskId`. + local_tasks: Vec, + /// Tracks currently running local tasks, and defers cleanup of the global task until those - /// complete. + /// complete. Also used by `detached_for_testing`. local_task_tracker: TaskTracker, backend_state: Box, @@ -415,10 +418,51 @@ impl CurrentGlobalTaskState { stateful: false, cell_counters: Some(AutoMap::default()), local_cells: Vec::new(), + local_tasks: Vec::new(), local_task_tracker: TaskTracker::new(), backend_state, } } + + fn assert_task_id(&self, expected_task_id: TaskId) { + if self.task_id != expected_task_id { + unimplemented!( + "Local tasks can currently only be scheduled/awaited within their parent task" + ); + } + } + + /// Create a [`LocalTask::Unscheduled`]. + #[cfg(feature = "local_resolution")] + fn create_local_task( + &mut self, + ty: CachedTaskType, + // if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence + persistence: TaskPersistence, + ) -> LocalTaskId { + use crate::task::local_task; + + self.local_tasks + .push(LocalTask::Unscheduled(Arc::new(local_task::Unscheduled { + ty, + persistence, + }))); + // generate a one-indexed id + if cfg!(debug_assertions) { + LocalTaskId::from(u32::try_from(self.local_tasks.len()).unwrap()) + } else { + unsafe { LocalTaskId::new_unchecked(self.local_tasks.len() as u32) } + } + } + + fn get_local_task(&self, local_task_id: LocalTaskId) -> &LocalTask { + // local task ids are one-indexed (they use NonZeroU32) + &self.local_tasks[(*local_task_id as usize) - 1] + } + + fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask { + &mut self.local_tasks[(*local_task_id as usize) - 1] + } } /// Information specific to the current "local" task. 
A local task re-uses it's parent global task's @@ -428,8 +472,8 @@ impl CurrentGlobalTaskState { /// of the global task. #[derive(Clone)] struct CurrentLocalTaskState { - /// A unique identifier created for each unique [`CurrentLocalTaskState`]. Used to check that - /// [`CurrentTaskState::local_cells`] are valid for the current [`RawVc::LocalCell`]. + /// A unique identifier created for each unique [`CurrentLocalTaskState`]. Used to check that + /// [`CurrentTaskState::local_cells`] are valid for the current [`RawVc::LocalCell`]. execution_id: ExecutionId, /// The function's metadata if this is a persistent task. Contains information about arguments @@ -627,28 +671,32 @@ impl TurboTasks { if registry::get_function(func).arg_meta.is_resolved(&*arg) { return self.native_call(func, arg, persistence); } + let task_type = CachedTaskType::ResolveNative { + fn_type: func, + this: None, + arg, + }; + #[cfg(feature = "local_resolution")] + return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let local_task_id = gts_write.create_local_task(task_type, persistence); + RawVc::LocalOutput(gts_write.task_id, local_task_id) + }); + #[cfg(not(feature = "local_resolution"))] match persistence { TaskPersistence::LocalCells => { todo!("bgw: local tasks"); } TaskPersistence::Transient => { RawVc::TaskOutput(self.backend.get_or_create_transient_task( - CachedTaskType::ResolveNative { - fn_type: func, - this: None, - arg, - }, + task_type, current_task("turbo_function calls"), self, )) } TaskPersistence::Persistent => { RawVc::TaskOutput(self.backend.get_or_create_persistent_task( - CachedTaskType::ResolveNative { - fn_type: func, - this: None, - arg, - }, + task_type, current_task("turbo_function calls"), self, )) @@ -671,7 +719,14 @@ impl TurboTasks { this: Some(this), arg, }; - match persistence { + #[cfg(feature = "local_resolution")] + return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let 
local_task_id = gts_write.create_local_task(task_type, persistence); + RawVc::LocalOutput(gts_write.task_id, local_task_id) + }); + #[cfg(not(feature = "local_resolution"))] + return match persistence { TaskPersistence::LocalCells => { todo!("bgw: local tasks"); } @@ -689,7 +744,7 @@ impl TurboTasks { self, )) } - } + }; } pub fn trait_call( @@ -721,7 +776,15 @@ impl TurboTasks { this, arg, }; - match persistence { + + #[cfg(feature = "local_resolution")] + return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let local_task_id = gts_write.create_local_task(task_type, persistence); + RawVc::LocalOutput(gts_write.task_id, local_task_id) + }); + #[cfg(not(feature = "local_resolution"))] + return match persistence { TaskPersistence::LocalCells => { todo!("bgw: local tasks"); } @@ -739,7 +802,7 @@ impl TurboTasks { self, )) } - } + }; } #[track_caller] @@ -747,9 +810,6 @@ impl TurboTasks { self.begin_primary_job(); self.scheduled_tasks.fetch_add(1, Ordering::AcqRel); - #[cfg(feature = "tokio_tracing")] - let description = self.backend.get_task_description(task_id); - let this = self.pin(); let future = async move { let mut schedule_again = true; @@ -825,6 +885,102 @@ impl TurboTasks { let future = TURBO_TASKS.scope(self.pin(), future).in_current_span(); + #[cfg(feature = "tokio_tracing")] + { + let description = self.backend.get_task_description(task_id); + tokio::task::Builder::new() + .name(&description) + .spawn(future) + .unwrap(); + } + #[cfg(not(feature = "tokio_tracing"))] + tokio::task::spawn(future); + } + + fn schedule_local_task(&self, parent_task_id: TaskId, local_task_id: LocalTaskId) { + let Some((global_task_state, unscheduled_local_task)) = + CURRENT_GLOBAL_TASK_STATE.with(|gts| { + let mut gts_write = gts.write().unwrap(); + gts_write.assert_task_id(parent_task_id); + let local_task = gts_write.get_mut_local_task(local_task_id); + let LocalTask::Unscheduled(unscheduled_local_task) = local_task else { + return 
None; + }; + let unscheduled_local_task = Arc::clone(unscheduled_local_task); + *local_task = LocalTask::Scheduled { + done_event: Event::new({ + let ult = Arc::clone(&unscheduled_local_task); + move || format!("LocalTask({})::done_event", ult.ty) + }), + }; + + Some((Arc::clone(gts), unscheduled_local_task)) + }) + else { + // it's either already scheduled or already done + return; + }; + + let local_task_state = CurrentLocalTaskState::new( + self.execution_id_factory.get(), + unscheduled_local_task + .ty + .try_get_function_id() + .map(|func_id| &get_function(func_id).function_meta), + ); + + #[cfg(feature = "tokio_tracing")] + let description = format!( + "[local] (parent: {}) {}", + self.backend.get_task_description(parent_task_id), + unscheduled_local_task.ty, + ); + + let this = self.pin(); + let future = async move { + let TaskExecutionSpec { future, span } = unscheduled_local_task.start_execution(&*this); + async move { + let (result, _duration, _memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; + + let result = result.map_err(|any| match any.downcast::() { + Ok(owned) => Some(Cow::Owned(*owned)), + Err(any) => match any.downcast::<&'static str>() { + Ok(str) => Some(Cow::Borrowed(*str)), + Err(_) => None, + }, + }); + let local_task = LocalTask::Done { + output: match result { + Ok(Ok(raw_vc)) => OutputContent::Link(raw_vc), + Ok(Err(err)) => OutputContent::Error(err.into()), + Err(panic_err) => OutputContent::Panic(panic_err.map(Box::new)), + }, + }; + + let done_event = CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let mut gts_write = gts.write().unwrap(); + let scheduled_task = + std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task); + let LocalTask::Scheduled { done_event } = scheduled_task else { + panic!("local task finished, but was not in the scheduled state?"); + }; + done_event + }); + done_event.notify(usize::MAX) + } + .instrument(span) + .await + }; + let future = global_task_state + 
.read() + .unwrap() + .local_task_tracker + .track_future(future); + let future = CURRENT_LOCAL_TASK_STATE.scope(local_task_state, future); + let future = CURRENT_GLOBAL_TASK_STATE.scope(global_task_state, future); + let future = TURBO_TASKS.scope(self.pin(), future).in_current_span(); + #[cfg(feature = "tokio_tracing")] tokio::task::Builder::new() .name(&description) @@ -1315,22 +1471,42 @@ impl TurboTasksApi for TurboTasks { fn try_read_local_output( &self, - _parent_task_id: TaskId, - _local_task_id: LocalTaskId, - _consistency: ReadConsistency, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, ) -> Result> { - todo!("bgw: local outputs"); + // we don't currently support reading a local output outside of its own task, so + // tracked/untracked is currently irrelevant + self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency) } /// INVALIDATION: Be careful with this, it will not track dependencies, so /// using it could break cache invalidation. fn try_read_local_output_untracked( &self, - _parent_task_id: TaskId, - _local_task_id: LocalTaskId, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + // we don't currently support reading a local output outside of its own task, so + // consistency is currently irrelevant _consistency: ReadConsistency, ) -> Result> { - todo!("bgw: local outputs"); + CURRENT_GLOBAL_TASK_STATE.with(|gts| loop { + let gts_read = gts.read().unwrap(); + gts_read.assert_task_id(parent_task_id); + match gts_read.get_local_task(local_task_id) { + LocalTask::Unscheduled(..) 
=> { + drop(gts_read); + self.schedule_local_task(parent_task_id, local_task_id); + continue; + } + LocalTask::Scheduled { done_event } => { + return Ok(Err(done_event.listen())); + } + LocalTask::Done { output } => { + return Ok(Ok(output.read_untracked()?)); + } + } + }) } fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap { @@ -1650,7 +1826,10 @@ pub fn with_turbo_tasks_for_testing( current_task, Box::new(()), ))), - CURRENT_LOCAL_TASK_STATE.scope(CurrentLocalTaskState::new(execution_id, None), f), + CURRENT_LOCAL_TASK_STATE.scope( + CurrentLocalTaskState::new(execution_id, /* function_meta */ None), + f, + ), ), ) } @@ -1992,12 +2171,17 @@ pub(crate) fn read_local_cell( } pub(crate) async fn read_local_output( - _this: &dyn TurboTasksApi, - _task_id: TaskId, - _local_output_id: LocalTaskId, - _consistency: ReadConsistency, + this: &dyn TurboTasksApi, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, ) -> Result { - todo!("bgw: local outputs"); + loop { + match this.try_read_local_output(parent_task_id, local_task_id, consistency)? { + Ok(raw_vc) => return Ok(raw_vc), + Err(event_listener) => event_listener.await, + } + } } /// Panics if the [`ExecutionId`] does not match the current task's diff --git a/turbopack/crates/turbo-tasks/src/task/local_task.rs b/turbopack/crates/turbo-tasks/src/task/local_task.rs new file mode 100644 index 0000000000000..657e1f7a87303 --- /dev/null +++ b/turbopack/crates/turbo-tasks/src/task/local_task.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; + +use crate::{ + backend::{Backend, CachedTaskType, TaskExecutionSpec}, + event::Event, + registry, OutputContent, TaskPersistence, TurboTasksBackendApi, +}; + +/// A potentially in-flight local task stored in `CurrentGlobalTaskState::local_tasks`. 
+pub enum LocalTask { + Unscheduled(Arc), + Scheduled { done_event: Event }, + Done { output: OutputContent }, +} + +pub struct Unscheduled { + pub ty: CachedTaskType, + /// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence + pub persistence: TaskPersistence, +} + +impl Unscheduled { + pub fn start_execution<'a>( + &'a self, + turbo_tasks: &dyn TurboTasksBackendApi, + ) -> TaskExecutionSpec<'a> { + let Self { ty, persistence } = self; + match ty { + CachedTaskType::Native { + fn_type: native_fn_id, + this, + arg, + } => { + debug_assert_eq!(persistence, &TaskPersistence::LocalCells); + let func = registry::get_function(*native_fn_id); + let span = func.span(); + let entered = span.enter(); + let future = func.execute(*this, &**arg); + drop(entered); + TaskExecutionSpec { future, span } + } + CachedTaskType::ResolveNative { + fn_type: native_fn_id, + this, + arg, + } => { + let func = registry::get_function(*native_fn_id); + let span = func.resolve_span(); + let entered = span.enter(); + let future = Box::pin(CachedTaskType::run_resolve_native( + *native_fn_id, + *this, + &**arg, + *persistence, + turbo_tasks.pin(), + )); + drop(entered); + TaskExecutionSpec { future, span } + } + CachedTaskType::ResolveTrait { + trait_type: trait_type_id, + method_name: name, + this, + arg, + } => { + let trait_type = registry::get_trait(*trait_type_id); + let span = trait_type.resolve_span(name); + let entered = span.enter(); + let future = Box::pin(CachedTaskType::run_resolve_trait( + *trait_type_id, + name.clone(), + *this, + &**arg, + *persistence, + turbo_tasks.pin(), + )); + drop(entered); + TaskExecutionSpec { future, span } + } + } + } +} diff --git a/turbopack/crates/turbo-tasks/src/task/mod.rs b/turbopack/crates/turbo-tasks/src/task/mod.rs index 963a3a74a99d1..4ad8840c55622 100644 --- a/turbopack/crates/turbo-tasks/src/task/mod.rs +++ b/turbopack/crates/turbo-tasks/src/task/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod function; +pub mod 
local_task; pub(crate) mod shared_reference; pub(crate) mod task_input; pub(crate) mod task_output;