Skip to content

Commit

Permalink
perf(trie): parallelize storage proofs across slots for the same account
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Jan 17, 2025
1 parent 5db0129 commit db9c8cb
Showing 1 changed file with 102 additions and 75 deletions.
177 changes: 102 additions & 75 deletions crates/trie/parallel/src/proof.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{root::ParallelStateRootError, stats::ParallelTrieTracker, StorageRootTargets};
use alloy_primitives::{
map::{B256HashMap, HashMap},
map::{B256HashMap, B256HashSet, HashMap},
B256,
};
use alloy_rlp::{BufMut, Encodable};
Expand Down Expand Up @@ -127,89 +127,104 @@ where
// Pre-calculate storage roots for accounts which were changed.
tracker.set_precomputed_storage_roots(storage_root_targets_len as u64);

let mut storage_proofs =
B256HashMap::with_capacity_and_hasher(storage_root_targets.len(), Default::default());
let mut storage_proofs = B256HashMap::<Vec<_>>::with_capacity_and_hasher(
storage_root_targets.len(),
Default::default(),
);

for (hashed_address, prefix_set) in
storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
{
let view = self.view.clone();
let target_slots = targets.get(&hashed_address).cloned().unwrap_or_default();
let trie_nodes_sorted = self.nodes_sorted.clone();
let hashed_state_sorted = self.state_sorted.clone();
let collect_masks = self.collect_branch_node_masks;

let (tx, rx) = std::sync::mpsc::sync_channel(1);
for chunk in &prefix_set
.into_iter()
.cloned() // TODO(alexey): Avoid cloning here
.zip(target_slots.into_iter().sorted_unstable())
.chunks(50)
{
let (prefix_set, target_slots): (PrefixSetMut, B256HashSet) = chunk.unzip();

self.thread_pool.spawn_fifo(move || {
debug!(
target: "trie::parallel",
?hashed_address,
"Starting proof calculation"
);
let (tx, rx) = std::sync::mpsc::sync_channel(1);

let task_start = Instant::now();
let result = (|| -> Result<_, ParallelStateRootError> {
let provider_start = Instant::now();
let provider_ro = view.provider_ro()?;
trace!(
target: "trie::parallel",
?hashed_address,
provider_time = ?provider_start.elapsed(),
"Got provider"
);

let cursor_start = Instant::now();
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
);
trace!(
let view = self.view.clone();
let trie_nodes_sorted = self.nodes_sorted.clone();
let hashed_state_sorted = self.state_sorted.clone();
self.thread_pool.spawn_fifo(move || {
debug!(
target: "trie::parallel",
?hashed_address,
cursor_time = ?cursor_start.elapsed(),
"Created cursors"
"Starting proof calculation"
);

let proof_start = Instant::now();
let proof_result = StorageProof::new_hashed(
trie_cursor_factory,
hashed_cursor_factory,
hashed_address,
)
.with_prefix_set_mut(PrefixSetMut::from(prefix_set.iter().cloned()))
.with_branch_node_masks(collect_masks)
.storage_multiproof(target_slots)
.map_err(|e| ParallelStateRootError::Other(e.to_string()));
let task_start = Instant::now();
let result = (|| -> Result<_, ParallelStateRootError> {
let provider_start = Instant::now();
let provider_ro = view.provider_ro()?;
trace!(
target: "trie::parallel",
?hashed_address,
provider_time = ?provider_start.elapsed(),
"Got provider"
);

trace!(
target: "trie::parallel",
?hashed_address,
proof_time = ?proof_start.elapsed(),
"Completed proof calculation"
);
let cursor_start = Instant::now();
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
);
trace!(
target: "trie::parallel",
?hashed_address,
cursor_time = ?cursor_start.elapsed(),
"Created cursors"
);

proof_result
})();
let prefix_set_len = prefix_set.len();
let target_slots_len = target_slots.len();
let proof_start = Instant::now();
let proof_result = StorageProof::new_hashed(
trie_cursor_factory,
hashed_cursor_factory,
hashed_address,
)
.with_prefix_set_mut(prefix_set)
.with_branch_node_masks(collect_masks)
.storage_multiproof(target_slots)
.map_err(|e| ParallelStateRootError::Other(e.to_string()));

trace!(
target: "trie::parallel",
?hashed_address,
prefix_set = ?prefix_set_len,
target_slots = ?target_slots_len,
proof_time = ?proof_start.elapsed(),
"Completed proof calculation"
);

// We can have the receiver dropped before we send, because we still calculate
// storage proofs for deleted accounts, but do not actually walk over them in
// `account_node_iter` below.
if let Err(e) = tx.send(result) {
debug!(
target: "trie::parallel",
?hashed_address,
error = ?e,
task_time = ?task_start.elapsed(),
"Failed to send proof result"
);
}
});
storage_proofs.insert(hashed_address, rx);
proof_result
})();

// We can have the receiver dropped before we send, because we still calculate
// storage proofs for deleted accounts, but do not actually walk over them in
// `account_node_iter` below.
if let Err(e) = tx.send(result) {
debug!(
target: "trie::parallel",
?hashed_address,
error = ?e,
task_time = ?task_start.elapsed(),
"Failed to send proof result"
);
}
});
storage_proofs.entry(hashed_address).or_default().push(rx);
}
}

let provider_ro = self.view.provider_ro()?;
Expand Down Expand Up @@ -253,13 +268,25 @@ where
}
TrieElement::Leaf(hashed_address, account) => {
let storage_multiproof = match storage_proofs.remove(&hashed_address) {
Some(rx) => rx.recv().map_err(|_| {
ParallelStateRootError::StorageRoot(StorageRootError::Database(
DatabaseError::Other(format!(
"channel closed for {hashed_address}"
)),
))
})??,
Some(rxs) => rxs
.into_iter()
.map(|rx| {
rx.recv().map_err(|_| {
ParallelStateRootError::StorageRoot(StorageRootError::Database(
DatabaseError::Other(format!(
"channel closed for {hashed_address}"
)),
))
})?
})
.try_fold(StorageMultiProof::empty(), |mut result, proof| {
let proof = proof?;
result.root = proof.root;
result.subtree.extend(proof.subtree.into_inner());
result.branch_node_hash_masks.extend(proof.branch_node_hash_masks);
result.branch_node_tree_masks.extend(proof.branch_node_tree_masks);
Result::<_, ParallelStateRootError>::Ok(result)
})?,
// Since we do not store all intermediate nodes in the database, there might
// be a possibility of re-adding a non-modified leaf to the hash builder.
None => {
Expand Down

0 comments on commit db9c8cb

Please sign in to comment.