From 454db6789a7d40f6af295da0dae455aba2a1908f Mon Sep 17 00:00:00 2001 From: Perestoronin Pavel Date: Sun, 28 Mar 2021 23:33:34 +0300 Subject: [PATCH 1/2] Change 'safe' in 'inner' on rwlock --- src/storage/core.rs | 45 ++++++++++++++++++++--------------------- src/storage/observer.rs | 4 ++-- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/storage/core.rs b/src/storage/core.rs index 6d3cdc8cab..103f431033 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -43,7 +43,7 @@ pub struct Storage { #[derive(Debug)] pub(crate) struct Inner { pub(crate) config: Config, - pub(crate) safe: Arc>, + pub(crate) safe: Arc>, next_blob_id: Arc, twins_count: Arc, pub(crate) ioring: Option, @@ -198,7 +198,7 @@ impl Storage { } let record = Record::create(&key, value, meta.unwrap_or_default()) .with_context(|| "storage write with record creation failed")?; - let mut safe = self.inner.safe.lock().await; + let mut safe = self.inner.safe.write().await; let blob = safe .active_blob .as_mut() @@ -263,7 +263,7 @@ impl Storage { pub async fn read_all(&self, key: &impl Key) -> Result> { let key = key.as_ref(); let mut all_entries = Vec::new(); - let safe = self.inner.safe.lock().await; + let safe = self.inner.safe.read().await; let active_blob = safe .active_blob .as_ref() @@ -294,9 +294,9 @@ impl Storage { async fn read_with_optional_meta(&self, key: impl Key, meta: Option<&Meta>) -> Result> { debug!("storage read with optional meta {:?}, {:?}", key, meta); - let inner = self.inner.safe.lock().await; + let safe = self.inner.safe.read().await; let key = key.as_ref(); - let active_blob_read_res = inner + let active_blob_read_res = safe .active_blob .as_ref() .ok_or_else(Error::active_blob_not_set)? @@ -310,13 +310,13 @@ impl Storage { } Err(e) => { debug!("read with optional meta active blob returned: {:#?}", e); - Self::get_any_data(&inner, key, meta).await + Self::get_any_data(&safe, key, meta).await } } } - async fn get_any_data(inner: &Safe, key: &[u8], meta: Option<&Meta>) -> Result> { - let blobs = inner.blobs.read().await; + async fn get_any_data(safe: &Safe, key: &[u8], meta: Option<&Meta>) -> Result> { + let blobs = safe.blobs.read().await; let stream: FuturesUnordered<_> = blobs.iter().map(|blob| blob.read_any(key, meta)).collect(); debug!("read with optional meta {} closed blobs", stream.len()); @@ -331,7 +331,7 @@ impl Storage { /// # Errors /// Fails because of any IO errors pub async fn close(self) -> Result<()> { - let mut safe = self.inner.safe.lock().await; + let mut safe = self.inner.safe.write().await; let active_blob = safe.active_blob.take(); if let Some(mut blob) = active_blob { blob.dump() @@ -359,7 +359,7 @@ impl Storage { /// assert_eq!(storage.blobs_count(), 1); /// ``` pub async fn blobs_count(&self) -> usize { - let safe = self.inner.safe.lock().await; + let safe = self.inner.safe.read().await; let count = safe.blobs.read().await.len(); if safe.active_blob.is_some() { count + 1 @@ -370,7 +370,7 @@ impl Storage { /// `index_memory` returns the amount of memory used by blob to store indices pub async fn index_memory(&self) -> usize { - let safe = self.inner.safe.lock().await; + let safe = self.inner.safe.read().await; if let Some(ablob) = safe.active_blob.as_ref() { ablob.index_memory() } else { @@ -424,15 +424,14 @@ impl Storage { e })?; debug!("{} not locked", path.display()); - self.inner.safe.lock().await.lock_file = Some(lock_file); + self.inner.safe.write().await.lock_file = Some(lock_file); Ok(()) } async fn init_new(&mut self) -> Result<()> { - let safe_locked = self.inner.safe.lock(); let next = self.inner.next_blob_name()?; let config = self.filter_config(); - let mut safe = safe_locked.await; + let mut safe = self.inner.safe.write().await; let blob = Blob::open_new(next, self.inner.ioring.clone(), config) .await? .boxed(); @@ -463,7 +462,7 @@ impl Storage { Error::from(ErrorKind::Uninitialized) })? .boxed(); - let mut safe_locked = self.inner.safe.lock().await; + let mut safe_locked = self.inner.safe.write().await; active_blob.load_index().await?; for blob in &mut blobs { debug!("dump all blobs except active blob"); @@ -537,7 +536,7 @@ impl Storage { } async fn contains_with(&self, key: &[u8], meta: Option<&Meta>) -> Result { - let inner = self.inner.safe.lock().await; + let inner = self.inner.safe.write().await; if let Some(active_blob) = &inner.active_blob { if active_blob.contains(key, meta).await? { return Ok(true); @@ -560,7 +559,7 @@ impl Storage { /// In other words, `check_bloom` returns either "possibly in storage" or "definitely not". pub async fn check_bloom(&self, key: impl Key) -> Option { trace!("[{:?}] check in blobs bloom filter", &key.to_vec()); - let inner = self.inner.safe.lock().await; + let inner = self.inner.safe.read().await; let in_active = inner .active_blob .as_ref() @@ -635,7 +634,7 @@ impl Inner { fn new(config: Config, ioring: Option) -> Self { Self { config, - safe: Arc::new(Mutex::new(Safe::new())), + safe: Arc::new(RwLock::new(Safe::new())), next_blob_id: Arc::new(AtomicUsize::new(0)), twins_count: Arc::new(AtomicUsize::new(0)), ioring, @@ -669,24 +668,24 @@ impl Inner { } async fn records_count(&self) -> usize { - self.safe.lock().await.records_count().await + self.safe.read().await.records_count().await } async fn records_count_detailed(&self) -> Vec<(usize, usize)> { - self.safe.lock().await.records_count_detailed().await + self.safe.read().await.records_count_detailed().await } async fn records_count_in_active_blob(&self) -> Option { - let inner = self.safe.lock().await; + let inner = self.safe.read().await; inner.active_blob.as_ref().map(|b| b.records_count()) } async fn fsyncdata(&self) -> IOResult<()> { - self.safe.lock().await.fsyncdata().await + self.safe.read().await.fsyncdata().await } pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc) { - self.safe.lock().await.try_dump_old_blob_indexes(sem).await; + self.safe.write().await.try_dump_old_blob_indexes(sem).await; } } diff --git a/src/storage/observer.rs b/src/storage/observer.rs index e42aad6b0a..436e4f8b29 100644 --- a/src/storage/observer.rs +++ b/src/storage/observer.rs @@ -101,7 +101,7 @@ impl Observer { async fn active_blob_check(inner: Inner) -> Result> { let (active_size, active_count) = { trace!("await for lock"); - let safe_locked = inner.safe.lock().await; + let safe_locked = inner.safe.read().await; trace!("lock acquired"); let active_blob = safe_locked .active_blob @@ -135,7 +135,7 @@ async fn update_active_blob(inner: Inner, dump_sem: Arc) -> Result<() .boxed(); let task = inner .safe - .lock() + .write() .await .replace_active_blob(new_active) .await?; From d3343b0639afea7dfbf2c0f97d5ce1716f61a5ec Mon Sep 17 00:00:00 2001 From: Perestoronin Pavel Date: Tue, 30 Mar 2021 08:00:33 +0300 Subject: [PATCH 2/2] Fix naming and change write lock on read one --- src/storage/core.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/storage/core.rs b/src/storage/core.rs index 103f431033..76eede8fba 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -462,17 +462,17 @@ impl Storage { Error::from(ErrorKind::Uninitialized) })? .boxed(); - let mut safe_locked = self.inner.safe.write().await; + let mut safe = self.inner.safe.write().await; active_blob.load_index().await?; for blob in &mut blobs { debug!("dump all blobs except active blob"); blob.dump().await?; } - safe_locked.active_blob = Some(active_blob); - *safe_locked.blobs.write().await = blobs; + safe.active_blob = Some(active_blob); + *safe.blobs.write().await = blobs; self.inner .next_blob_id - .store(safe_locked.max_id().await.map_or(0, |i| i + 1), ORD); + .store(safe.max_id().await.map_or(0, |i| i + 1), ORD); Ok(()) } @@ -536,7 +536,7 @@ impl Storage { } async fn contains_with(&self, key: &[u8], meta: Option<&Meta>) -> Result { - let inner = self.inner.safe.write().await; + let inner = self.inner.safe.read().await; if let Some(active_blob) = &inner.active_blob { if active_blob.contains(key, meta).await? { return Ok(true);