Skip to content

Commit

Permalink
Fix possible infinite readlock (#97)
Browse files Browse the repository at this point in the history
* Change all readlocks on writelocks

* Rollback to mutex

* add more traces, increase check timer

* add more trace logs

* more traces

* disable old blobs dump

* fix benchmark and lock issue

* change mutex back to rwlock

Co-authored-by: Perestoronin Pavel <[email protected]>
  • Loading branch information
idruzhitskiy and Justarone authored Apr 14, 2021
1 parent 7a36c58 commit 3fc9b2a
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 40 deletions.
11 changes: 9 additions & 2 deletions src/benchmark/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod prelude {
generator::Generator,
statistics::{Report, Statistics},
writer::Writer,
Key128,
};
pub(crate) use clap::{App, Arg, ArgMatches};
pub(crate) use env_logger::fmt::Color;
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn start_app() {
.map(|(key, data)| {
let ltx = tx.clone();
counter += 1;
writer.write(key.into(), data, ltx)
writer.write(key, data, ltx)
})
.collect();
println!(
Expand Down Expand Up @@ -205,12 +206,18 @@ fn init_logger() {
.try_init();
}
#[derive(Debug)]
struct Key128(Vec<u8>);
pub struct Key128(Vec<u8>);

impl Key for Key128 {
const LEN: u16 = 8;
}

impl AsRef<Key128> for Key128 {
fn as_ref(&self) -> &Self {
self
}
}

impl From<Vec<u8>> for Key128 {
fn from(v: Vec<u8>) -> Self {
assert_eq!(Self::LEN as usize, v.len());
Expand Down
4 changes: 2 additions & 2 deletions src/benchmark/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ impl Generator {
}
}

pub fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
pub fn next(&mut self) -> Option<(Key128, Vec<u8>)> {
if self.written < self.limit * 1_000_000 {
let key = self.written.to_be_bytes().to_vec();
let data = self.value.clone();
self.written += (key.len() + data.len()) as u64;
Some((key, data))
Some((Key128(key), data))
} else {
None
}
Expand Down
12 changes: 5 additions & 7 deletions src/benchmark/writer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::prelude::*;
use pearl::rio;

pub struct Writer<K> {
pub struct Writer<K: Key> {
storage: Storage<K>,
}

impl<K> Writer<K> {
impl<K: Key> Writer<K> {
pub fn new(
tmp_dir: &Path,
max_blob_size: u64,
Expand All @@ -31,11 +31,9 @@ impl<K> Writer<K> {
self.storage.init().await.unwrap()
}

pub async fn write(&self, key: K, data: Vec<u8>, mut tx: Sender<Report>)
where
K: Key,
{
let mut report = Report::new(key.as_ref().len(), data.len());
pub async fn write(&self, key: impl AsRef<K>, data: Vec<u8>, mut tx: Sender<Report>) {
let kbuf: &[u8] = key.as_ref().as_ref();
let mut report = Report::new(kbuf.len(), data.len());
let now = Instant::now();
self.storage.write(key, data).await.unwrap();
debug!("write finished");
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ mod prelude {
pub(crate) use std::collections::BTreeMap;
pub(crate) const ORD: Ordering = Ordering::Relaxed;

pub(crate) use anyhow::{Result, Context as ErrorContexts};
pub(crate) use anyhow::{Context as ErrorContexts, Result};
pub(crate) use bincode::{deserialize, serialize, serialize_into, serialized_size};
pub(crate) use blob::{self, Blob, BloomConfig};
pub(crate) use crc::crc32::checksum_castagnoli as crc32;
Expand Down
23 changes: 8 additions & 15 deletions src/storage/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ const LOCK_FILE: &str = "pearl.lock";

const O_EXCL: i32 = 128;

type SaveOldBlobTask = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>;

/// A main storage struct.
///
/// This type is clonable, cloning it will only create a new reference,
Expand Down Expand Up @@ -715,34 +713,29 @@ impl Safe {
Ok(())
}

pub(crate) async fn replace_active_blob(&mut self, blob: Box<Blob>) -> Result<SaveOldBlobTask> {
pub(crate) async fn replace_active_blob(&mut self, blob: Box<Blob>) -> Result<()> {
let old_active = self
.active_blob
.replace(blob)
.ok_or_else(Error::active_blob_not_set)?;
let blobs = self.blobs.clone();
Ok(Box::pin(async move {
let mut blobs = blobs.write().await;
blobs.push(*old_active);
// Notice: if dump of blob or indices fails, it's likely a problem with disk appeared, so
// all descriptors of the storage become invalid and unusable
// Possible solution: recreate instance of storage, when disk will be available
let last_blob = blobs.last_mut().unwrap();
if let Err(e) = last_blob.dump().await {
error!("Error dumping blob ({}): {}", last_blob.name(), e);
}
}))
self.blobs.write().await.push(*old_active);
Ok(())
}

pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc<Semaphore>) {
let blobs = self.blobs.clone();
tokio::spawn(async move {
trace!("acquire blobs write to dump old blobs");
let mut write_blobs = blobs.write().await;
trace!("dump old blobs");
for blob in write_blobs.iter_mut() {
trace!("dumping old blob");
let _ = sem.acquire().await;
trace!("acquired sem for dumping old blobs");
if let Err(e) = blob.dump().await {
error!("Error dumping blob ({}): {}", blob.name(), e);
}
trace!("finished dumping old blob");
}
});
}
Expand Down
23 changes: 10 additions & 13 deletions src/storage/observer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ impl ObserverWorker {
async fn tick(&mut self) -> Result<()> {
match timeout(self.update_interval, self.receiver.recv()).await {
Ok(Some(Msg::CloseActiveBlob)) => {
update_active_blob(self.inner.clone(), self.dump_sem.clone()).await?
update_active_blob(self.inner.clone()).await?;
self.inner
.try_dump_old_blob_indexes(self.dump_sem.clone())
.await;
}
Ok(None) => {
return Err(anyhow!(
Expand All @@ -51,13 +54,10 @@ impl ObserverWorker {
async fn try_update(&self) -> Result<()> {
trace!("try update active blob");
let inner_cloned = self.inner.clone();
if let Some(inner) = active_blob_check(inner_cloned).await? {
update_active_blob(inner, self.dump_sem.clone()).await?;
if let Some(mut inner) = active_blob_check(inner_cloned).await? {
update_active_blob(inner.clone()).await?;
inner.try_dump_old_blob_indexes(self.dump_sem.clone()).await;
}
let mut inner_mut = self.inner.clone();
inner_mut
.try_dump_old_blob_indexes(self.dump_sem.clone())
.await;
Ok(())
}
}
Expand Down Expand Up @@ -91,21 +91,18 @@ async fn active_blob_check(inner: Inner) -> Result<Option<Inner>> {
}
}

async fn update_active_blob(inner: Inner, dump_sem: Arc<Semaphore>) -> Result<()> {
async fn update_active_blob(inner: Inner) -> Result<()> {
let next_name = inner.next_blob_name()?;
// Opening a new blob may take a while
trace!("obtaining new active blob");
let new_active = Blob::open_new(next_name, inner.ioring, inner.config.filter())
.await?
.boxed();
let task = inner
inner
.safe
.write()
.await
.replace_active_blob(new_active)
.await?;
tokio::spawn(async move {
let _res = dump_sem.acquire().await.expect("semaphore is closed");
task.await;
});
Ok(())
}

0 comments on commit 3fc9b2a

Please sign in to comment.