Skip to content

Commit

Permalink
Change safe field of Inner struct on RwLock (instead of Mutex) (
Browse files Browse the repository at this point in the history
#91)

* Change 'safe' in 'inner' on rwlock

* Fix naming and change write lock on read one
  • Loading branch information
Justarone authored Mar 30, 2021
1 parent 9e28c63 commit 7f77fc0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 28 deletions.
51 changes: 25 additions & 26 deletions src/storage/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct Storage<K> {
#[derive(Debug)]
pub(crate) struct Inner {
pub(crate) config: Config,
pub(crate) safe: Arc<Mutex<Safe>>,
pub(crate) safe: Arc<RwLock<Safe>>,
next_blob_id: Arc<AtomicUsize>,
twins_count: Arc<AtomicUsize>,
pub(crate) ioring: Option<Rio>,
Expand Down Expand Up @@ -198,7 +198,7 @@ impl<K> Storage<K> {
}
let record = Record::create(&key, value, meta.unwrap_or_default())
.with_context(|| "storage write with record creation failed")?;
let mut safe = self.inner.safe.lock().await;
let mut safe = self.inner.safe.write().await;
let blob = safe
.active_blob
.as_mut()
Expand Down Expand Up @@ -263,7 +263,7 @@ impl<K> Storage<K> {
pub async fn read_all(&self, key: &impl Key) -> Result<Vec<Entry>> {
let key = key.as_ref();
let mut all_entries = Vec::new();
let safe = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
let active_blob = safe
.active_blob
.as_ref()
Expand Down Expand Up @@ -294,9 +294,9 @@ impl<K> Storage<K> {

async fn read_with_optional_meta(&self, key: impl Key, meta: Option<&Meta>) -> Result<Vec<u8>> {
debug!("storage read with optional meta {:?}, {:?}", key, meta);
let inner = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
let key = key.as_ref();
let active_blob_read_res = inner
let active_blob_read_res = safe
.active_blob
.as_ref()
.ok_or_else(Error::active_blob_not_set)?
Expand All @@ -310,13 +310,13 @@ impl<K> Storage<K> {
}
Err(e) => {
debug!("read with optional meta active blob returned: {:#?}", e);
Self::get_any_data(&inner, key, meta).await
Self::get_any_data(&safe, key, meta).await
}
}
}

async fn get_any_data(inner: &Safe, key: &[u8], meta: Option<&Meta>) -> Result<Vec<u8>> {
let blobs = inner.blobs.read().await;
async fn get_any_data(safe: &Safe, key: &[u8], meta: Option<&Meta>) -> Result<Vec<u8>> {
let blobs = safe.blobs.read().await;
let stream: FuturesUnordered<_> =
blobs.iter().map(|blob| blob.read_any(key, meta)).collect();
debug!("read with optional meta {} closed blobs", stream.len());
Expand All @@ -331,7 +331,7 @@ impl<K> Storage<K> {
/// # Errors
/// Fails because of any IO errors
pub async fn close(self) -> Result<()> {
let mut safe = self.inner.safe.lock().await;
let mut safe = self.inner.safe.write().await;
let active_blob = safe.active_blob.take();
if let Some(mut blob) = active_blob {
blob.dump()
Expand Down Expand Up @@ -359,7 +359,7 @@ impl<K> Storage<K> {
/// assert_eq!(storage.blobs_count(), 1);
/// ```
pub async fn blobs_count(&self) -> usize {
let safe = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
let count = safe.blobs.read().await.len();
if safe.active_blob.is_some() {
count + 1
Expand All @@ -370,7 +370,7 @@ impl<K> Storage<K> {

/// `index_memory` returns the amount of memory used by blob to store indices
pub async fn index_memory(&self) -> usize {
let safe = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
if let Some(ablob) = safe.active_blob.as_ref() {
ablob.index_memory()
} else {
Expand Down Expand Up @@ -424,15 +424,14 @@ impl<K> Storage<K> {
e
})?;
debug!("{} not locked", path.display());
self.inner.safe.lock().await.lock_file = Some(lock_file);
self.inner.safe.write().await.lock_file = Some(lock_file);
Ok(())
}

async fn init_new(&mut self) -> Result<()> {
let safe_locked = self.inner.safe.lock();
let next = self.inner.next_blob_name()?;
let config = self.filter_config();
let mut safe = safe_locked.await;
let mut safe = self.inner.safe.write().await;
let blob = Blob::open_new(next, self.inner.ioring.clone(), config)
.await?
.boxed();
Expand Down Expand Up @@ -463,17 +462,17 @@ impl<K> Storage<K> {
Error::from(ErrorKind::Uninitialized)
})?
.boxed();
let mut safe_locked = self.inner.safe.lock().await;
let mut safe = self.inner.safe.write().await;
active_blob.load_index().await?;
for blob in &mut blobs {
debug!("dump all blobs except active blob");
blob.dump().await?;
}
safe_locked.active_blob = Some(active_blob);
*safe_locked.blobs.write().await = blobs;
safe.active_blob = Some(active_blob);
*safe.blobs.write().await = blobs;
self.inner
.next_blob_id
.store(safe_locked.max_id().await.map_or(0, |i| i + 1), ORD);
.store(safe.max_id().await.map_or(0, |i| i + 1), ORD);
Ok(())
}

Expand Down Expand Up @@ -537,7 +536,7 @@ impl<K> Storage<K> {
}

async fn contains_with(&self, key: &[u8], meta: Option<&Meta>) -> Result<bool> {
let inner = self.inner.safe.lock().await;
let inner = self.inner.safe.read().await;
if let Some(active_blob) = &inner.active_blob {
if active_blob.contains(key, meta).await? {
return Ok(true);
Expand All @@ -560,7 +559,7 @@ impl<K> Storage<K> {
/// In other words, `check_bloom` returns either "possibly in storage" or "definitely not".
pub async fn check_bloom(&self, key: impl Key) -> Option<bool> {
trace!("[{:?}] check in blobs bloom filter", &key.to_vec());
let inner = self.inner.safe.lock().await;
let inner = self.inner.safe.read().await;
let in_active = inner
.active_blob
.as_ref()
Expand Down Expand Up @@ -635,7 +634,7 @@ impl Inner {
fn new(config: Config, ioring: Option<Rio>) -> Self {
Self {
config,
safe: Arc::new(Mutex::new(Safe::new())),
safe: Arc::new(RwLock::new(Safe::new())),
next_blob_id: Arc::new(AtomicUsize::new(0)),
twins_count: Arc::new(AtomicUsize::new(0)),
ioring,
Expand Down Expand Up @@ -669,24 +668,24 @@ impl Inner {
}

async fn records_count(&self) -> usize {
self.safe.lock().await.records_count().await
self.safe.read().await.records_count().await
}

async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
self.safe.lock().await.records_count_detailed().await
self.safe.read().await.records_count_detailed().await
}

async fn records_count_in_active_blob(&self) -> Option<usize> {
let inner = self.safe.lock().await;
let inner = self.safe.read().await;
inner.active_blob.as_ref().map(|b| b.records_count())
}

async fn fsyncdata(&self) -> IOResult<()> {
self.safe.lock().await.fsyncdata().await
self.safe.read().await.fsyncdata().await
}

pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc<Semaphore>) {
self.safe.lock().await.try_dump_old_blob_indexes(sem).await;
self.safe.write().await.try_dump_old_blob_indexes(sem).await;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Observer {
async fn active_blob_check(inner: Inner) -> Result<Option<Inner>> {
let (active_size, active_count) = {
trace!("await for lock");
let safe_locked = inner.safe.lock().await;
let safe_locked = inner.safe.read().await;
trace!("lock acquired");
let active_blob = safe_locked
.active_blob
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn update_active_blob(inner: Inner, dump_sem: Arc<Semaphore>) -> Result<()
.boxed();
let task = inner
.safe
.lock()
.write()
.await
.replace_active_blob(new_active)
.await?;
Expand Down

0 comments on commit 7f77fc0

Please sign in to comment.