diff --git a/src/db.rs b/src/db.rs
index 55179393..5452dd6d 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -777,6 +777,8 @@ impl Database {
     /// write may be in progress at a time. If a write is in progress, this function will block
     /// until it completes.
     pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
+        // Fail early if there has been an I/O error -- nothing can be committed in that case
+        self.mem.check_io_errors()?;
         let guard = TransactionGuard::new_write(
             self.transaction_tracker.start_write_transaction(),
             self.transaction_tracker.clone(),
@@ -987,7 +989,7 @@ mod test {
     use crate::backends::FileBackend;
     use crate::{
         CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
-        StorageError, TableDefinition,
+        StorageError, TableDefinition, TransactionError,
     };
     use std::io::{ErrorKind, Read, Seek, SeekFrom};
     use std::sync::atomic::{AtomicU64, Ordering};
@@ -1132,10 +1134,10 @@ mod test {
         countdown.store(0, Ordering::SeqCst);
         let result = tx.commit().err().unwrap();
         assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
-        let result = db.begin_write().unwrap().commit().err().unwrap();
+        let result = db.begin_write().err().unwrap();
         assert!(matches!(
             result,
-            CommitError::Storage(StorageError::PreviousIo)
+            TransactionError::Storage(StorageError::PreviousIo)
         ));
         // Simulate a transient error
         countdown.store(u64::MAX, Ordering::SeqCst);
diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs
index 62712301..5e1f6aa5 100644
--- a/src/tree_store/page_store/cached_file.rs
+++ b/src/tree_store/page_store/cached_file.rs
@@ -323,6 +323,10 @@ impl PagedCachedFile {
         })
     }
 
+    pub(crate) fn check_io_errors(&self) -> Result {
+        self.file.check_failure()
+    }
+
     pub(crate) fn raw_file_len(&self) -> Result<u64> {
         self.file.len()
     }
diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs
index 190bf952..88c4475a 100644
--- a/src/tree_store/page_store/page_manager.rs
+++ b/src/tree_store/page_store/page_manager.rs
@@ -82,6 +82,7 @@ pub(crate) struct TransactionalMemory {
     // TODO: maybe this should be moved to WriteTransaction?
     allocated_since_commit: Mutex<HashSet<PageNumber>>,
     // True if the allocator state was corrupted when the file was opened
+    // TODO: maybe we can remove this flag now that CheckedBackend exists?
     needs_recovery: AtomicBool,
     storage: PagedCachedFile,
     state: Mutex<InMemoryState>,
@@ -261,6 +262,10 @@ impl TransactionalMemory {
         })
     }
 
+    pub(crate) fn check_io_errors(&self) -> Result {
+        self.storage.check_io_errors()
+    }
+
     #[cfg(any(test, fuzzing))]
     pub(crate) fn all_allocated_pages(&self) -> Vec<PageNumber> {
         self.state.lock().unwrap().allocators.all_allocated()
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
index cb44dc70..8d23b946 100644
--- a/tests/integration_tests.rs
+++ b/tests/integration_tests.rs
@@ -1,17 +1,19 @@
-use std::borrow::Borrow;
-use std::fs;
-use std::io::ErrorKind;
-use std::marker::PhantomData;
-use std::ops::RangeBounds;
-
 use rand::prelude::SliceRandom;
 use rand::Rng;
+use redb::backends::FileBackend;
 use redb::{
     AccessGuard, Builder, CompactionError, Database, Durability, Key, MultimapRange,
     MultimapTableDefinition, MultimapValue, Range, ReadableTable, ReadableTableMetadata,
-    TableDefinition, TableStats, TransactionError, Value,
+    StorageBackend, TableDefinition, TableStats, TransactionError, Value,
 };
 use redb::{DatabaseError, ReadableMultimapTable, SavepointError, StorageError, TableError};
+use std::borrow::Borrow;
+use std::fs;
+use std::io::ErrorKind;
+use std::marker::PhantomData;
+use std::ops::RangeBounds;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 
 const ELEMENTS: usize = 100;
 
@@ -41,6 +43,71 @@ fn gen_data(count: usize, key_size: usize, value_size: usize) -> Vec<(Vec<u8>, Vec<u8>)> {
     pairs
 }
 
+#[test]
+fn previous_io_error() {
+    #[derive(Debug)]
+    struct FailingBackend {
+        inner: FileBackend,
+        fail_flag: Arc<AtomicBool>,
+    }
+
+    impl FailingBackend {
+        fn new(backend: FileBackend, fail_flag: Arc<AtomicBool>) -> Self {
+            Self {
+                inner: backend,
+                fail_flag,
+            }
+        }
+    }
+
+    impl StorageBackend for FailingBackend {
+        fn len(&self) -> Result<u64, std::io::Error> {
+            self.inner.len()
+        }
+
+        fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
+            self.inner.read(offset, len)
+        }
+
+        fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
+            self.inner.set_len(len)
+        }
+
+        fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
+            if self.fail_flag.load(Ordering::SeqCst) {
+                Err(std::io::Error::from(ErrorKind::Other))
+            } else {
+                self.inner.sync_data(eventual)
+            }
+        }
+
+        fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
+            self.inner.write(offset, data)
+        }
+    }
+
+    let tmpfile = create_tempfile();
+
+    let fail_flag = Arc::new(AtomicBool::new(false));
+    let backend = FailingBackend::new(
+        FileBackend::new(tmpfile.into_file()).unwrap(),
+        fail_flag.clone(),
+    );
+    let db = Database::builder().create_with_backend(backend).unwrap();
+    fail_flag.store(true, Ordering::SeqCst);
+    let txn = db.begin_write().unwrap();
+    {
+        let mut table = txn.open_table(U64_TABLE).unwrap();
+        table.insert(&0, &0).unwrap();
+    }
+    assert!(txn.commit().is_err());
+
+    assert!(matches!(
+        db.begin_write().err().unwrap(),
+        TransactionError::Storage(StorageError::PreviousIo)
+    ));
+}
+
 #[test]
 fn mixed_durable_commit() {
     let tmpfile = create_tempfile();