diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 34e845442d7..a4568a9616c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1630,7 +1630,9 @@ impl BeaconChain { Box::new(signed_block.clone()), )); ops.push(StoreOp::PutState(block.state_root, &state)); + let txn_lock = self.store.hot_db.begin_rw_transaction(); self.store.do_atomically(ops)?; + drop(txn_lock); // The fork choice write-lock is dropped *after* the on-disk database has been updated. // This prevents inconsistency between the two at the expense of concurrency. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 44dc5eb9f34..aaa0425dd16 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -63,7 +63,7 @@ use std::borrow::Cow; use std::convert::TryFrom; use std::fs; use std::io::Write; -use store::{Error as DBError, HotColdDB, HotStateSummary, StoreOp}; +use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; use tree_hash::TreeHash; use types::{ BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, @@ -704,6 +704,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { // Store the state immediately, marking it as temporary, and staging the deletion // of its temporary status as part of the larger atomic operation. + let txn_lock = chain.store.hot_db.begin_rw_transaction(); let state_already_exists = chain.store.load_hot_state_summary(&state_root)?.is_some(); @@ -727,6 +728,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { ] }; chain.store.do_atomically(state_batch)?; + drop(txn_lock); confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root)); diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index b99d66adb14..fc0dea487d7 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -7,12 +7,15 @@ use leveldb::database::Database; use leveldb::error::Error as LevelDBError; use leveldb::iterator::{Iterable, KeyIterator}; use leveldb::options::{Options, ReadOptions, WriteOptions}; +use parking_lot::{Mutex, MutexGuard}; use std::marker::PhantomData; use std::path::Path; /// A wrapped leveldb database. pub struct LevelDB { db: Database, + /// A mutex to synchronise sensitive read-write transactions. + transaction_mutex: Mutex<()>, _phantom: PhantomData, } @@ -24,9 +27,11 @@ impl LevelDB { options.create_if_missing = true; let db = Database::open(path, options)?; + let transaction_mutex = Mutex::new(()); Ok(Self { db, + transaction_mutex, _phantom: PhantomData, }) } @@ -143,6 +148,10 @@ impl KeyValueStore for LevelDB { self.db.write(self.write_options(), &leveldb_batch)?; Ok(()) } + + fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } } impl ItemStore for LevelDB {} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 58be69818e3..38411d9a180 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -35,6 +35,7 @@ pub use self::partial_beacon_state::PartialBeaconState; pub use errors::Error; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metrics::scrape_for_metrics; +use parking_lot::MutexGuard; pub use types::*; pub trait KeyValueStore: Sync + Send + Sized + 'static { @@ -60,6 +61,12 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Execute either all of the operations in `batch` or none at all, returning an error. fn do_atomically(&self, batch: Vec) -> Result<(), Error>; + + /// Return a mutex guard that can be used to synchronize sensitive transactions. + /// + /// This doesn't prevent other threads writing to the DB unless they also use + /// this method. In future we may implement a safer mandatory locking scheme. + fn begin_rw_transaction(&self) -> MutexGuard<()>; } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 19308c86d20..2df503965d0 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,5 +1,5 @@ use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; -use parking_lot::RwLock; +use parking_lot::{Mutex, MutexGuard, RwLock}; use std::collections::HashMap; use std::marker::PhantomData; use types::*; @@ -9,23 +9,16 @@ type DBHashMap = HashMap, Vec>; /// A thread-safe `HashMap` wrapper. pub struct MemoryStore { db: RwLock, + transaction_mutex: Mutex<()>, _phantom: PhantomData, } -impl Clone for MemoryStore { - fn clone(&self) -> Self { - Self { - db: RwLock::new(self.db.read().clone()), - _phantom: PhantomData, - } - } -} - impl MemoryStore { /// Create a new, empty database. pub fn open() -> Self { Self { db: RwLock::new(HashMap::new()), + transaction_mutex: Mutex::new(()), _phantom: PhantomData, } } @@ -87,6 +80,10 @@ impl KeyValueStore for MemoryStore { } Ok(()) } + + fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } } impl ItemStore for MemoryStore {}