Fix panic and return PreviousIo error instead
Fixes a panic in commit() or abort() which would occur if a previous
commit() or abort() had failed due to an I/O error
cberner committed Sep 2, 2024
1 parent 30cc976 commit 6319933
Showing 4 changed files with 88 additions and 10 deletions.
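
For context on the behavior this commit introduces: once a commit() or abort() fails with an I/O error, later begin_write() calls return StorageError::PreviousIo instead of panicking. The sketch below shows how a caller might react to that error; it is illustrative only, and the table definition and error-handling policy are assumptions rather than part of this commit.

```rust
// Illustrative caller-side handling (a sketch, not code from this commit).
// EXAMPLE_TABLE is an assumed table definition for demonstration purposes.
use redb::{Database, StorageError, TableDefinition, TransactionError};

const EXAMPLE_TABLE: TableDefinition<u64, u64> = TableDefinition::new("example");

fn try_write(db: &Database) -> Result<(), Box<dyn std::error::Error>> {
    let txn = match db.begin_write() {
        // With this commit, a database poisoned by an earlier I/O failure
        // reports it here, up front, instead of panicking during commit().
        Err(TransactionError::Storage(StorageError::PreviousIo)) => {
            return Err("previous commit() or abort() hit an I/O error".into());
        }
        other => other?,
    };
    {
        let mut table = txn.open_table(EXAMPLE_TABLE)?;
        table.insert(&1, &2)?;
    }
    txn.commit()?;
    Ok(())
}
```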
8 changes: 5 additions & 3 deletions src/db.rs
@@ -777,6 +777,8 @@ impl Database {
     /// write may be in progress at a time. If a write is in progress, this function will block
     /// until it completes.
     pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
+        // Fail early if there has been an I/O error -- nothing can be committed in that case
+        self.mem.check_io_errors()?;
         let guard = TransactionGuard::new_write(
             self.transaction_tracker.start_write_transaction(),
             self.transaction_tracker.clone(),
@@ -987,7 +989,7 @@ mod test {
     use crate::backends::FileBackend;
     use crate::{
         CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
-        StorageError, TableDefinition,
+        StorageError, TableDefinition, TransactionError,
     };
     use std::io::{ErrorKind, Read, Seek, SeekFrom};
     use std::sync::atomic::{AtomicU64, Ordering};
@@ -1132,10 +1134,10 @@
         countdown.store(0, Ordering::SeqCst);
         let result = tx.commit().err().unwrap();
         assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
-        let result = db.begin_write().unwrap().commit().err().unwrap();
+        let result = db.begin_write().err().unwrap();
         assert!(matches!(
             result,
-            CommitError::Storage(StorageError::PreviousIo)
+            TransactionError::Storage(StorageError::PreviousIo)
         ));
         // Simulate a transient error
         countdown.store(u64::MAX, Ordering::SeqCst);
4 changes: 4 additions & 0 deletions src/tree_store/page_store/cached_file.rs
@@ -323,6 +323,10 @@ impl PagedCachedFile {
         })
     }
 
+    pub(crate) fn check_io_errors(&self) -> Result {
+        self.file.check_failure()
+    }
+
     pub(crate) fn raw_file_len(&self) -> Result<u64> {
         self.file.len()
     }
5 changes: 5 additions & 0 deletions src/tree_store/page_store/page_manager.rs
@@ -82,6 +82,7 @@ pub(crate) struct TransactionalMemory {
     // TODO: maybe this should be moved to WriteTransaction?
     allocated_since_commit: Mutex<HashSet<PageNumber>>,
     // True if the allocator state was corrupted when the file was opened
+    // TODO: maybe we can remove this flag now that CheckedBackend exists?
     needs_recovery: AtomicBool,
     storage: PagedCachedFile,
     state: Mutex<InMemoryState>,
@@ -261,6 +262,10 @@ impl TransactionalMemory {
         })
     }
 
+    pub(crate) fn check_io_errors(&self) -> Result {
+        self.storage.check_io_errors()
+    }
+
     #[cfg(any(test, fuzzing))]
     pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
         self.state.lock().unwrap().allocators.all_allocated()
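
The new check_io_errors() methods simply delegate down the stack: Database::begin_write() asks TransactionalMemory, which asks PagedCachedFile, which calls check_failure() on the underlying file. The diff does not show the CheckedBackend internals, but the pattern it implies is a latched error flag, roughly as in this hedged sketch (field and method names here are assumptions):

```rust
// A sketch of the error-latching pattern implied by check_failure(); the
// actual CheckedBackend internals are not part of this diff, so the names
// and representation are assumptions.
use redb::StorageError;
use std::sync::atomic::{AtomicBool, Ordering};

struct LatchingState {
    // Set on the first failed I/O operation; while it remains set, every
    // later transaction is told that durability can no longer be trusted.
    io_failed: AtomicBool,
}

impl LatchingState {
    fn record_io_failure(&self) {
        self.io_failed.store(true, Ordering::Release);
    }

    // What the check_io_errors() chain ultimately relies on: report
    // PreviousIo if an earlier operation on this file handle failed.
    fn check_failure(&self) -> Result<(), StorageError> {
        if self.io_failed.load(Ordering::Acquire) {
            Err(StorageError::PreviousIo)
        } else {
            Ok(())
        }
    }
}
```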
81 changes: 74 additions & 7 deletions tests/integration_tests.rs
@@ -1,17 +1,19 @@
-use std::borrow::Borrow;
-use std::fs;
-use std::io::ErrorKind;
-use std::marker::PhantomData;
-use std::ops::RangeBounds;
-
 use rand::prelude::SliceRandom;
 use rand::Rng;
 use redb::backends::FileBackend;
 use redb::{
     AccessGuard, Builder, CompactionError, Database, Durability, Key, MultimapRange,
     MultimapTableDefinition, MultimapValue, Range, ReadableTable, ReadableTableMetadata,
-    TableDefinition, TableStats, TransactionError, Value,
+    StorageBackend, TableDefinition, TableStats, TransactionError, Value,
 };
 use redb::{DatabaseError, ReadableMultimapTable, SavepointError, StorageError, TableError};
+use std::borrow::Borrow;
+use std::fs;
+use std::io::ErrorKind;
+use std::marker::PhantomData;
+use std::ops::RangeBounds;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 
 const ELEMENTS: usize = 100;
 
@@ -41,6 +43,71 @@ fn gen_data(count: usize, key_size: usize, value_size: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
     pairs
 }
 
+#[test]
+fn previous_io_error() {
+    #[derive(Debug)]
+    struct FailingBackend {
+        inner: FileBackend,
+        fail_flag: Arc<AtomicBool>,
+    }
+
+    impl FailingBackend {
+        fn new(backend: FileBackend, fail_flag: Arc<AtomicBool>) -> Self {
+            Self {
+                inner: backend,
+                fail_flag,
+            }
+        }
+    }
+
+    impl StorageBackend for FailingBackend {
+        fn len(&self) -> Result<u64, std::io::Error> {
+            self.inner.len()
+        }
+
+        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
+            self.inner.read(offset, len)
+        }
+
+        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
+            self.inner.set_len(len)
+        }
+
+        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
+            if self.fail_flag.load(Ordering::SeqCst) {
+                Err(std::io::Error::from(ErrorKind::Other))
+            } else {
+                self.inner.sync_data(eventual)
+            }
+        }
+
+        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
+            self.inner.write(offset, data)
+        }
+    }
+
+    let tmpfile = create_tempfile();
+
+    let fail_flag = Arc::new(AtomicBool::new(false));
+    let backend = FailingBackend::new(
+        FileBackend::new(tmpfile.into_file()).unwrap(),
+        fail_flag.clone(),
+    );
+    let db = Database::builder().create_with_backend(backend).unwrap();
+    fail_flag.store(true, Ordering::SeqCst);
+    let txn = db.begin_write().unwrap();
+    {
+        let mut table = txn.open_table(U64_TABLE).unwrap();
+        table.insert(&0, &0).unwrap();
+    }
+    assert!(txn.commit().is_err());
+
+    assert!(matches!(
+        db.begin_write().err().unwrap(),
+        TransactionError::Storage(StorageError::PreviousIo)
+    ));
+}
+
 #[test]
 fn mixed_durable_commit() {
     let tmpfile = create_tempfile();
