Skip to content

Commit

Permalink
Add transaction locking to fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Oct 22, 2020
1 parent d21611b commit 8ebfb00
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 11 deletions.
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Box::new(signed_block.clone()),
));
ops.push(StoreOp::PutState(block.state_root, &state));
let txn_lock = self.store.hot_db.begin_rw_transaction();
self.store.do_atomically(ops)?;
drop(txn_lock);

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use std::borrow::Cow;
use std::convert::TryFrom;
use std::fs;
use std::io::Write;
use store::{Error as DBError, HotColdDB, HotStateSummary, StoreOp};
use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use tree_hash::TreeHash;
use types::{
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
Expand Down Expand Up @@ -704,6 +704,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

Expand All @@ -727,6 +728,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
]
};
chain.store.do_atomically(state_batch)?;
drop(txn_lock);

confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root));

Expand Down
9 changes: 9 additions & 0 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
use leveldb::iterator::{Iterable, KeyIterator};
use leveldb::options::{Options, ReadOptions, WriteOptions};
use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData;
use std::path::Path;

/// A wrapped leveldb database.
pub struct LevelDB<E: EthSpec> {
db: Database<BytesKey>,
/// A mutex to synchronise sensitive read-write transactions.
transaction_mutex: Mutex<()>,
_phantom: PhantomData<E>,
}

Expand All @@ -24,9 +27,11 @@ impl<E: EthSpec> LevelDB<E> {
options.create_if_missing = true;

let db = Database::open(path, options)?;
let transaction_mutex = Mutex::new(());

Ok(Self {
db,
transaction_mutex,
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -143,6 +148,10 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
self.db.write(self.write_options(), &leveldb_batch)?;
Ok(())
}

fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock()
}
}

impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metrics::scrape_for_metrics;
use parking_lot::MutexGuard;
pub use types::*;

pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
Expand All @@ -60,6 +61,12 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {

/// Execute either all of the operations in `batch` or none at all, returning an error.
fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>;

/// Return a mutex guard that can be used to synchronize sensitive transactions.
///
/// This doesn't prevent other threads writing to the DB unless they also use
/// this method. In future we may implement a safer mandatory locking scheme.
fn begin_rw_transaction(&self) -> MutexGuard<()>;
}

pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
Expand Down
17 changes: 7 additions & 10 deletions beacon_node/store/src/memory_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp};
use parking_lot::RwLock;
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::HashMap;
use std::marker::PhantomData;
use types::*;
Expand All @@ -9,23 +9,16 @@ type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
/// A thread-safe `HashMap` wrapper.
pub struct MemoryStore<E: EthSpec> {
db: RwLock<DBHashMap>,
transaction_mutex: Mutex<()>,
_phantom: PhantomData<E>,
}

impl<E: EthSpec> Clone for MemoryStore<E> {
fn clone(&self) -> Self {
Self {
db: RwLock::new(self.db.read().clone()),
_phantom: PhantomData,
}
}
}

impl<E: EthSpec> MemoryStore<E> {
/// Create a new, empty database.
pub fn open() -> Self {
Self {
db: RwLock::new(HashMap::new()),
transaction_mutex: Mutex::new(()),
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -87,6 +80,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
}
Ok(())
}

fn begin_rw_transaction(&self) -> MutexGuard<()> {
self.transaction_mutex.lock()
}
}

impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {}

0 comments on commit 8ebfb00

Please sign in to comment.