Skip to content

Commit

Permalink
Invoke background indices drop on blob close (#137)
Browse files Browse the repository at this point in the history
* invoke background indexes drop on blob close

* update changelog

* add fsyncdata to active blob close

* make indexes dumping a separate observer call

Co-authored-by: Pavel Iakushin <[email protected]>
  • Loading branch information
idruzhitskiy and piakushin authored Nov 9, 2021
1 parent ee56ed0 commit f10d727
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Pearl changelog


#### Changed
- Dump blob indices in separate thread on active blob close (#136) [https://github.com/qoollo/pearl/pull/137]
- Remove second file descriptor from File ([#124](https://github.com/qoollo/pearl/pull/125))
- Acquire advisory write lock on files ([#124](https://github.com/qoollo/pearl/pull/125))

Expand Down
16 changes: 9 additions & 7 deletions src/storage/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,17 @@ impl<K: Key> Storage<K> {
/// Fails if there are some errors during dump
/// [`close_active_blob_in_background()`]: struct.Storage.html#method.create_active_blob_async
pub async fn try_close_active_blob(&self) -> Result<()> {
self.inner.close_active_blob().await
let result = self.inner.close_active_blob().await;
self.observer.try_dump_old_blob_indexes().await;
result
}

/// Dumps active blob
/// NOTICE! This function returns immediately, so you can't check result of operation. If you
/// want be sure about operation's result, use [`try_close_active_blob()`]
pub async fn close_active_blob_in_background(&self) {
self.observer.close_active_blob().await
self.observer.close_active_blob().await;
self.observer.try_dump_old_blob_indexes().await
}

/// Sets last blob from closed blobs as active if there is no active blobs
Expand Down Expand Up @@ -778,7 +781,8 @@ impl<K: Key> Storage<K> {
/// Or if there are some problems with syncronization.
/// [`close_active_blob_in_background()`]: struct.Storage.html#method.close_active_blob_async
pub async fn force_update_active_blob(&self, predicate: ActiveBlobPred) {
self.observer.force_update_active_blob(predicate).await
self.observer.force_update_active_blob(predicate).await;
self.observer.try_dump_old_blob_indexes().await
}

fn launch_observer(&mut self) {
Expand Down Expand Up @@ -840,11 +844,9 @@ impl Inner {
if safe.active_blob.is_none() {
Err(Error::active_blob_doesnt_exist().into())
} else {
// FIXME: write lock is still held, so everyone will wait for dump, maybe it's better
// to derive this operation to `try_dump_old_blob_indexes`
safe.active_blob.as_mut().unwrap(/*None case checked*/).dump().await?;
// always true
if let Some(ablob) = safe.active_blob.take() {
ablob.fsyncdata().await?;
safe.blobs.write().await.push(*ablob);
}
Ok(())
Expand Down Expand Up @@ -915,7 +917,7 @@ impl Inner {
self.safe.read().await.fsyncdata().await
}

pub(crate) async fn try_dump_old_blob_indexes(&mut self, sem: Arc<Semaphore>) {
pub(crate) async fn try_dump_old_blob_indexes(&self, sem: Arc<Semaphore>) {
self.safe.write().await.try_dump_old_blob_indexes(sem).await;
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/storage/observer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::prelude::*;
use tokio::{
sync::{mpsc::{channel, Sender}, Semaphore},
use tokio::sync::{
mpsc::{channel, Sender},
Semaphore,
};

#[derive(Debug, Clone)]
Expand All @@ -9,6 +10,7 @@ pub(crate) enum OperationType {
CloseActiveBlob = 1,
RestoreActiveBlob = 2,
ForceUpdateActiveBlob = 3,
TryDumpBlobIndexes = 4,
}

#[derive(Debug)]
Expand Down Expand Up @@ -101,6 +103,11 @@ impl Observer {
.await
}

pub(crate) async fn try_dump_old_blob_indexes(&self) {
self.send_msg(Msg::new(OperationType::TryDumpBlobIndexes, None))
.await
}

async fn send_msg(&self, msg: Msg) {
if let Some(sender) = &self.sender {
let optype = msg.optype.clone();
Expand Down
10 changes: 6 additions & 4 deletions src/storage/observer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ impl ObserverWorker {
match msg.optype {
OperationType::ForceUpdateActiveBlob => {
update_active_blob(self.inner.clone()).await?;
self.inner
.try_dump_old_blob_indexes(self.dump_sem.clone())
.await;
}
OperationType::CloseActiveBlob => {
self.inner.close_active_blob().await?;
Expand All @@ -79,6 +76,11 @@ impl ObserverWorker {
OperationType::RestoreActiveBlob => {
self.inner.restore_active_blob().await?;
}
OperationType::TryDumpBlobIndexes => {
self.inner
.try_dump_old_blob_indexes(self.dump_sem.clone())
.await;
}
}
Ok(())
}
Expand All @@ -94,7 +96,7 @@ impl ObserverWorker {
async fn try_update(&self) -> Result<()> {
trace!("try update active blob");
let inner_cloned = self.inner.clone();
if let Some(mut inner) = active_blob_check(inner_cloned).await? {
if let Some(inner) = active_blob_check(inner_cloned).await? {
update_active_blob(inner.clone()).await?;
inner.try_dump_old_blob_indexes(self.dump_sem.clone()).await;
}
Expand Down

0 comments on commit f10d727

Please sign in to comment.