Skip to content

Commit

Permalink
Merge pull request #1 from spaceandtimelabs/dralves-checkpointing
Browse files Browse the repository at this point in the history
Add the ability to checkpoint-on-commit and to wait for a given entry to be checkpointed
  • Loading branch information
dralves authored Aug 1, 2023
2 parents 8ad9346 + 2729070 commit 00a1339
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 16 deletions.
43 changes: 31 additions & 12 deletions src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::{self, Read, Write};
use std::io::{self, Error, Read, Write};

use crc32c::crc32c_append;
use file_manager::FileManager;
Expand Down Expand Up @@ -59,6 +59,32 @@ where
self.id
}

fn commit_internal<F: FnOnce(&mut LogFileWriter<M::File>) -> io::Result<()>>(
&mut self,
callback: F,
) -> io::Result<u64> {
let file = self.file.as_ref().expect("Already committed");
let mut writer = file.lock();
writer.write_all(&[END_OF_ENTRY])?;
let new_length = writer.position();
writer.set_last_entry_id(Some(self.id));
callback(&mut writer)?;
drop(writer);
Ok(new_length)
}

pub fn commit_and_checkpoint(mut self) -> io::Result<EntryId> {
let new_length = self.commit_internal(|_file| Ok(()))?;
let sender = &self.log.data.checkpoint_sender;
let id = self.id;
let file = self.file.take().expect("Already committed");
sender
.send(crate::CheckpointCommand::Checkpoint(file.clone()))
.map_err(|se| Error::new(io::ErrorKind::Other, se.to_string()))?;
self.log.reclaim(file, WriteResult::Entry { new_length })?;
Ok(id)
}

/// Commits this entry to the log. Once this call returns, all data is
/// atomically updated and synchronized to disk.
///
Expand All @@ -73,19 +99,12 @@ where
mut self,
callback: F,
) -> io::Result<EntryId> {
let file = self.file.take().expect("already committed");

let mut writer = file.lock();

writer.write_all(&[END_OF_ENTRY])?;
let new_length = writer.position();
callback(&mut writer)?;
writer.set_last_entry_id(Some(self.id));
drop(writer);

let new_length = self.commit_internal(callback)?;
let id = self.id;
let file = self.file.take().expect("file already dropped");
self.log.reclaim(file, WriteResult::Entry { new_length })?;

Ok(self.id)
Ok(id)
}

/// Abandons this entry, preventing the entry from being recovered in the
Expand Down
55 changes: 51 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
use std::{
collections::{HashMap, VecDeque},
ffi::OsStr,
io::{self, BufReader, ErrorKind, Read, Seek, SeekFrom},
io::{self, BufReader, Error, ErrorKind, Read, Seek, SeekFrom},
path::Path,
sync::{Arc, Weak},
thread::JoinHandle,
time::Instant,
time::{Duration, Instant},
};

use file_manager::{fs::StdFileManager, FileManager, OpenOptions, PathId};
Expand Down Expand Up @@ -78,6 +78,7 @@ where
config: Configuration<M>,
checkpoint_sender: flume::Sender<CheckpointCommand<M::File>>,
checkpoint_thread: Mutex<Option<JoinHandle<io::Result<()>>>>,
checkpoint_sync: Condvar,
readers: Mutex<HashMap<u64, usize>>,
readers_sync: Condvar,
}
Expand Down Expand Up @@ -184,6 +185,7 @@ where
active_sync: Condvar::new(),
dirfsync_sync: Condvar::new(),
config,
checkpoint_sync: Condvar::new(),
checkpoint_sender,
checkpoint_thread: Mutex::new(None),
readers: Mutex::default(),
Expand Down Expand Up @@ -236,6 +238,35 @@ where
EntryWriter::new(self, entry_id, file)
}

/// Waits, until timeout, for `entry_id` to be checkpointed.
pub fn wait_checkpointed_for(&self, entry_id: &EntryId, timeout: Duration) -> io::Result<()> {
let time_deadline = Instant::now()
.checked_add(timeout)
.expect("Couldn't add timeout");
let mut files = self.data.files.lock();
loop {
let last_checkpointed_entry_id = files.last_checkpointed_entry_id;
if last_checkpointed_entry_id.is_some()
&& last_checkpointed_entry_id
.expect("Last checkpointed entry id not set")
.0
>= entry_id.0
{
return Ok(());
}
let result = self
.data
.checkpoint_sync
.wait_until(&mut files, time_deadline);
if result.timed_out() {
return Err(Error::new(
ErrorKind::TimedOut,
"Entry not checkpointed during timeout.",
));
}
}
}

fn reclaim(&self, file: LogFile<M::File>, result: WriteResult) -> io::Result<()> {
if let WriteResult::Entry { new_length } = result {
let last_directory_sync = if self.data.config.checkpoint_after_bytes <= new_length {
Expand Down Expand Up @@ -311,14 +342,17 @@ where
let synchronize_target = writer.position();
writer = file_to_checkpoint.synchronize_locked(writer, synchronize_target)?;
}
if let Some(entry_id) = writer.last_entry_id() {
let last_checkpointed_entry_id = if let Some(entry_id) = writer.last_entry_id() {
let mut reader =
SegmentReader::new(writer.path(), file_id, &wal.data.config.file_manager)?;
drop(writer);
let mut manager = wal.data.manager.lock();
manager.checkpoint_to(entry_id, &mut reader, &wal)?;
writer = file_to_checkpoint.lock();
}
Some(entry_id)
} else {
None
};

// Rename the file to denote that it's been checkpointed.
let new_name = format!(
Expand Down Expand Up @@ -353,6 +387,17 @@ where

let files = wal.data.files.lock();
let mut files = wal.sync_directory(files, sync_target)?;
if let Some(entry_id) = files.last_checkpointed_entry_id {
if let Some(last_checkpointed_entry_id) = last_checkpointed_entry_id {
if last_checkpointed_entry_id.0 > entry_id.0 {
files.last_checkpointed_entry_id = Some(last_checkpointed_entry_id);
wal.data.checkpoint_sync.notify_all();
}
}
} else {
files.last_checkpointed_entry_id = last_checkpointed_entry_id;
wal.data.checkpoint_sync.notify_all();
}
files.inactive.push_back(file_to_checkpoint);
}

Expand Down Expand Up @@ -484,6 +529,7 @@ where
active: Option<LogFile<F>>,
inactive: VecDeque<LogFile<F>>,
last_entry_id: EntryId,
last_checkpointed_entry_id: Option<EntryId>,
directory_synced_at: Option<Instant>,
directory_is_syncing: bool,
all: HashMap<u64, LogFile<F>>,
Expand Down Expand Up @@ -529,6 +575,7 @@ where
active: None,
inactive: VecDeque::new(),
last_entry_id: EntryId::default(),
last_checkpointed_entry_id: None,
directory_synced_at: None,
directory_is_syncing: false,
all: HashMap::new(),
Expand Down
57 changes: 57 additions & 0 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,63 @@ fn basic<M: FileManager, P: AsRef<Path>>(manager: M, path: P) {
assert!(reader.crc_is_valid().expect("error validating crc"));
}

/// Tests that it's possible to order checkpoints on demand and that it's possible to wait
/// for a given entry to be checkpointed.
fn commit_with_checkpoint<M: FileManager, P: AsRef<Path>>(manager: M, path: P) {
let checkpointer = LoggingCheckpointer::default();
let config = Configuration::default_with_manager(path, manager);
let wal = config.open(checkpointer.clone()).unwrap();

// Now let's write a record, it should not be checkpointed since we haven't
// reached the limits.
let m1 = b"first message";

let mut writer = wal.begin_entry().unwrap();
writer.write_chunk(m1).unwrap();
let entry1 = writer.commit().unwrap();

// Since there was no checkpointing done, the number of checkpointed messages in the
// checkpointer should be zero.
let invocations = checkpointer.invocations.lock();
assert!(invocations.is_empty());
drop(invocations);

// Now let's write a second message. It should be checkpointed because we explicitly
// indicate that it needs to.
let m2 = b"second message";

let mut writer = wal.begin_entry().unwrap();
writer.write_chunk(m2).unwrap();
let entry2 = writer.commit_and_checkpoint().unwrap();

wal.wait_checkpointed_for(&entry1, Duration::from_secs(2))
.unwrap();

let invocations = checkpointer.invocations.lock();
println!("Invocations: {invocations:?}");
assert_eq!(invocations.len(), 1);
match &invocations[0] {
CheckpointCall::Checkpoint { data } => {
assert_eq!(data.len(), 2);
assert_eq!(data[&entry1][0], m1.to_vec());
assert_eq!(data[&entry2][0], m2.to_vec());
}
other => unreachable!("unexpected invocation: {other:?}"),
}
}

#[test]
fn checkpoint_std() {
let dir = tempdir().unwrap();
commit_with_checkpoint(StdFileManager::default(), &dir);
}

#[test]
fn checkpoint_memory() {
let dir = tempdir().unwrap();
commit_with_checkpoint(MemoryFileManager::default(), &dir);
}

#[test]
fn basic_std() {
let dir = tempdir().unwrap();
Expand Down

0 comments on commit 00a1339

Please sign in to comment.