diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index c550d86f07..7ab0ffa8a4 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -301,6 +301,10 @@ impl Append for IndexedAdditions { self.graph_additions.append(other.graph_additions); self.index_additions.append(other.index_additions); } + + fn is_empty(&self) -> bool { + self.graph_additions.is_empty() && self.index_additions.is_empty() + } } /// Represents a structure that can index transaction data. diff --git a/crates/chain/src/keychain.rs b/crates/chain/src/keychain.rs index 81503049bd..f4d398ab0c 100644 --- a/crates/chain/src/keychain.rs +++ b/crates/chain/src/keychain.rs @@ -84,6 +84,10 @@ impl Append for DerivationAdditions { self.0.append(&mut other.0); } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } } impl Default for DerivationAdditions { diff --git a/crates/chain/src/lib.rs b/crates/chain/src/lib.rs index cf3cda3b02..cbadf1709a 100644 --- a/crates/chain/src/lib.rs +++ b/crates/chain/src/lib.rs @@ -33,6 +33,8 @@ pub mod tx_graph; pub use tx_data_traits::*; mod chain_oracle; pub use chain_oracle::*; +mod persist; +pub use persist::*; #[doc(hidden)] pub mod example_utils; diff --git a/crates/chain/src/persist.rs b/crates/chain/src/persist.rs new file mode 100644 index 0000000000..f5ea500907 --- /dev/null +++ b/crates/chain/src/persist.rs @@ -0,0 +1,113 @@ +use core::convert::Infallible; + +use crate::Append; + +/// `Persist` wraps a [`PersistBackend`] (`B`) to create a convenient staging area for changes (`C`) +/// before they are persisted. +/// +/// Not all changes to the in-memory representation needs to be written to disk right away, so +/// [`Persist::stage`] can be used to *stage* changes first and then [`Persist::commit`] can be used +/// to write changes to disk. +#[derive(Debug)] +pub struct Persist { + backend: B, + stage: C, +} + +impl Persist +where + B: PersistBackend, + C: Default + Append, +{ + /// Create a new [`Persist`] from [`PersistBackend`]. + pub fn new(backend: B) -> Self { + Self { + backend, + stage: Default::default(), + } + } + + /// Stage a `changeset` to be commited later with [`commit`]. + /// + /// [`commit`]: Self::commit + pub fn stage(&mut self, changeset: C) { + self.stage.append(changeset) + } + + /// Get the changes that have not been commited yet. + pub fn staged(&self) -> &C { + &self.stage + } + + /// Commit the staged changes to the underlying persistance backend. + /// + /// Returns a backend-defined error if this fails. + pub fn commit(&mut self) -> Result<(), B::WriteError> { + let mut temp = C::default(); + core::mem::swap(&mut temp, &mut self.stage); + self.backend.write_changes(&temp) + } +} + +/// A persistence backend for [`Persist`]. +/// +/// `C` represents the changeset; a datatype that records changes made to in-memory data structures +/// that are to be persisted, or retrieved from persistence. +pub trait PersistBackend { + /// The error the backend returns when it fails to write. + type WriteError: core::fmt::Debug; + + /// The error the backend returns when it fails to load changesets `C`. + type LoadError: core::fmt::Debug; + + /// An iterator of changesets loaded from the persistence. + /// + /// The lifetime bound is to ensure `Self` must outlive `LoadIter`. + type LoadIter<'a>: Iterator> + where + Self: 'a; + + /// Writes a changeset to the persistence backend. + /// + /// It is up to the backend what it does with this. It could store every changeset in a list or + /// it inserts the actual changes into a more structured database. All it needs to guarantee is + /// that [`load_from_persistence`] restores a keychain tracker to what it should be if all + /// changesets had been applied sequentially. + /// + /// [`load_from_persistence`]: Self::load_from_persistence + fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>; + + /// Iterates over changesets `C` from persistence. + fn load_from_persistence(&mut self) -> Self::LoadIter<'_>; + + /// Return the aggregate changeset `C` from persistence. + fn load_all_from_persistence(&mut self) -> Result + where + C: Default + Append, + { + self.load_from_persistence() + .try_fold(C::default(), |mut acc, r| { + acc.append(r?); + Ok(acc) + }) + } +} + +/// A persistence backend that does nothing. +pub struct EmptyBackend(core::iter::Empty>); + +impl PersistBackend for EmptyBackend { + type WriteError = Infallible; + + type LoadError = Infallible; + + type LoadIter<'a> = core::iter::Empty> where Self: 'a; + + fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> { + Ok(()) + } + + fn load_from_persistence(&mut self) -> Self::LoadIter<'_> { + self.0.clone() + } +} diff --git a/crates/chain/src/tx_data_traits.rs b/crates/chain/src/tx_data_traits.rs index 8ec695add3..bd7f138f07 100644 --- a/crates/chain/src/tx_data_traits.rs +++ b/crates/chain/src/tx_data_traits.rs @@ -64,20 +64,35 @@ impl Anchor for &'static A { pub trait Append { /// Append another object of the same type onto `self`. fn append(&mut self, other: Self); + + /// Returns whether the structure is considered empty. + fn is_empty(&self) -> bool; } impl Append for () { fn append(&mut self, _other: Self) {} + + fn is_empty(&self) -> bool { + true + } } impl Append for BTreeMap { fn append(&mut self, mut other: Self) { BTreeMap::append(self, &mut other) } + + fn is_empty(&self) -> bool { + BTreeMap::is_empty(self) + } } impl Append for BTreeSet { fn append(&mut self, mut other: Self) { BTreeSet::append(self, &mut other) } + + fn is_empty(&self) -> bool { + BTreeSet::is_empty(self) + } } diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index e75255e4af..ef3f3847ce 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -940,6 +940,13 @@ impl Append for Additions { .collect::>(), ); } + + fn is_empty(&self) -> bool { + self.tx.is_empty() + && self.txout.is_empty() + && self.anchors.is_empty() + && self.last_seen.is_empty() + } } impl AsRef> for TxGraph { diff --git a/crates/file_store/src/entry_iter.rs b/crates/file_store/src/entry_iter.rs new file mode 100644 index 0000000000..e392608729 --- /dev/null +++ b/crates/file_store/src/entry_iter.rs @@ -0,0 +1,106 @@ +use bincode::Options; +use std::{ + fs::File, + io::{self, Seek}, + marker::PhantomData, +}; + +use crate::bincode_options; + +/// Iterator over entries in a file store. +/// +/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the +/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`. +/// +/// [`next`]: Self::next +pub struct EntryIter<'a, V> { + db_file: &'a mut File, + types: PhantomData, + start_pos: Option, + error_exit: bool, +} + +impl<'a, V> EntryIter<'a, V> { + pub fn new(start_pos: u64, db_file: &'a mut File) -> Self { + Self { + db_file, + types: PhantomData, + start_pos: Some(start_pos), + error_exit: false, + } + } +} + +impl<'a, V> Iterator for EntryIter<'a, V> +where + V: serde::de::DeserializeOwned, +{ + type Item = Result; + + fn next(&mut self) -> Option { + if self.error_exit { + return None; + } + + if let Some(start_pos) = self.start_pos.take() { + if let Err(err) = self.db_file.seek(io::SeekFrom::Start(start_pos)) { + return Some(Err(err.into())); + } + } + + let result = (|| { + let pos = self.db_file.stream_position()?; + + match bincode_options().deserialize_from(&mut self.db_file) { + Ok(changeset) => Ok(Some(changeset)), + Err(e) => { + if let bincode::ErrorKind::Io(inner) = &*e { + if inner.kind() == io::ErrorKind::UnexpectedEof { + let eof = self.db_file.seek(io::SeekFrom::End(0))?; + if pos == eof { + return Ok(None); + } + } + } + + self.db_file.seek(io::SeekFrom::Start(pos))?; + Err(IterError::Bincode(*e)) + } + } + })(); + + let result = result.transpose(); + + if let Some(Err(_)) = &result { + self.error_exit = true; + } + + result + } +} + +impl From for IterError { + fn from(value: io::Error) -> Self { + IterError::Io(value) + } +} + +/// Error type for [`EntryIter`]. +#[derive(Debug)] +pub enum IterError { + /// Failure to read from the file. + Io(io::Error), + /// Failure to decode data from the file. + Bincode(bincode::ErrorKind), +} + +impl core::fmt::Display for IterError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + IterError::Io(e) => write!(f, "io error trying to read entry {}", e), + IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e), + } + } +} + +impl std::error::Error for IterError {} diff --git a/crates/file_store/src/file_store.rs b/crates/file_store/src/keychain_store.rs similarity index 79% rename from crates/file_store/src/file_store.rs rename to crates/file_store/src/keychain_store.rs index 824e3ccc56..9d33dbaea8 100644 --- a/crates/file_store/src/file_store.rs +++ b/crates/file_store/src/keychain_store.rs @@ -6,14 +6,15 @@ use bdk_chain::{ keychain::{KeychainChangeSet, KeychainTracker}, sparse_chain, }; -use bincode::{DefaultOptions, Options}; -use core::marker::PhantomData; +use bincode::Options; use std::{ fs::{File, OpenOptions}, io::{self, Read, Seek, Write}, path::Path, }; +use crate::{bincode_options, EntryIter, IterError}; + /// BDK File Store magic bytes length. const MAGIC_BYTES_LEN: usize = 12; @@ -28,10 +29,6 @@ pub struct KeychainStore { changeset_type_params: core::marker::PhantomData<(K, P)>, } -fn bincode() -> impl bincode::Options { - DefaultOptions::new().with_varint_encoding() -} - impl KeychainStore where K: Ord + Clone + core::fmt::Debug, @@ -86,10 +83,7 @@ where /// always iterate over all entries until `None` is returned if you want your next write to go /// at the end; otherwise, you will write over existing entries. pub fn iter_changesets(&mut self) -> Result>, io::Error> { - self.db_file - .seek(io::SeekFrom::Start(MAGIC_BYTES_LEN as _))?; - - Ok(EntryIter::new(&mut self.db_file)) + Ok(EntryIter::new(MAGIC_BYTES_LEN as u64, &mut self.db_file)) } /// Loads all the changesets that have been stored as one giant changeset. @@ -144,7 +138,7 @@ where return Ok(()); } - bincode() + bincode_options() .serialize_into(&mut self.db_file, changeset) .map_err(|e| match *e { bincode::ErrorKind::Io(inner) => inner, @@ -197,92 +191,6 @@ impl From for FileError { impl std::error::Error for FileError {} -/// Error type for [`EntryIter`]. -#[derive(Debug)] -pub enum IterError { - /// Failure to read from the file. - Io(io::Error), - /// Failure to decode data from the file. - Bincode(bincode::ErrorKind), -} - -impl core::fmt::Display for IterError { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - IterError::Io(e) => write!(f, "io error trying to read entry {}", e), - IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e), - } - } -} - -impl std::error::Error for IterError {} - -/// Iterator over entries in a file store. -/// -/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the -/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`. -/// -/// [`next`]: Self::next -pub struct EntryIter<'a, V> { - db_file: &'a mut File, - types: PhantomData, - error_exit: bool, -} - -impl<'a, V> EntryIter<'a, V> { - pub fn new(db_file: &'a mut File) -> Self { - Self { - db_file, - types: PhantomData, - error_exit: false, - } - } -} - -impl<'a, V> Iterator for EntryIter<'a, V> -where - V: serde::de::DeserializeOwned, -{ - type Item = Result; - - fn next(&mut self) -> Option { - let result = (|| { - let pos = self.db_file.stream_position()?; - - match bincode().deserialize_from(&mut self.db_file) { - Ok(changeset) => Ok(Some(changeset)), - Err(e) => { - if let bincode::ErrorKind::Io(inner) = &*e { - if inner.kind() == io::ErrorKind::UnexpectedEof { - let eof = self.db_file.seek(io::SeekFrom::End(0))?; - if pos == eof { - return Ok(None); - } - } - } - - self.db_file.seek(io::SeekFrom::Start(pos))?; - Err(IterError::Bincode(*e)) - } - } - })(); - - let result = result.transpose(); - - if let Some(Err(_)) = &result { - self.error_exit = true; - } - - result - } -} - -impl From for IterError { - fn from(value: io::Error) -> Self { - IterError::Io(value) - } -} - #[cfg(test)] mod test { use super::*; @@ -290,6 +198,7 @@ mod test { keychain::{DerivationAdditions, KeychainChangeSet}, TxHeight, }; + use bincode::DefaultOptions; use std::{ io::{Read, Write}, vec::Vec, diff --git a/crates/file_store/src/lib.rs b/crates/file_store/src/lib.rs index e334741947..b10c8c29ef 100644 --- a/crates/file_store/src/lib.rs +++ b/crates/file_store/src/lib.rs @@ -1,10 +1,51 @@ #![doc = include_str!("../README.md")] -mod file_store; +mod entry_iter; +mod keychain_store; +mod store; +use std::io; + use bdk_chain::{ keychain::{KeychainChangeSet, KeychainTracker, PersistBackend}, sparse_chain::ChainPosition, }; -pub use file_store::*; +use bincode::{DefaultOptions, Options}; +pub use entry_iter::*; +pub use keychain_store::*; +pub use store::*; + +pub(crate) fn bincode_options() -> impl bincode::Options { + DefaultOptions::new().with_varint_encoding() +} + +/// Error that occurs due to problems encountered with the file. +#[derive(Debug)] +pub enum FileError<'a> { + /// IO error, this may mean that the file is too short. + Io(io::Error), + /// Magic bytes do not match what is expected. + InvalidMagicBytes { got: Vec, expected: &'a [u8] }, +} + +impl<'a> core::fmt::Display for FileError<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Io(e) => write!(f, "io error trying to read file: {}", e), + Self::InvalidMagicBytes { got, expected } => write!( + f, + "file has invalid magic bytes: expected={:?} got={:?}", + expected, got, + ), + } + } +} + +impl<'a> From for FileError<'a> { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +impl<'a> std::error::Error for FileError<'a> {} impl PersistBackend for KeychainStore where diff --git a/crates/file_store/src/store.rs b/crates/file_store/src/store.rs new file mode 100644 index 0000000000..0485c5b49a --- /dev/null +++ b/crates/file_store/src/store.rs @@ -0,0 +1,290 @@ +use std::{ + fmt::Debug, + fs::{File, OpenOptions}, + io::{self, Read, Seek, Write}, + marker::PhantomData, + path::Path, +}; + +use bdk_chain::{Append, PersistBackend}; +use bincode::Options; + +use crate::{bincode_options, EntryIter, FileError, IterError}; + +/// Persists an append-only list of changesets (`C`) to a single file. +/// +/// The changesets are the results of altering a tracker implementation (`T`). +#[derive(Debug)] +pub struct Store<'a, C> { + magic: &'a [u8], + db_file: File, + marker: PhantomData, +} + +impl<'a, C> PersistBackend for Store<'a, C> +where + C: Default + Append + serde::Serialize + serde::de::DeserializeOwned, +{ + type WriteError = std::io::Error; + + type LoadError = IterError; + + type LoadIter<'b> = EntryIter<'b, C> where Self: 'b; + + fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError> { + self.append_changeset(changeset) + } + + fn load_from_persistence(&mut self) -> Self::LoadIter<'_> { + EntryIter::new(self.magic.len() as u64, &mut self.db_file) + } +} + +impl<'a, C> Store<'a, C> +where + C: Default + Append + serde::Serialize + serde::de::DeserializeOwned, +{ + /// Creates a new store from a [`File`]. + /// + /// The file must have been opened with read and write permissions. + /// + /// [`File`]: std::fs::File + pub fn new(magic: &'a [u8], mut db_file: File) -> Result { + db_file.rewind()?; + + let mut magic_buf = Vec::from_iter((0..).take(magic.len())); + db_file.read_exact(magic_buf.as_mut())?; + + if magic_buf != magic { + return Err(FileError::InvalidMagicBytes { + got: magic_buf, + expected: magic, + }); + } + + Ok(Self { + magic, + db_file, + marker: Default::default(), + }) + } + + /// Creates or loads a store from `db_path`. + /// + /// If no file exists there, it will be created. + pub fn new_from_path

(magic: &'a [u8], db_path: P) -> Result + where + P: AsRef, + { + let already_exists = db_path.as_ref().exists(); + + let mut db_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(db_path)?; + + if !already_exists { + db_file.write_all(magic)?; + } + + Self::new(magic, db_file) + } + + /// Iterates over the stored changeset from first to last, changing the seek position at each + /// iteration. + /// + /// The iterator may fail to read an entry and therefore return an error. However, the first time + /// it returns an error will be the last. After doing so, the iterator will always yield `None`. + /// + /// **WARNING**: This method changes the write position in the underlying file. You should + /// always iterate over all entries until `None` is returned if you want your next write to go + /// at the end; otherwise, you will write over existing entries. + pub fn iter_changesets(&mut self) -> EntryIter<'_, C> { + EntryIter::new(self.magic.len() as u64, &mut self.db_file) + } + + /// Loads all the changesets that have been stored as one giant changeset. + /// + /// This function returns a tuple of the aggregate changeset and a result that indicates + /// whether an error occurred while reading or deserializing one of the entries. If so the + /// changeset will consist of all of those it was able to read. + /// + /// You should usually check the error. In many applications, it may make sense to do a full + /// wallet scan with a stop-gap after getting an error, since it is likely that one of the + /// changesets it was unable to read changed the derivation indices of the tracker. + /// + /// **WARNING**: This method changes the write position of the underlying file. The next + /// changeset will be written over the erroring entry (or the end of the file if none existed). + pub fn aggregate_changesets(&mut self) -> (C, Result<(), IterError>) { + let mut changeset = C::default(); + let result = (|| { + for next_changeset in self.iter_changesets() { + changeset.append(next_changeset?); + } + Ok(()) + })(); + + (changeset, result) + } + + /// Append a new changeset to the file and truncate the file to the end of the appended + /// changeset. + /// + /// The truncation is to avoid the possibility of having a valid but inconsistent changeset + /// directly after the appended changeset. + pub fn append_changeset(&mut self, changeset: &C) -> Result<(), io::Error> { + // no need to write anything if changeset is empty + if changeset.is_empty() { + return Ok(()); + } + + bincode_options() + .serialize_into(&mut self.db_file, changeset) + .map_err(|e| match *e { + bincode::ErrorKind::Io(inner) => inner, + unexpected_err => panic!("unexpected bincode error: {}", unexpected_err), + })?; + + // truncate file after this changeset addition + // if this is not done, data after this changeset may represent valid changesets, however + // applying those changesets on top of this one may result in an inconsistent state + let pos = self.db_file.stream_position()?; + self.db_file.set_len(pos)?; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use bincode::DefaultOptions; + use std::{ + io::{Read, Write}, + vec::Vec, + }; + use tempfile::NamedTempFile; + + const TEST_MAGIC_BYTES_LEN: usize = 12; + const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] = + [98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49]; + + #[derive( + Debug, + Clone, + Copy, + PartialOrd, + Ord, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + )] + enum TestKeychain { + External, + Internal, + } + + impl core::fmt::Display for TestKeychain { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::External => write!(f, "external"), + Self::Internal => write!(f, "internal"), + } + } + } + + #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] + struct TestChangeSet { + pub changes: Vec, + } + + impl Append for TestChangeSet { + fn append(&mut self, mut other: Self) { + self.changes.append(&mut other.changes) + } + + fn is_empty(&self) -> bool { + self.changes.is_empty() + } + } + + #[derive(Debug)] + struct TestTracker; + + #[test] + fn new_fails_if_file_is_too_short() { + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&TEST_MAGIC_BYTES[..TEST_MAGIC_BYTES_LEN - 1]) + .expect("should write"); + + match Store::::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) { + Err(FileError::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof), + unexpected => panic!("unexpected result: {:?}", unexpected), + }; + } + + #[test] + fn new_fails_if_magic_bytes_are_invalid() { + let invalid_magic_bytes = "ldkfs0000000"; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(invalid_magic_bytes.as_bytes()) + .expect("should write"); + + match Store::::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) { + Err(FileError::InvalidMagicBytes { got, .. }) => { + assert_eq!(got, invalid_magic_bytes.as_bytes()) + } + unexpected => panic!("unexpected result: {:?}", unexpected), + }; + } + + #[test] + fn append_changeset_truncates_invalid_bytes() { + // initial data to write to file (magic bytes + invalid data) + let mut data = [255_u8; 2000]; + data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES); + + let changeset = TestChangeSet { + changes: vec!["one".into(), "two".into(), "three!".into()], + }; + + let mut file = NamedTempFile::new().unwrap(); + file.write_all(&data).expect("should write"); + + let mut store = Store::::new(&TEST_MAGIC_BYTES, file.reopen().unwrap()) + .expect("should open"); + match store.iter_changesets().next() { + Some(Err(IterError::Bincode(_))) => {} + unexpected_res => panic!("unexpected result: {:?}", unexpected_res), + } + + store.append_changeset(&changeset).expect("should append"); + + drop(store); + + let got_bytes = { + let mut buf = Vec::new(); + file.reopen() + .unwrap() + .read_to_end(&mut buf) + .expect("should read"); + buf + }; + + let expected_bytes = { + let mut buf = TEST_MAGIC_BYTES.to_vec(); + DefaultOptions::new() + .with_varint_encoding() + .serialize_into(&mut buf, &changeset) + .expect("should encode"); + buf + }; + + assert_eq!(got_bytes, expected_bytes); + } +}