Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change safe field of Inner struct on RwLock (instead of Mutex) #91

Merged
merged 2 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions src/storage/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct Storage<K> {
#[derive(Debug)]
pub(crate) struct Inner {
pub(crate) config: Config,
pub(crate) safe: Arc<Mutex<Safe>>,
pub(crate) safe: Arc<RwLock<Safe>>,
next_blob_id: Arc<AtomicUsize>,
twins_count: Arc<AtomicUsize>,
pub(crate) ioring: Option<Rio>,
Expand Down Expand Up @@ -198,7 +198,7 @@ impl<K> Storage<K> {
}
let record = Record::create(&key, value, meta.unwrap_or_default())
.with_context(|| "storage write with record creation failed")?;
let mut safe = self.inner.safe.lock().await;
let mut safe = self.inner.safe.write().await;
let blob = safe
.active_blob
.as_mut()
Expand Down Expand Up @@ -263,7 +263,7 @@ impl<K> Storage<K> {
pub async fn read_all(&self, key: &impl Key) -> Result<Vec<Entry>> {
let key = key.as_ref();
let mut all_entries = Vec::new();
let safe = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
let active_blob = safe
.active_blob
.as_ref()
Expand Down Expand Up @@ -294,9 +294,9 @@ impl<K> Storage<K> {

async fn read_with_optional_meta(&self, key: impl Key, meta: Option<&Meta>) -> Result<Vec<u8>> {
debug!("storage read with optional meta {:?}, {:?}", key, meta);
let inner = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
let key = key.as_ref();
let active_blob_read_res = inner
let active_blob_read_res = safe
.active_blob
.as_ref()
.ok_or_else(Error::active_blob_not_set)?
Expand All @@ -310,13 +310,13 @@ impl<K> Storage<K> {
}
Err(e) => {
debug!("read with optional meta active blob returned: {:#?}", e);
Self::get_any_data(&inner, key, meta).await
Self::get_any_data(&safe, key, meta).await
}
}
}

async fn get_any_data(inner: &Safe, key: &[u8], meta: Option<&Meta>) -> Result<Vec<u8>> {
let blobs = inner.blobs.read().await;
async fn get_any_data(safe: &Safe, key: &[u8], meta: Option<&Meta>) -> Result<Vec<u8>> {
let blobs = safe.blobs.read().await;
let stream: FuturesUnordered<_> =
blobs.iter().map(|blob| blob.read_any(key, meta)).collect();
debug!("read with optional meta {} closed blobs", stream.len());
Expand All @@ -331,7 +331,7 @@ impl<K> Storage<K> {
/// # Errors
/// Fails because of any IO errors
pub async fn close(self) -> Result<()> {
let mut safe = self.inner.safe.lock().await;
let mut safe = self.inner.safe.write().await;
let active_blob = safe.active_blob.take();
if let Some(mut blob) = active_blob {
blob.dump()
Expand Down Expand Up @@ -359,7 +359,7 @@ impl<K> Storage<K> {
/// assert_eq!(storage.blobs_count(), 1);
/// ```
pub async fn blobs_count(&self) -> usize {
let safe = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
let count = safe.blobs.read().await.len();
if safe.active_blob.is_some() {
count + 1
Expand All @@ -370,7 +370,7 @@ impl<K> Storage<K> {

/// `index_memory` returns the amount of memory used by blob to store indices
pub async fn index_memory(&self) -> usize {
let safe = self.inner.safe.lock().await;
let safe = self.inner.safe.read().await;
if let Some(ablob) = safe.active_blob.as_ref() {
ablob.index_memory()
} else {
Expand Down Expand Up @@ -424,15 +424,14 @@ impl<K> Storage<K> {
e
})?;
debug!("{} not locked", path.display());
self.inner.safe.lock().await.lock_file = Some(lock_file);
self.inner.safe.write().await.lock_file = Some(lock_file);
Ok(())
}

async fn init_new(&mut self) -> Result<()> {
let safe_locked = self.inner.safe.lock();
let next = self.inner.next_blob_name()?;
let config = self.filter_config();
let mut safe = safe_locked.await;
let mut safe = self.inner.safe.write().await;
let blob = Blob::open_new(next, self.inner.ioring.clone(), config)
.await?
.boxed();
Expand Down Expand Up @@ -463,7 +462,7 @@ impl<K> Storage<K> {
Error::from(ErrorKind::Uninitialized)
})?
.boxed();
let mut safe_locked = self.inner.safe.lock().await;
let mut safe_locked = self.inner.safe.write().await;
Justarone marked this conversation as resolved.
Show resolved Hide resolved
active_blob.load_index().await?;
for blob in &mut blobs {
debug!("dump all blobs except active blob");
Expand Down Expand Up @@ -537,7 +536,7 @@ impl<K> Storage<K> {
}

async fn contains_with(&self, key: &[u8], meta: Option<&Meta>) -> Result<bool> {
let inner = self.inner.safe.lock().await;
let inner = self.inner.safe.write().await;
Justarone marked this conversation as resolved.
Show resolved Hide resolved
if let Some(active_blob) = &inner.active_blob {
if active_blob.contains(key, meta).await? {
return Ok(true);
Expand All @@ -560,7 +559,7 @@ impl<K> Storage<K> {
/// In other words, `check_bloom` returns either "possibly in storage" or "definitely not".
pub async fn check_bloom(&self, key: impl Key) -> Option<bool> {
trace!("[{:?}] check in blobs bloom filter", &key.to_vec());
let inner = self.inner.safe.lock().await;
let inner = self.inner.safe.read().await;
let in_active = inner
.active_blob
.as_ref()
Expand Down Expand Up @@ -635,7 +634,7 @@ impl Inner {
fn new(config: Config, ioring: Option<Rio>) -> Self {
Self {
config,
safe: Arc::new(Mutex::new(Safe::new())),
safe: Arc::new(RwLock::new(Safe::new())),
next_blob_id: Arc::new(AtomicUsize::new(0)),
twins_count: Arc::new(AtomicUsize::new(0)),
ioring,
Expand Down Expand Up @@ -669,24 +668,24 @@ impl Inner {
}

async fn records_count(&self) -> usize {
self.safe.lock().await.records_count().await
self.safe.read().await.records_count().await
}

async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
self.safe.lock().await.records_count_detailed().await
self.safe.read().await.records_count_detailed().await
}

async fn records_count_in_active_blob(&self) -> Option<usize> {
let inner = self.safe.lock().await;
let inner = self.safe.read().await;
inner.active_blob.as_ref().map(|b| b.records_count())
}

async fn fsyncdata(&self) -> IOResult<()> {
self.safe.lock().await.fsyncdata().await
self.safe.read().await.fsyncdata().await
}

pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc<Semaphore>) {
self.safe.lock().await.try_dump_old_blob_indexes(sem).await;
self.safe.write().await.try_dump_old_blob_indexes(sem).await;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Observer {
async fn active_blob_check(inner: Inner) -> Result<Option<Inner>> {
let (active_size, active_count) = {
trace!("await for lock");
let safe_locked = inner.safe.lock().await;
let safe_locked = inner.safe.read().await;
trace!("lock acquired");
let active_blob = safe_locked
.active_blob
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn update_active_blob(inner: Inner, dump_sem: Arc<Semaphore>) -> Result<()
.boxed();
let task = inner
.safe
.lock()
.write()
.await
.replace_active_blob(new_active)
.await?;
Expand Down