Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use upgradable RwLocks #25

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions src/packed_leaf.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{utils::arb_rwlock, Error, UpdateMap};
use arbitrary::Arbitrary;
use derivative::Derivative;
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use std::ops::ControlFlow;
use tree_hash::{Hash256, TreeHash, BYTES_PER_CHUNK};

Expand All @@ -27,27 +27,54 @@ where
}

impl<T: TreeHash + Clone> PackedLeaf<T> {
pub fn tree_hash(&self) -> Hash256 {
let read_lock = self.hash.read();
let mut hash = *read_lock;
drop(read_lock);

if !hash.is_zero() {
return hash;
}

fn compute_hash(&self, mut hash: Hash256) -> Hash256 {
let hash_bytes = hash.as_bytes_mut();

let value_len = BYTES_PER_CHUNK / T::tree_hash_packing_factor();
for (i, value) in self.values.iter().enumerate() {
hash_bytes[i * value_len..(i + 1) * value_len]
.copy_from_slice(&value.tree_hash_packed_encoding());
}

*self.hash.write() = hash;
hash
}

pub fn tree_hash(&self) -> Hash256 {
let read_lock = self.hash.upgradable_read();
let hash = *read_lock;

if !hash.is_zero() {
hash
} else {
match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
Ok(mut write_lock) => {
// If we successfully acquire the lock we are guaranteed to be the first and
// only thread attempting to write the hash.
let tree_hash = self.compute_hash(hash);

*write_lock = tree_hash;
tree_hash
}
Err(lock) => {
// Another thread is holding a lock. Drop the lock and attempt to
// acquire a new one. This will avoid a deadlock.
RwLockUpgradableReadGuard::unlock_fair(lock);
let mut write_lock = self.hash.write();

// Since we just acquired the write lock normally, another thread may have
// just finished computing the hash. If so, return it.
let existing_hash = *write_lock;
if !existing_hash.is_zero() {
return existing_hash;
}

let tree_hash = self.compute_hash(hash);

*write_lock = tree_hash;
tree_hash
}
}
}
}

pub fn empty() -> Self {
PackedLeaf {
hash: RwLock::new(Hash256::zero()),
Expand Down
82 changes: 66 additions & 16 deletions src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{Arc, Error, Leaf, PackedLeaf, UpdateMap, Value};
use arbitrary::Arbitrary;
use derivative::Derivative;
use ethereum_hashing::{hash32_concat, ZERO_HASHES};
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -531,10 +531,8 @@ impl<T: Value + Send + Sync> Tree<T> {
pub fn tree_hash(&self) -> Hash256 {
match self {
Self::Leaf(Leaf { hash, value }) => {
// FIXME(sproul): upgradeable RwLock?
let read_lock = hash.read();
let read_lock = hash.upgradable_read();
let existing_hash = *read_lock;
drop(read_lock);

// NOTE: We re-compute the hash whenever it is non-zero. Computed hashes may
// legitimately be zero, but this only occurs at the leaf level when the value is
Expand All @@ -546,28 +544,80 @@ impl<T: Value + Send + Sync> Tree<T> {
if !existing_hash.is_zero() {
existing_hash
} else {
let tree_hash = value.tree_hash_root();
*hash.write() = tree_hash;
tree_hash
match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
Ok(mut write_lock) => {
// If we successfully acquire the lock we are guaranteed to be the first and
// only thread attempting to write the hash.
let tree_hash = value.tree_hash_root();
*write_lock = tree_hash;
tree_hash
}
Err(lock) => {
// Another thread is holding a lock. Drop the lock and attempt to
// acquire a new one. This will avoid a deadlock.
RwLockUpgradableReadGuard::unlock_fair(lock);
let mut write_lock = hash.write();

// Since we just acquired the write lock normally, another thread may have
// just finished computing the hash. If so, return it.
let existing_hash = *write_lock;
if !existing_hash.is_zero() {
return existing_hash;
}

let tree_hash = value.tree_hash_root();
*write_lock = tree_hash;
tree_hash
}
}
}
}
Self::PackedLeaf(leaf) => leaf.tree_hash(),
Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]),
Self::Node { hash, left, right } => {
let read_lock = hash.read();
fn node_tree_hash<T: Value + Send + Sync>(
left: &Arc<Tree<T>>,
right: &Arc<Tree<T>>,
) -> Hash256 {
let (left_hash, right_hash) =
rayon::join(|| left.tree_hash(), || right.tree_hash());
Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()))
}

let read_lock = hash.upgradable_read();
let existing_hash = *read_lock;
drop(read_lock);

if !existing_hash.is_zero() {
existing_hash
} else {
// Parallelism goes brrrr.
let (left_hash, right_hash) =
rayon::join(|| left.tree_hash(), || right.tree_hash());
let tree_hash =
Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()));
*hash.write() = tree_hash;
tree_hash
match RwLockUpgradableReadGuard::try_upgrade(read_lock) {
Ok(mut write_lock) => {
// If we successfully acquire the lock we are guaranteed to be the first and
// only thread attempting to write the hash.
let tree_hash = node_tree_hash(left, right);

*write_lock = tree_hash;
tree_hash
}
Err(lock) => {
// Another thread is holding a lock. Drop the lock and attempt to
// acquire a new one. This will avoid a deadlock.
RwLockUpgradableReadGuard::unlock_fair(lock);
let mut write_lock = hash.write();

// Since we just acquired the write lock normally, another thread may have
// just finished computing the hash. If so, return it.
let existing_hash = *write_lock;
if !existing_hash.is_zero() {
return existing_hash;
}

let tree_hash = node_tree_hash(left, right);

*write_lock = tree_hash;
tree_hash
}
}
}
}
}
Expand Down