Skip to content

Commit

Permalink
feat(compilation): add queue handlers representing task_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
JSerFeng committed Jan 4, 2024
1 parent 25d8841 commit 63431be
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 64 additions & 6 deletions crates/rspack_core/src/compiler/compilation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use super::{
use crate::{
build_chunk_graph::build_chunk_graph,
cache::{use_code_splitting_cache, Cache, CodeSplittingCache},
get_chunk_from_ukey, get_mut_chunk_from_ukey, is_source_equal,
create_queue_handle, get_chunk_from_ukey, get_mut_chunk_from_ukey, is_source_equal,
tree_shaking::{optimizer, visitor::SymbolRef, BailoutFlag, OptimizeDependencyResult},
AddQueue, AddTask, AddTaskResult, AdditionalChunkRuntimeRequirementsArgs,
AdditionalModuleRequirementsArgs, AsyncDependenciesBlock, BoxDependency, BoxModule, BuildQueue,
Expand All @@ -40,11 +40,12 @@ use crate::{
CompilationLogging, CompilerOptions, ContentHashArgs, ContextDependency, DependencyId,
DependencyParents, DependencyType, Entry, EntryData, EntryOptions, Entrypoint, ErrorSpan,
FactorizeQueue, FactorizeTask, FactorizeTaskResult, Filename, Logger, Module,
ModuleCreationCallback, ModuleFactory, ModuleGraph, ModuleGraphModule, ModuleIdentifier,
ModuleProfile, NormalModuleSource, PathData, ProcessAssetsArgs, ProcessDependenciesQueue,
ProcessDependenciesResult, ProcessDependenciesTask, RenderManifestArgs, Resolve, ResolverFactory,
RuntimeGlobals, RuntimeModule, RuntimeRequirementsInTreeArgs, RuntimeSpec, SharedPluginDriver,
SourceType, Stats, TaskResult, WorkerTask,
ModuleCreationCallback, ModuleFactory, ModuleFactoryResult, ModuleGraph, ModuleGraphModule,
ModuleIdentifier, ModuleProfile, NormalModuleSource, PathData, ProcessAssetsArgs,
ProcessDependenciesQueue, ProcessDependenciesResult, ProcessDependenciesTask, QueueHandler,
RenderManifestArgs, Resolve, ResolverFactory, RuntimeGlobals, RuntimeModule,
RuntimeRequirementsInTreeArgs, RuntimeSpec, SharedPluginDriver, SourceType, Stats, TaskResult,
WorkerTask,
};
use crate::{tree_shaking::visitor::OptimizeAnalyzeResult, Context};

Expand Down Expand Up @@ -109,6 +110,8 @@ pub struct Compilation {
pub build_dependencies: IndexSet<PathBuf, BuildHasherDefault<FxHasher>>,
pub side_effects_free_modules: IdentifierSet,
pub module_item_map: IdentifierMap<Vec<ModuleItem>>,

pub queue_handle: Option<QueueHandler>,
}

impl Compilation {
Expand Down Expand Up @@ -170,6 +173,8 @@ impl Compilation {
side_effects_free_modules: IdentifierSet::default(),
module_item_map: IdentifierMap::default(),
include_module_ids: IdentifierSet::default(),

queue_handle: None,
}
}

Expand Down Expand Up @@ -529,14 +534,19 @@ impl Compilation {
let mut active_task_count = 0usize;
let is_expected_shutdown = Arc::new(AtomicBool::new(false));
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel::<Result<TaskResult>>();

let mut factorize_queue = FactorizeQueue::new();
let mut add_queue = AddQueue::new();
let mut build_queue = BuildQueue::new();
let mut process_dependencies_queue = ProcessDependenciesQueue::new();

let mut make_failed_dependencies: HashSet<BuildDependency> = HashSet::default();
let mut make_failed_module = HashSet::default();
let mut errored = None;

let (handler, mut processor) = create_queue_handle();
self.queue_handle = Some(handler);

deps_builder
.revoke_modules(&mut self.module_graph)
.into_iter()
Expand Down Expand Up @@ -587,6 +597,14 @@ impl Compilation {

tokio::task::block_in_place(|| loop {
let start = factorize_time.start();
processor.try_process(
self,
&mut factorize_queue,
&mut add_queue,
&mut build_queue,
&mut process_dependencies_queue,
);

while let Some(task) = factorize_queue.get_task() {
tokio::spawn({
let result_tx = result_tx.clone();
Expand Down Expand Up @@ -729,6 +747,45 @@ impl Compilation {

match result_rx.try_recv() {
Ok(item) => {
if let Ok(item) = &item {
match item {
TaskResult::Factorize(result) => {
if let Some(ModuleFactoryResult {
module: Some(module),
..
}) = &result.factory_result
{
processor.flush_callback(
crate::TaskCategory::Factorize,
&result.dependency.to_string(),
module.identifier(),
self,
)
}
}
TaskResult::Add(result) => {
let module = match result.as_ref() {
AddTaskResult::ModuleReused { module } => module.identifier(),
AddTaskResult::ModuleAdded { module, .. } => module.identifier(),
};

processor.flush_callback(crate::TaskCategory::Add, &module, module, self)
}
TaskResult::Build(result) => {
let id = result.module.identifier();
processor.flush_callback(crate::TaskCategory::Build, &id, id, self);
}
TaskResult::ProcessDependencies(result) => {
processor.flush_callback(
crate::TaskCategory::ProcessDependencies,
&result.module_identifier,
result.module_identifier,
self,
);
}
}
}

match item {
Ok(TaskResult::Factorize(box task_result)) => {
let FactorizeTaskResult {
Expand All @@ -743,6 +800,7 @@ impl Compilation {
missing_dependencies,
diagnostics,
callback,
..
} = task_result;
if !diagnostics.is_empty() {
make_failed_dependencies.insert((dependencies[0], original_module_identifier));
Expand Down
186 changes: 185 additions & 1 deletion crates/rspack_core/src/compiler/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::sync::Arc;

use rspack_error::{Diagnostic, IntoTWithDiagnosticArray, Result};
use rspack_sources::BoxSource;
use rustc_hash::FxHashSet as HashSet;
use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

use crate::{
cache::Cache, BoxDependency, BuildContext, BuildResult, Compilation, CompilerContext,
Expand Down Expand Up @@ -46,6 +47,30 @@ pub struct FactorizeTask {
pub callback: Option<ModuleCreationCallback>,
}

impl std::fmt::Debug for FactorizeTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FactorizeTask")
.field(
"original_module_identifier",
&self.original_module_identifier,
)
.field("original_module_context", &self.original_module_context)
.field("issuer", &self.issuer)
.field("dependency", &self.dependency)
.field("dependencies", &self.dependencies)
.field("is_entry", &self.is_entry)
.field("resolve_options", &self.resolve_options)
.field("resolver_factory", &self.resolver_factory)
.field("loader_resolver_factory", &self.loader_resolver_factory)
.field("options", &self.options)
.field("lazy_visit_modules", &self.lazy_visit_modules)
.field("plugin_driver", &self.plugin_driver)
.field("cache", &self.cache)
.field("current_profile", &self.current_profile)
.finish()
}
}

/// a struct temporarily used creating ExportsInfo
#[derive(Debug)]
pub struct ExportsInfoRelated {
Expand All @@ -55,6 +80,7 @@ pub struct ExportsInfoRelated {
}

pub struct FactorizeTaskResult {
pub dependency: DependencyId,
pub original_module_identifier: Option<ModuleIdentifier>,
/// Result will be available if [crate::ModuleFactory::create] returns `Ok`.
pub factory_result: Option<ModuleFactoryResult>,
Expand Down Expand Up @@ -125,6 +151,7 @@ impl WorkerTask for FactorizeTask {
current_profile.mark_factory_start();
}
let dependency = self.dependency;
let dep_id = *dependency.id();

let context = if let Some(context) = dependency.get_context() {
context
Expand All @@ -143,6 +170,7 @@ impl WorkerTask for FactorizeTask {
);
let exports_info = ExportsInfo::new(other_exports_info.id, side_effects_only_info.id);
let factorize_task_result = FactorizeTaskResult {
dependency: dep_id,
original_module_identifier: self.original_module_identifier,
factory_result: None,
dependencies: self.dependencies,
Expand Down Expand Up @@ -231,6 +259,22 @@ pub struct AddTask {
pub callback: Option<ModuleCreationCallback>,
}

impl std::fmt::Debug for AddTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AddTask")
.field(
"original_module_identifier",
&self.original_module_identifier,
)
.field("module", &self.module)
.field("module_graph_module", &self.module_graph_module)
.field("dependencies", &self.dependencies)
.field("is_entry", &self.is_entry)
.field("current_profile", &self.current_profile)
.finish()
}
}

#[derive(Debug)]
pub enum AddTaskResult {
ModuleReused {
Expand Down Expand Up @@ -336,6 +380,7 @@ fn set_resolved_module(

pub type AddQueue = WorkerQueue<AddTask>;

#[derive(Debug)]
pub struct BuildTask {
pub module: Box<dyn Module>,
pub resolver_factory: Arc<ResolverFactory>,
Expand Down Expand Up @@ -432,6 +477,7 @@ impl WorkerTask for BuildTask {

pub type BuildQueue = WorkerQueue<BuildTask>;

#[derive(Debug)]
pub struct ProcessDependenciesTask {
pub original_module_identifier: ModuleIdentifier,
pub dependencies: Vec<DependencyId>,
Expand Down Expand Up @@ -498,3 +544,141 @@ impl CleanTask {
pub type CleanQueue = WorkerQueue<CleanTask>;

pub type ModuleCreationCallback = Box<dyn FnOnce(&BoxModule) + Send>;

pub type QueueHandleCallback = Box<dyn FnOnce(ModuleIdentifier, &mut Compilation) + Send + Sync>;

#[derive(Debug)]
pub enum QueueTask {
Factorize(Box<FactorizeTask>),
Add(Box<AddTask>),
Build(Box<BuildTask>),
ProcessDependencies(Box<ProcessDependenciesTask>),

Subscription(Box<Subscription>),
}

#[derive(Debug, Copy, Clone)]
pub enum TaskCategory {
Factorize = 0,
Add = 1,
Build = 2,
ProcessDependencies = 3,
}

pub struct Subscription {
category: TaskCategory,
key: String,
callback: QueueHandleCallback,
}

impl std::fmt::Debug for Subscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription")
.field("category", &self.category)
.finish()
}
}

#[derive(Clone)]
pub struct QueueHandler {
sender: UnboundedSender<QueueTask>,
}

impl std::fmt::Debug for QueueHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueueHandler")
.field("sender", &self.sender)
.finish()
}
}

impl QueueHandler {
pub fn wait_for(&self, key: String, category: TaskCategory, callback: QueueHandleCallback) {
self
.sender
.send(QueueTask::Subscription(Box::new(Subscription {
category,
key,
callback,
})))
.expect("failed to wait task");
}
}

pub struct QueueHandlerProcessor {
receiver: UnboundedReceiver<QueueTask>,
callbacks: [HashMap<String, Vec<QueueHandleCallback>>; 4],
finished: [HashMap<String, ModuleIdentifier>; 4],
}

impl QueueHandlerProcessor {
pub fn try_process(
&mut self,
compilation: &mut Compilation,
factorize_queue: &mut FactorizeQueue,
add_queue: &mut AddQueue,
build_queue: &mut BuildQueue,
process_dependencies_queue: &mut ProcessDependenciesQueue,
) {
while let Ok(task) = self.receiver.try_recv() {
match task {
QueueTask::Factorize(task) => {
factorize_queue.add_task(*task);
}
QueueTask::Add(task) => {
add_queue.add_task(*task);
}
QueueTask::Build(task) => {
build_queue.add_task(*task);
}
QueueTask::ProcessDependencies(task) => {
process_dependencies_queue.add_task(*task);
}
QueueTask::Subscription(subscription) => {
let Subscription {
category,
key,
callback,
} = *subscription;
if let Some(module) = self.finished[category as usize].get(&key) {
// already finished
callback(*module, compilation);
} else {
self.callbacks[category as usize]
.entry(key)
.or_default()
.push(callback);
}
}
}
}
}

pub fn flush_callback(
&mut self,
category: TaskCategory,
key: &str,
module: ModuleIdentifier,
compilation: &mut Compilation,
) {
self.finished[category as usize].insert(key.into(), module);
if let Some(callbacks) = self.callbacks[category as usize].get_mut(key) {
while let Some(cb) = callbacks.pop() {
cb(module, compilation);
}
}
}
}

pub fn create_queue_handle() -> (QueueHandler, QueueHandlerProcessor) {
let (tx, rx) = unbounded_channel();

(
QueueHandler { sender: tx },
QueueHandlerProcessor {
receiver: rx,
callbacks: Default::default(),
finished: Default::default(),
},
)
}
Loading

0 comments on commit 63431be

Please sign in to comment.