From 80f8756d3ef81627f864f4e505cdf48e4b9e261f Mon Sep 17 00:00:00 2001 From: jserfeng <1114550440@qq.com> Date: Thu, 30 Nov 2023 19:43:34 +0800 Subject: [PATCH] refactor(compilation): process dependencies should happen after all dependencies get factorized --- .../rspack_core/src/compiler/compilation.rs | 40 +++++++++++++++---- crates/rspack_core/src/compiler/queue.rs | 17 ++++++++ 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/crates/rspack_core/src/compiler/compilation.rs b/crates/rspack_core/src/compiler/compilation.rs index 6a3793e4c8c6..c5707523f786 100644 --- a/crates/rspack_core/src/compiler/compilation.rs +++ b/crates/rspack_core/src/compiler/compilation.rs @@ -23,7 +23,7 @@ use rspack_identifier::{Identifiable, IdentifierMap, IdentifierSet}; use rspack_sources::{BoxSource, CachedSource, SourceExt}; use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet, FxHasher}; use swc_core::ecma::ast::ModuleItem; -use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::{mpsc::error::TryRecvError, oneshot::Sender}; use tracing::instrument; use super::{ @@ -472,6 +472,7 @@ impl Compilation { parent_module .and_then(|m| m.as_normal_module()) .and_then(|module| module.name_for_condition()), + None, ); }); @@ -603,7 +604,11 @@ impl Compilation { .module_by_identifier(original_module_identifier) .expect("Module expected"); + let mut deps_to_be_done = vec![]; for dependencies in sorted_dependencies.into_values() { + let (tx, rx) = tokio::sync::oneshot::channel(); + deps_to_be_done.push(rx); + self.handle_module_creation( &mut factorize_queue, Some(task.original_module_identifier), @@ -617,16 +622,30 @@ impl Compilation { module .as_normal_module() .and_then(|module| module.name_for_condition()), + Some(tx), ); } - result_tx - .send(Ok(TaskResult::ProcessDependencies(Box::new( - ProcessDependenciesResult { - module_identifier: task.original_module_identifier, - }, - )))) - .expect("Failed to send process dependencies result"); + let tx = result_tx.clone(); + + tokio::spawn(async move { + let mut succeed = true; + while let Some(rx) = deps_to_be_done.pop() { + if rx.blocking_recv().is_err() { + succeed = false; + break; + } + } + + if succeed { + tx.send(Ok(TaskResult::ProcessDependencies(Box::new( + ProcessDependenciesResult { + module_identifier: task.original_module_identifier, + }, + )))) + .expect("Failed to send process dependencies result"); + } + }); } process_deps_time.end(start); @@ -644,6 +663,8 @@ impl Compilation { current_profile, exports_info_related, from_cache, + callback, + .. } = task_result; if let Some(counter) = &mut factorize_cache_counter { @@ -694,6 +715,7 @@ impl Compilation { dependencies, is_entry, current_profile, + callback, }); } Ok(TaskResult::Add(box task_result)) => match task_result { @@ -997,6 +1019,7 @@ impl Compilation { resolve_options: Option>, lazy_visit_modules: std::collections::HashSet, issuer: Option>, + callback: Option>, ) { let current_profile = self.options.profile.then(Box::::default); queue.add_task(FactorizeTask { @@ -1016,6 +1039,7 @@ impl Compilation { plugin_driver: self.plugin_driver.clone(), cache: self.cache.clone(), current_profile, + callback, }); } diff --git a/crates/rspack_core/src/compiler/queue.rs b/crates/rspack_core/src/compiler/queue.rs index 9c45c8a6f392..8a693f169a03 100644 --- a/crates/rspack_core/src/compiler/queue.rs +++ b/crates/rspack_core/src/compiler/queue.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use rspack_error::{Diagnostic, Result}; +use tokio::sync::oneshot::Sender; use crate::{ cache::Cache, BoxDependency, BuildContext, BuildResult, Compilation, CompilerContext, @@ -41,6 +42,7 @@ pub struct FactorizeTask { pub plugin_driver: SharedPluginDriver, pub cache: Arc, pub current_profile: Option>, + pub callback: Option>, } /// a struct temporarily used creating ExportsInfo @@ -61,6 +63,7 @@ pub struct FactorizeTaskResult { pub current_profile: Option>, pub exports_info_related: ExportsInfoRelated, pub from_cache: bool, + pub callback: Option>, } #[async_trait::async_trait] @@ -164,6 +167,7 @@ impl WorkerTask for FactorizeTask { other_exports_info, side_effects_info: side_effects_only_info, }, + callback: self.callback, }))) } } @@ -177,6 +181,7 @@ pub struct AddTask { pub dependencies: Vec, pub is_entry: bool, pub current_profile: Option>, + pub callback: Option>, } #[derive(Debug)] @@ -209,6 +214,12 @@ impl AddTask { module_identifier, )?; + if let Some(callback) = self.callback { + callback + .send(()) + .map_err(|_| rspack_error::internal_error!("failed to complete adding module"))?; + } + return Ok(TaskResult::Add(Box::new(AddTaskResult::ModuleReused { module: self.module, }))); @@ -235,6 +246,12 @@ impl AddTask { current_profile.mark_integration_end(); } + if let Some(callback) = self.callback { + callback + .send(()) + .map_err(|_| rspack_error::internal_error!("failed to complete adding module"))?; + } + Ok(TaskResult::Add(Box::new(AddTaskResult::ModuleAdded { module: self.module, current_profile: self.current_profile,