Skip to content

Commit

Permalink
fix hanging waiting on in progress cells
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Nov 14, 2024
1 parent 8c1a880 commit 11cf49f
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 46 deletions.
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-auto-hash-map/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![feature(hash_raw_entry)]
#![feature(hash_extract_if)]

pub mod map;
pub mod set;
Expand Down
80 changes: 80 additions & 0 deletions turbopack/crates/turbo-tasks-auto-hash-map/src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ impl<K: Eq + Hash, V, H: BuildHasher + Default, const I: usize> AutoMap<K, V, H,
}
}

pub fn extract_if<'l, F>(&'l mut self, f: F) -> ExtractIfIter<'l, K, V, I, F>
where
F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool,
{
match self {
AutoMap::List(list) => ExtractIfIter::List { list, index: 0, f },
AutoMap::Map(map) => ExtractIfIter::Map(map.extract_if(f)),
}
}

/// see [HashMap::shrink_to_fit](https://doc.rust-lang.org/std/collections/struct.HashMap.html#method.shrink_to_fit)
pub fn shrink_to_fit(&mut self) {
match self {
Expand Down Expand Up @@ -843,6 +853,43 @@ where
}
}

pub enum ExtractIfIter<'l, K, V, const I: usize, F>
where
F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool,
{
List {
list: &'l mut SmallVec<[(K, V); I]>,
index: usize,
f: F,
},
Map(std::collections::hash_map::ExtractIf<'l, K, V, F>),
}

impl<'l, K, V, const I: usize, F> Iterator for ExtractIfIter<'l, K, V, I, F>
where
F: for<'a, 'b> FnMut(&'a K, &'b mut V) -> bool,
{
type Item = (K, V);

fn next(&mut self) -> Option<Self::Item> {
match self {
ExtractIfIter::List { list, index, f } => {
while *index < list.len() {
let (key, value) = &mut list[*index];
if f(key, value) {
let item = list.swap_remove(*index);
return Some(item);
} else {
*index += 1;
}
}
None
}
ExtractIfIter::Map(extract_if) => extract_if.next(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -863,4 +910,37 @@ mod tests {
}
assert_eq!(map.remove(&(MAX_LIST_SIZE * 2)), None);
}

#[test]
fn test_extract_if_map() {
let mut map = AutoMap::new();
for i in 0..MAX_LIST_SIZE * 2 {
map.insert(i, i);
}
let iter = map.extract_if(|_, v| *v % 2 == 0);
assert_eq!(iter.count(), MAX_LIST_SIZE);
assert_eq!(map.len(), MAX_LIST_SIZE);
}

#[test]
fn test_extract_if_list() {
let mut map = AutoMap::new();
for i in 0..MIN_HASH_SIZE {
map.insert(i, i);
}
let iter = map.extract_if(|_, v| *v % 2 == 0);
assert_eq!(iter.count(), MIN_HASH_SIZE / 2);
assert_eq!(map.len(), MIN_HASH_SIZE / 2);
}

#[test]
fn test_extract_if_list2() {
let mut map = AutoMap::new();
for i in 0..MIN_HASH_SIZE {
map.insert(i, i);
}
let iter = map.extract_if(|_, v| *v < 5);
assert_eq!(iter.count(), 5);
assert_eq!(map.len(), MIN_HASH_SIZE - 5);
}
}
123 changes: 77 additions & 46 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
ctx.get_task_description(task_id)
);
};
if cell.index > *max_id {
if cell.index >= *max_id {
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
bail!(
"Cell {cell:?} no longer exists in task {} (index out of bounds)",
Expand Down Expand Up @@ -1228,7 +1228,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let _ = stateful;

// handle cell counters: update max index and remove cells that are no longer used
let mut removed_cells = HashMap::new();
let mut old_counters: HashMap<_, _> =
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (*cell_type, *max_index));
for (&cell_type, &max_index) in cell_counters.iter() {
Expand All @@ -1238,9 +1237,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
cell_type,
value: max_index,
});
if old_max_index > max_index {
removed_cells.insert(cell_type, max_index + 1..=old_max_index);
}
}
} else {
task.add_new(CachedDataItem::CellTypeMaxIndex {
Expand All @@ -1249,28 +1245,46 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
});
}
}
for (cell_type, old_max_index) in old_counters {
for (cell_type, _) in old_counters {
task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
removed_cells.insert(cell_type, 0..=old_max_index);
}

let mut removed_data = Vec::new();
for (&cell_type, range) in removed_cells.iter() {
for index in range.clone() {
removed_data.extend(
task.remove(&CachedDataItemKey::CellData {
cell: CellId {
type_id: cell_type,
index,
},
})
.into_iter(),
);
}
}
let mut old_edges = Vec::new();

// Remove no longer existing cells and notify in progress cells
// find all outdated data items (removed cells, outdated edges)
let old_edges = if task.is_indexed() {
let mut old_edges = Vec::new();
if task.is_indexed() {
removed_data.extend(task.extract_if(
CachedDataItemIndex::InProgressCell,
|key, value| {
match (key, value) {
(
&CachedDataItemKey::InProgressCell { cell },
CachedDataItemValue::InProgressCell { value },
) if cell_counters
.get(&cell.type_id)
.map_or(true, |start_index| cell.index >= *start_index) =>
{
value.event.notify(usize::MAX);
true
}
_ => false,
}
},
));
removed_data.extend(task.extract_if(CachedDataItemIndex::CellData, |key, _| {
match key {
&CachedDataItemKey::CellData { cell }
if cell_counters
.get(&cell.type_id)
.map_or(true, |start_index| cell.index >= *start_index) =>
{
true
}
_ => false,
}
}));
if self.should_track_children() {
old_edges.extend(task.iter(CachedDataItemIndex::Children).filter_map(
|(key, _)| match *key {
Expand Down Expand Up @@ -1306,9 +1320,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
|(key, _)| {
match *key {
CachedDataItemKey::CellDependent { cell, task }
if removed_cells
if cell_counters
.get(&cell.type_id)
.map_or(false, |range| range.contains(&cell.index)) =>
.map_or(true, |start_index| cell.index >= *start_index) =>
{
Some(OutdatedEdge::RemovedCellDependent(task, cell.type_id))
}
Expand All @@ -1317,36 +1331,53 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
},
));
}
old_edges
} else {
task.iter_all()
.filter_map(|(key, value)| match *key {
CachedDataItemKey::OutdatedChild { task } => Some(OutdatedEdge::Child(task)),
CachedDataItemKey::OutdatedCollectible { collectible } => {
let CachedDataItemValue::OutdatedCollectible { value } = *value else {
unreachable!();
};
Some(OutdatedEdge::Collectible(collectible, value))
removed_data.extend(task.extract_if_all(|key, value| {
match (key, value) {
(
&CachedDataItemKey::InProgressCell { cell },
CachedDataItemValue::InProgressCell { value },
) if cell_counters
.get(&cell.type_id)
.map_or(true, |start_index| cell.index >= *start_index) =>
{
value.event.notify(usize::MAX);
return true;
}
CachedDataItemKey::OutdatedCellDependency { target } => {
Some(OutdatedEdge::CellDependency(target))
(&CachedDataItemKey::CellData { cell }, _)
if cell_counters
.get(&cell.type_id)
.map_or(true, |start_index| cell.index >= *start_index) =>
{
return true;
}
(&CachedDataItemKey::OutdatedChild { task }, _) => {
old_edges.push(OutdatedEdge::Child(task));
}
(
&CachedDataItemKey::OutdatedCollectible { collectible },
&CachedDataItemValue::OutdatedCollectible { value },
) => old_edges.push(OutdatedEdge::Collectible(collectible, value)),
(&CachedDataItemKey::OutdatedCellDependency { target }, _) => {
old_edges.push(OutdatedEdge::CellDependency(target));
}
CachedDataItemKey::OutdatedOutputDependency { target } => {
Some(OutdatedEdge::OutputDependency(target))
(&CachedDataItemKey::OutdatedOutputDependency { target }, _) => {
old_edges.push(OutdatedEdge::OutputDependency(target));
}
CachedDataItemKey::OutdatedCollectiblesDependency { target } => {
Some(OutdatedEdge::CollectiblesDependency(target))
(&CachedDataItemKey::OutdatedCollectiblesDependency { target }, _) => {
old_edges.push(OutdatedEdge::CollectiblesDependency(target));
}
CachedDataItemKey::CellDependent { cell, task }
if removed_cells
(&CachedDataItemKey::CellDependent { cell, task }, _)
if cell_counters
.get(&cell.type_id)
.map_or(false, |range| range.contains(&cell.index)) =>
.map_or(true, |start_index| cell.index >= *start_index) =>
{
Some(OutdatedEdge::RemovedCellDependent(task, cell.type_id))
old_edges.push(OutdatedEdge::RemovedCellDependent(task, cell.type_id));
}
_ => None,
})
.collect::<Vec<_>>()
_ => {}
}
false
}));
};
drop(task);

Expand Down
65 changes: 65 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
mem::{take, transmute},
};

use either::Either;
use serde::{Deserialize, Serialize};
use turbo_tasks::{KeyValuePair, SessionId, TaskId, TurboTasksBackendApi};

Expand Down Expand Up @@ -370,6 +371,16 @@ pub trait TaskGuard: Debug {
index: CachedDataItemIndex,
) -> impl Iterator<Item = (&CachedDataItemKey, &CachedDataItemValue)>;
fn iter_all(&self) -> impl Iterator<Item = (&CachedDataItemKey, &CachedDataItemValue)>;
fn extract_if<'l, F>(
&'l mut self,
index: CachedDataItemIndex,
f: F,
) -> impl Iterator<Item = CachedDataItem>
where
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l;
fn extract_if_all<'l, F>(&'l mut self, f: F) -> impl Iterator<Item = CachedDataItem>
where
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l;
fn invalidate_serialization(&mut self);
}

Expand Down Expand Up @@ -586,6 +597,60 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
self.task.iter_all()
}

fn extract_if<'l, F>(
&'l mut self,
index: CachedDataItemIndex,
f: F,
) -> impl Iterator<Item = CachedDataItem>
where
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l,
{
if !self.backend.should_persist() || self.task_id.is_transient() {
return Either::Left(self.task.extract_if(Some(index), f));
}
Either::Right(self.task.extract_if(Some(index), f).inspect(|item| {
if item.is_persistent() {
let key = item.key();
let value = item.value();
self.backend
.persisted_storage_log(key.category())
.unwrap()
.lock(self.task_id)
.push(CachedDataUpdate {
key,
task: self.task_id,
value: None,
old_value: Some(value),
});
}
}))
}

fn extract_if_all<'l, F>(&'l mut self, f: F) -> impl Iterator<Item = CachedDataItem>
where
F: for<'a, 'b> FnMut(&'a CachedDataItemKey, &'b CachedDataItemValue) -> bool + 'l,
{
if !self.backend.should_persist() || self.task_id.is_transient() {
return Either::Left(self.task.extract_if_all(f));
}
Either::Right(self.task.extract_if_all(f).inspect(|item| {
if item.is_persistent() {
let key = item.key();
let value = item.value();
self.backend
.persisted_storage_log(key.category())
.unwrap()
.lock(self.task_id)
.push(CachedDataUpdate {
key,
task: self.task_id,
value: None,
old_value: Some(value),
});
}
}))
}

fn invalidate_serialization(&mut self) {
if !self.backend.should_persist() {
return;
Expand Down
Loading

0 comments on commit 11cf49f

Please sign in to comment.