diff --git a/src/packed_leaf.rs b/src/packed_leaf.rs
index b0f3770..78a58f9 100644
--- a/src/packed_leaf.rs
+++ b/src/packed_leaf.rs
@@ -1,7 +1,7 @@
 use crate::{utils::arb_rwlock, Error, UpdateMap};
 use arbitrary::Arbitrary;
 use derivative::Derivative;
-use parking_lot::RwLock;
+use parking_lot::{RwLock, RwLockUpgradableReadGuard};
 use std::ops::ControlFlow;
 use tree_hash::{Hash256, TreeHash, BYTES_PER_CHUNK};
@@ -27,27 +27,54 @@ where
 }
 
 impl<T: TreeHash + Clone> PackedLeaf<T> {
-    pub fn tree_hash(&self) -> Hash256 {
-        let read_lock = self.hash.read();
-        let mut hash = *read_lock;
-        drop(read_lock);
-
-        if !hash.is_zero() {
-            return hash;
-        }
-
+    fn compute_hash(&self, mut hash: Hash256) -> Hash256 {
         let hash_bytes = hash.as_bytes_mut();
-
         let value_len = BYTES_PER_CHUNK / T::tree_hash_packing_factor();
 
         for (i, value) in self.values.iter().enumerate() {
             hash_bytes[i * value_len..(i + 1) * value_len]
                 .copy_from_slice(&value.tree_hash_packed_encoding());
         }
-
-        *self.hash.write() = hash;
         hash
     }
 
+    pub fn tree_hash(&self) -> Hash256 {
+        let read_lock = self.hash.upgradable_read();
+        let hash = *read_lock;
+
+        if !hash.is_zero() {
+            hash
+        } else {
+            match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
+                Ok(mut write_lock) => {
+                    // If we successfully acquire the lock we are guaranteed to be the first and
+                    // only thread attempting to write the hash.
+                    let tree_hash = self.compute_hash(hash);
+
+                    *write_lock = tree_hash;
+                    tree_hash
+                }
+                Err(lock) => {
+                    // Another thread is holding a lock. Drop the lock and attempt to
+                    // acquire a new one. This will avoid a deadlock.
+                    RwLockUpgradableReadGuard::unlock_fair(lock);
+                    let mut write_lock = self.hash.write();
+
+                    // Since we just acquired the write lock normally, another thread may have
+                    // just finished computing the hash. If so, return it.
+                    let existing_hash = *write_lock;
+                    if !existing_hash.is_zero() {
+                        return existing_hash;
+                    }
+
+                    let tree_hash = self.compute_hash(hash);
+
+                    *write_lock = tree_hash;
+                    tree_hash
+                }
+            }
+        }
+    }
+
     pub fn empty() -> Self {
         PackedLeaf {
             hash: RwLock::new(Hash256::zero()),
diff --git a/src/tree.rs b/src/tree.rs
index d816e6e..9bcae3e 100644
--- a/src/tree.rs
+++ b/src/tree.rs
@@ -3,7 +3,7 @@
 use crate::{Arc, Error, Leaf, PackedLeaf, UpdateMap, Value};
 use arbitrary::Arbitrary;
 use derivative::Derivative;
 use ethereum_hashing::{hash32_concat, ZERO_HASHES};
-use parking_lot::RwLock;
+use parking_lot::{RwLock, RwLockUpgradableReadGuard};
 use serde::{Deserialize, Serialize};
 use ssz_derive::{Decode, Encode};
 use std::collections::BTreeMap;
@@ -531,10 +531,8 @@ impl<T: Value> Tree<T> {
     pub fn tree_hash(&self) -> Hash256 {
         match self {
             Self::Leaf(Leaf { hash, value }) => {
-                // FIXME(sproul): upgradeable RwLock?
-                let read_lock = hash.read();
+                let read_lock = hash.upgradable_read();
                 let existing_hash = *read_lock;
-                drop(read_lock);
 
                 // NOTE: We re-compute the hash whenever it is non-zero. Computed hashes may
                 // legitimately be zero, but this only occurs at the leaf level when the value is
@@ -546,28 +544,80 @@
                 if !existing_hash.is_zero() {
                     existing_hash
                 } else {
-                    let tree_hash = value.tree_hash_root();
-                    *hash.write() = tree_hash;
-                    tree_hash
+                    match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
+                        Ok(mut write_lock) => {
+                            // If we successfully acquire the lock we are guaranteed to be the first and
+                            // only thread attempting to write the hash.
+                            let tree_hash = value.tree_hash_root();
+                            *write_lock = tree_hash;
+                            tree_hash
+                        }
+                        Err(lock) => {
+                            // Another thread is holding a lock. Drop the lock and attempt to
+                            // acquire a new one. This will avoid a deadlock.
+                            RwLockUpgradableReadGuard::unlock_fair(lock);
+                            let mut write_lock = hash.write();
+
+                            // Since we just acquired the write lock normally, another thread may have
+                            // just finished computing the hash.
+                            // If so, return it.
+                            let existing_hash = *write_lock;
+                            if !existing_hash.is_zero() {
+                                return existing_hash;
+                            }
+
+                            let tree_hash = value.tree_hash_root();
+                            *write_lock = tree_hash;
+                            tree_hash
+                        }
+                    }
                 }
             }
             Self::PackedLeaf(leaf) => leaf.tree_hash(),
             Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]),
             Self::Node { hash, left, right } => {
-                let read_lock = hash.read();
+                fn node_tree_hash<T: Value>(
+                    left: &Arc<Tree<T>>,
+                    right: &Arc<Tree<T>>,
+                ) -> Hash256 {
+                    let (left_hash, right_hash) =
+                        rayon::join(|| left.tree_hash(), || right.tree_hash());
+                    Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()))
+                }
+
+                let read_lock = hash.upgradable_read();
                 let existing_hash = *read_lock;
-                drop(read_lock);
 
                 if !existing_hash.is_zero() {
                     existing_hash
                 } else {
-                    // Parallelism goes brrrr.
-                    let (left_hash, right_hash) =
-                        rayon::join(|| left.tree_hash(), || right.tree_hash());
-                    let tree_hash =
-                        Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()));
-                    *hash.write() = tree_hash;
-                    tree_hash
+                    match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
+                        Ok(mut write_lock) => {
+                            // If we successfully acquire the lock we are guaranteed to be the first and
+                            // only thread attempting to write the hash.
+                            let tree_hash = node_tree_hash(left, right);
+
+                            *write_lock = tree_hash;
+                            tree_hash
+                        }
+                        Err(lock) => {
+                            // Another thread is holding a lock. Drop the lock and attempt to
+                            // acquire a new one. This will avoid a deadlock.
+                            RwLockUpgradableReadGuard::unlock_fair(lock);
+                            let mut write_lock = hash.write();
+
+                            // Since we just acquired the write lock normally, another thread may have
+                            // just finished computing the hash. If so, return it.
+                            let existing_hash = *write_lock;
+                            if !existing_hash.is_zero() {
+                                return existing_hash;
+                            }
+
+                            let tree_hash = node_tree_hash(left, right);
+
+                            *write_lock = tree_hash;
+                            tree_hash
+                        }
+                    }
                 }
            }
        }