diff --git a/src/benchmark/bin.rs b/src/benchmark/bin.rs index e1d923ef1d..cb9a62c16e 100644 --- a/src/benchmark/bin.rs +++ b/src/benchmark/bin.rs @@ -14,6 +14,7 @@ mod prelude { generator::Generator, statistics::{Report, Statistics}, writer::Writer, + Key128, }; pub(crate) use clap::{App, Arg, ArgMatches}; pub(crate) use env_logger::fmt::Color; @@ -86,7 +87,7 @@ async fn start_app() { .map(|(key, data)| { let ltx = tx.clone(); counter += 1; - writer.write(key.into(), data, ltx) + writer.write(key, data, ltx) }) .collect(); println!( @@ -205,12 +206,18 @@ fn init_logger() { .try_init(); } #[derive(Debug)] -struct Key128(Vec); +pub struct Key128(Vec); impl Key for Key128 { const LEN: u16 = 8; } +impl AsRef for Key128 { + fn as_ref(&self) -> &Self { + self + } +} + impl From> for Key128 { fn from(v: Vec) -> Self { assert_eq!(Self::LEN as usize, v.len()); diff --git a/src/benchmark/generator.rs b/src/benchmark/generator.rs index 5f4127a4a4..02f24d4bc2 100644 --- a/src/benchmark/generator.rs +++ b/src/benchmark/generator.rs @@ -19,12 +19,12 @@ impl Generator { } } - pub fn next(&mut self) -> Option<(Vec, Vec)> { + pub fn next(&mut self) -> Option<(Key128, Vec)> { if self.written < self.limit * 1_000_000 { let key = self.written.to_be_bytes().to_vec(); let data = self.value.clone(); self.written += (key.len() + data.len()) as u64; - Some((key, data)) + Some((Key128(key), data)) } else { None } diff --git a/src/benchmark/writer.rs b/src/benchmark/writer.rs index b4f312f518..c64907baa9 100644 --- a/src/benchmark/writer.rs +++ b/src/benchmark/writer.rs @@ -1,11 +1,11 @@ use super::prelude::*; use pearl::rio; -pub struct Writer { +pub struct Writer { storage: Storage, } -impl Writer { +impl Writer { pub fn new( tmp_dir: &Path, max_blob_size: u64, @@ -31,11 +31,9 @@ impl Writer { self.storage.init().await.unwrap() } - pub async fn write(&self, key: K, data: Vec, mut tx: Sender) - where - K: Key, - { - let mut report = Report::new(key.as_ref().len(), data.len()); + pub async fn write(&self, key: impl AsRef, data: Vec, mut tx: Sender) { + let kbuf: &[u8] = key.as_ref().as_ref(); + let mut report = Report::new(kbuf.len(), data.len()); let now = Instant::now(); self.storage.write(key, data).await.unwrap(); debug!("write finished"); diff --git a/src/lib.rs b/src/lib.rs index d771c8c282..6a0bd2221c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,7 +74,7 @@ mod prelude { pub(crate) use std::collections::BTreeMap; pub(crate) const ORD: Ordering = Ordering::Relaxed; - pub(crate) use anyhow::{Result, Context as ErrorContexts}; + pub(crate) use anyhow::{Context as ErrorContexts, Result}; pub(crate) use bincode::{deserialize, serialize, serialize_into, serialized_size}; pub(crate) use blob::{self, Blob, BloomConfig}; pub(crate) use crc::crc32::checksum_castagnoli as crc32; diff --git a/src/storage/core.rs b/src/storage/core.rs index 3b58b1ca0c..28e4a3fbd4 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -5,8 +5,6 @@ const LOCK_FILE: &str = "pearl.lock"; const O_EXCL: i32 = 128; -type SaveOldBlobTask = std::pin::Pin + Send>>; - /// A main storage struct. /// /// This type is clonable, cloning it will only create a new reference, @@ -715,34 +713,29 @@ impl Safe { Ok(()) } - pub(crate) async fn replace_active_blob(&mut self, blob: Box) -> Result { + pub(crate) async fn replace_active_blob(&mut self, blob: Box) -> Result<()> { let old_active = self .active_blob .replace(blob) .ok_or_else(Error::active_blob_not_set)?; - let blobs = self.blobs.clone(); - Ok(Box::pin(async move { - let mut blobs = blobs.write().await; - blobs.push(*old_active); - // Notice: if dump of blob or indices fails, it's likely a problem with disk appeared, so - // all descriptors of the storage become invalid and unusable - // Possible solution: recreate instance of storage, when disk will be available - let last_blob = blobs.last_mut().unwrap(); - if let Err(e) = last_blob.dump().await { - error!("Error dumping blob ({}): {}", last_blob.name(), e); - } - })) + self.blobs.write().await.push(*old_active); + Ok(()) } pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc) { let blobs = self.blobs.clone(); tokio::spawn(async move { + trace!("acquire blobs write to dump old blobs"); let mut write_blobs = blobs.write().await; + trace!("dump old blobs"); for blob in write_blobs.iter_mut() { + trace!("dumping old blob"); let _ = sem.acquire().await; + trace!("acquired sem for dumping old blobs"); if let Err(e) = blob.dump().await { error!("Error dumping blob ({}): {}", blob.name(), e); } + trace!("finished dumping old blob"); } }); } diff --git a/src/storage/observer_worker.rs b/src/storage/observer_worker.rs index 982fb12a64..219cc591f0 100644 --- a/src/storage/observer_worker.rs +++ b/src/storage/observer_worker.rs @@ -34,7 +34,10 @@ impl ObserverWorker { async fn tick(&mut self) -> Result<()> { match timeout(self.update_interval, self.receiver.recv()).await { Ok(Some(Msg::CloseActiveBlob)) => { - update_active_blob(self.inner.clone(), self.dump_sem.clone()).await? + update_active_blob(self.inner.clone()).await?; + self.inner + .try_dump_old_blob_indexes(self.dump_sem.clone()) + .await; } Ok(None) => { return Err(anyhow!( @@ -51,13 +54,10 @@ impl ObserverWorker { async fn try_update(&self) -> Result<()> { trace!("try update active blob"); let inner_cloned = self.inner.clone(); - if let Some(inner) = active_blob_check(inner_cloned).await? { - update_active_blob(inner, self.dump_sem.clone()).await?; + if let Some(mut inner) = active_blob_check(inner_cloned).await? { + update_active_blob(inner.clone()).await?; + inner.try_dump_old_blob_indexes(self.dump_sem.clone()).await; } - let mut inner_mut = self.inner.clone(); - inner_mut - .try_dump_old_blob_indexes(self.dump_sem.clone()) - .await; Ok(()) } } @@ -91,21 +91,18 @@ async fn active_blob_check(inner: Inner) -> Result> { } } -async fn update_active_blob(inner: Inner, dump_sem: Arc) -> Result<()> { +async fn update_active_blob(inner: Inner) -> Result<()> { let next_name = inner.next_blob_name()?; // Opening a new blob may take a while + trace!("obtaining new active blob"); let new_active = Blob::open_new(next_name, inner.ioring, inner.config.filter()) .await? .boxed(); - let task = inner + inner .safe .write() .await .replace_active_blob(new_active) .await?; - tokio::spawn(async move { - let _res = dump_sem.acquire().await.expect("semaphore is closed"); - task.await; - }); Ok(()) }