Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible infinite readlock #97

Merged
merged 9 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/benchmark/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod prelude {
generator::Generator,
statistics::{Report, Statistics},
writer::Writer,
Key128,
};
pub(crate) use clap::{App, Arg, ArgMatches};
pub(crate) use env_logger::fmt::Color;
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn start_app() {
.map(|(key, data)| {
let ltx = tx.clone();
counter += 1;
writer.write(key.into(), data, ltx)
writer.write(key, data, ltx)
})
.collect();
println!(
Expand Down Expand Up @@ -205,12 +206,18 @@ fn init_logger() {
.try_init();
}
#[derive(Debug)]
struct Key128(Vec<u8>);
pub struct Key128(Vec<u8>);

impl Key for Key128 {
const LEN: u16 = 8;
}

impl AsRef<Key128> for Key128 {
fn as_ref(&self) -> &Self {
self
}
}

impl From<Vec<u8>> for Key128 {
fn from(v: Vec<u8>) -> Self {
assert_eq!(Self::LEN as usize, v.len());
Expand Down
4 changes: 2 additions & 2 deletions src/benchmark/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ impl Generator {
}
}

pub fn next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
pub fn next(&mut self) -> Option<(Key128, Vec<u8>)> {
if self.written < self.limit * 1_000_000 {
let key = self.written.to_be_bytes().to_vec();
let data = self.value.clone();
self.written += (key.len() + data.len()) as u64;
Some((key, data))
Some((Key128(key), data))
} else {
None
}
Expand Down
12 changes: 5 additions & 7 deletions src/benchmark/writer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::prelude::*;
use pearl::rio;

pub struct Writer<K> {
pub struct Writer<K: Key> {
storage: Storage<K>,
}

impl<K> Writer<K> {
impl<K: Key> Writer<K> {
pub fn new(
tmp_dir: &Path,
max_blob_size: u64,
Expand All @@ -31,11 +31,9 @@ impl<K> Writer<K> {
self.storage.init().await.unwrap()
}

pub async fn write(&self, key: K, data: Vec<u8>, mut tx: Sender<Report>)
where
K: Key,
{
let mut report = Report::new(key.as_ref().len(), data.len());
pub async fn write(&self, key: impl AsRef<K>, data: Vec<u8>, mut tx: Sender<Report>) {
let kbuf: &[u8] = key.as_ref().as_ref();
let mut report = Report::new(kbuf.len(), data.len());
let now = Instant::now();
self.storage.write(key, data).await.unwrap();
debug!("write finished");
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ mod prelude {
pub(crate) use std::collections::BTreeMap;
pub(crate) const ORD: Ordering = Ordering::Relaxed;

pub(crate) use anyhow::{Result, Context as ErrorContexts};
pub(crate) use anyhow::{Context as ErrorContexts, Result};
pub(crate) use bincode::{deserialize, serialize, serialize_into, serialized_size};
pub(crate) use blob::{self, Blob, BloomConfig};
pub(crate) use crc::crc32::checksum_castagnoli as crc32;
Expand Down
23 changes: 8 additions & 15 deletions src/storage/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ const LOCK_FILE: &str = "pearl.lock";

const O_EXCL: i32 = 128;

type SaveOldBlobTask = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>;

/// A main storage struct.
///
/// This type is clonable, cloning it will only create a new reference,
Expand Down Expand Up @@ -715,34 +713,29 @@ impl Safe {
Ok(())
}

pub(crate) async fn replace_active_blob(&mut self, blob: Box<Blob>) -> Result<SaveOldBlobTask> {
pub(crate) async fn replace_active_blob(&mut self, blob: Box<Blob>) -> Result<()> {
let old_active = self
.active_blob
.replace(blob)
.ok_or_else(Error::active_blob_not_set)?;
let blobs = self.blobs.clone();
Ok(Box::pin(async move {
let mut blobs = blobs.write().await;
blobs.push(*old_active);
// Notice: if dump of blob or indices fails, it's likely a problem with disk appeared, so
// all descriptors of the storage become invalid and unusable
// Possible solution: recreate instance of storage, when disk will be available
let last_blob = blobs.last_mut().unwrap();
if let Err(e) = last_blob.dump().await {
error!("Error dumping blob ({}): {}", last_blob.name(), e);
}
}))
self.blobs.write().await.push(*old_active);
Ok(())
}

pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc<Semaphore>) {
let blobs = self.blobs.clone();
tokio::spawn(async move {
trace!("acquire blobs write to dump old blobs");
let mut write_blobs = blobs.write().await;
trace!("dump old blobs");
for blob in write_blobs.iter_mut() {
trace!("dumping old blob");
let _ = sem.acquire().await;
trace!("acquired sem for dumping old blobs");
if let Err(e) = blob.dump().await {
error!("Error dumping blob ({}): {}", blob.name(), e);
}
trace!("finished dumping old blob");
}
});
}
Expand Down
23 changes: 10 additions & 13 deletions src/storage/observer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ impl ObserverWorker {
async fn tick(&mut self) -> Result<()> {
match timeout(self.update_interval, self.receiver.recv()).await {
Ok(Some(Msg::CloseActiveBlob)) => {
update_active_blob(self.inner.clone(), self.dump_sem.clone()).await?
update_active_blob(self.inner.clone()).await?;
self.inner
.try_dump_old_blob_indexes(self.dump_sem.clone())
.await;
}
Ok(None) => {
return Err(anyhow!(
Expand All @@ -51,13 +54,10 @@ impl ObserverWorker {
async fn try_update(&self) -> Result<()> {
trace!("try update active blob");
let inner_cloned = self.inner.clone();
if let Some(inner) = active_blob_check(inner_cloned).await? {
update_active_blob(inner, self.dump_sem.clone()).await?;
if let Some(mut inner) = active_blob_check(inner_cloned).await? {
update_active_blob(inner.clone()).await?;
inner.try_dump_old_blob_indexes(self.dump_sem.clone()).await;
}
let mut inner_mut = self.inner.clone();
inner_mut
.try_dump_old_blob_indexes(self.dump_sem.clone())
.await;
Ok(())
}
}
Expand Down Expand Up @@ -91,21 +91,18 @@ async fn active_blob_check(inner: Inner) -> Result<Option<Inner>> {
}
}

async fn update_active_blob(inner: Inner, dump_sem: Arc<Semaphore>) -> Result<()> {
async fn update_active_blob(inner: Inner) -> Result<()> {
let next_name = inner.next_blob_name()?;
// Opening a new blob may take a while
trace!("obtaining new active blob");
let new_active = Blob::open_new(next_name, inner.ioring, inner.config.filter())
.await?
.boxed();
let task = inner
inner
.safe
.write()
.await
.replace_active_blob(new_active)
.await?;
tokio::spawn(async move {
let _res = dump_sem.acquire().await.expect("semaphore is closed");
task.await;
});
Ok(())
}