diff --git a/CHANGELOG.md b/CHANGELOG.md index b5c1d07427..43df54759b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Pearl changelog #### Changed +- Dump blob indices in separate thread on active blob close (#136) [https://github.com/qoollo/pearl/pull/137] - Remove second file descriptor from File ([#124](https://github.com/qoollo/pearl/pull/125)) - Acquire advisory write lock on files ([#124](https://github.com/qoollo/pearl/pull/125)) diff --git a/src/storage/core.rs b/src/storage/core.rs index a4c4589cd0..e737b2cf3f 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -200,14 +200,17 @@ impl Storage { /// Fails if there are some errors during dump /// [`close_active_blob_in_background()`]: struct.Storage.html#method.create_active_blob_async pub async fn try_close_active_blob(&self) -> Result<()> { - self.inner.close_active_blob().await + let result = self.inner.close_active_blob().await; + self.observer.try_dump_old_blob_indexes().await; + result } /// Dumps active blob /// NOTICE! This function returns immediately, so you can't check result of operation. If you /// want be sure about operation's result, use [`try_close_active_blob()`] pub async fn close_active_blob_in_background(&self) { - self.observer.close_active_blob().await + self.observer.close_active_blob().await; + self.observer.try_dump_old_blob_indexes().await } /// Sets last blob from closed blobs as active if there is no active blobs @@ -778,7 +781,8 @@ impl Storage { /// Or if there are some problems with syncronization. /// [`close_active_blob_in_background()`]: struct.Storage.html#method.close_active_blob_async pub async fn force_update_active_blob(&self, predicate: ActiveBlobPred) { - self.observer.force_update_active_blob(predicate).await + self.observer.force_update_active_blob(predicate).await; + self.observer.try_dump_old_blob_indexes().await } fn launch_observer(&mut self) { @@ -840,11 +844,9 @@ impl Inner { if safe.active_blob.is_none() { Err(Error::active_blob_doesnt_exist().into()) } else { - // FIXME: write lock is still held, so everyone will wait for dump, maybe it's better - // to derive this operation to `try_dump_old_blob_indexes` - safe.active_blob.as_mut().unwrap(/*None case checked*/).dump().await?; // always true if let Some(ablob) = safe.active_blob.take() { + ablob.fsyncdata().await?; safe.blobs.write().await.push(*ablob); } Ok(()) @@ -915,7 +917,7 @@ impl Inner { self.safe.read().await.fsyncdata().await } - pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc) { + pub(crate) async fn try_dump_old_blob_indexes(&self, sem: Arc) { self.safe.write().await.try_dump_old_blob_indexes(sem).await; } } diff --git a/src/storage/observer.rs b/src/storage/observer.rs index 94ccc77f3b..815550e147 100644 --- a/src/storage/observer.rs +++ b/src/storage/observer.rs @@ -1,6 +1,7 @@ use super::prelude::*; -use tokio::{ - sync::{mpsc::{channel, Sender}, Semaphore}, +use tokio::sync::{ + mpsc::{channel, Sender}, + Semaphore, }; #[derive(Debug, Clone)] @@ -9,6 +10,7 @@ pub(crate) enum OperationType { CloseActiveBlob = 1, RestoreActiveBlob = 2, ForceUpdateActiveBlob = 3, + TryDumpBlobIndexes = 4, } #[derive(Debug)] @@ -101,6 +103,11 @@ impl Observer { .await } + pub(crate) async fn try_dump_old_blob_indexes(&self) { + self.send_msg(Msg::new(OperationType::TryDumpBlobIndexes, None)) + .await + } + async fn send_msg(&self, msg: Msg) { if let Some(sender) = &self.sender { let optype = msg.optype.clone(); diff --git a/src/storage/observer_worker.rs b/src/storage/observer_worker.rs index 6328299261..2e8a3be29e 100644 --- a/src/storage/observer_worker.rs +++ b/src/storage/observer_worker.rs @@ -66,9 +66,6 @@ impl ObserverWorker { match msg.optype { OperationType::ForceUpdateActiveBlob => { update_active_blob(self.inner.clone()).await?; - self.inner - .try_dump_old_blob_indexes(self.dump_sem.clone()) - .await; } OperationType::CloseActiveBlob => { self.inner.close_active_blob().await?; @@ -79,6 +76,11 @@ impl ObserverWorker { OperationType::RestoreActiveBlob => { self.inner.restore_active_blob().await?; } + OperationType::TryDumpBlobIndexes => { + self.inner + .try_dump_old_blob_indexes(self.dump_sem.clone()) + .await; + } } Ok(()) } @@ -94,7 +96,7 @@ impl ObserverWorker { async fn try_update(&self) -> Result<()> { trace!("try update active blob"); let inner_cloned = self.inner.clone(); - if let Some(mut inner) = active_blob_check(inner_cloned).await? { + if let Some(inner) = active_blob_check(inner_cloned).await? { update_active_blob(inner.clone()).await?; inner.try_dump_old_blob_indexes(self.dump_sem.clone()).await; }