Skip to content

Commit

Permalink
[persist_redesign] Introduce redesigned persist types
Browse files Browse the repository at this point in the history
This is a more generic version of `keychain::persist::*` structures.

Additional changes:

* The `Append` trait has a new method `is_empty`.
* Introduce `Store` structure for `bdk_file_store`.
  • Loading branch information
evanlinjin committed May 9, 2023
1 parent e3c1370 commit a427a6d
Show file tree
Hide file tree
Showing 10 changed files with 590 additions and 99 deletions.
4 changes: 4 additions & 0 deletions crates/chain/src/indexed_tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ impl<A: Anchor, IA: Append> Append for IndexedAdditions<A, IA> {
self.graph_additions.append(other.graph_additions);
self.index_additions.append(other.index_additions);
}

fn is_empty(&self) -> bool {
self.graph_additions.is_empty() && self.index_additions.is_empty()
}
}

/// Represents a structure that can index transaction data.
Expand Down
4 changes: 4 additions & 0 deletions crates/chain/src/keychain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ impl<K: Ord> Append for DerivationAdditions<K> {

self.0.append(&mut other.0);
}

fn is_empty(&self) -> bool {
self.0.is_empty()
}
}

impl<K> Default for DerivationAdditions<K> {
Expand Down
2 changes: 2 additions & 0 deletions crates/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod tx_graph;
pub use tx_data_traits::*;
mod chain_oracle;
pub use chain_oracle::*;
mod persist;
pub use persist::*;

#[doc(hidden)]
pub mod example_utils;
Expand Down
113 changes: 113 additions & 0 deletions crates/chain/src/persist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use core::convert::Infallible;

use crate::Append;

/// `Persist` wraps a [`PersistBackend`] (`B`) to create a convenient staging area for changes (`C`)
/// before they are persisted.
///
/// Not all changes to the in-memory representation needs to be written to disk right away, so
/// [`Persist::stage`] can be used to *stage* changes first and then [`Persist::commit`] can be used
/// to write changes to disk.
#[derive(Debug)]
pub struct Persist<B, C> {
backend: B,
stage: C,
}

impl<B, C> Persist<B, C>
where
B: PersistBackend<C>,
C: Default + Append,
{
/// Create a new [`Persist`] from [`PersistBackend`].
pub fn new(backend: B) -> Self {
Self {
backend,
stage: Default::default(),
}
}

/// Stage a `changeset` to be commited later with [`commit`].
///
/// [`commit`]: Self::commit
pub fn stage(&mut self, changeset: C) {
self.stage.append(changeset)
}

/// Get the changes that have not been commited yet.
pub fn staged(&self) -> &C {
&self.stage
}

/// Commit the staged changes to the underlying persistance backend.
///
/// Returns a backend-defined error if this fails.
pub fn commit(&mut self) -> Result<(), B::WriteError> {
let mut temp = C::default();
core::mem::swap(&mut temp, &mut self.stage);
self.backend.write_changes(&temp)
}
}

/// A persistence backend for [`Persist`].
///
/// `C` represents the changeset; a datatype that records changes made to in-memory data structures
/// that are to be persisted, or retrieved from persistence.
pub trait PersistBackend<C> {
/// The error the backend returns when it fails to write.
type WriteError: core::fmt::Debug;

/// The error the backend returns when it fails to load changesets `C`.
type LoadError: core::fmt::Debug;

/// An iterator of changesets loaded from the persistence.
///
/// The lifetime bound is to ensure `Self` must outlive `LoadIter`.
type LoadIter<'a>: Iterator<Item = Result<C, Self::LoadError>>
where
Self: 'a;

/// Writes a changeset to the persistence backend.
///
/// It is up to the backend what it does with this. It could store every changeset in a list or
/// it inserts the actual changes into a more structured database. All it needs to guarantee is
/// that [`load_from_persistence`] restores a keychain tracker to what it should be if all
/// changesets had been applied sequentially.
///
/// [`load_from_persistence`]: Self::load_from_persistence
fn write_changes(&mut self, changeset: &C) -> Result<(), Self::WriteError>;

/// Iterates over changesets `C` from persistence.
fn load_from_persistence(&mut self) -> Self::LoadIter<'_>;

/// Return the aggregate changeset `C` from persistence.
fn load_all_from_persistence(&mut self) -> Result<C, Self::LoadError>
where
C: Default + Append,
{
self.load_from_persistence()
.try_fold(C::default(), |mut acc, r| {
acc.append(r?);
Ok(acc)
})
}
}

/// A persistence backend that does nothing.
pub struct EmptyBackend<C>(core::iter::Empty<Result<C, Infallible>>);

impl<C> PersistBackend<C> for EmptyBackend<C> {
type WriteError = Infallible;

type LoadError = Infallible;

type LoadIter<'a> = core::iter::Empty<Result<C, Infallible>> where Self: 'a;

fn write_changes(&mut self, _changeset: &C) -> Result<(), Self::WriteError> {
Ok(())
}

fn load_from_persistence(&mut self) -> Self::LoadIter<'_> {
self.0.clone()
}
}
15 changes: 15 additions & 0 deletions crates/chain/src/tx_data_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,35 @@ impl<A: Anchor> Anchor for &'static A {
pub trait Append {
/// Append another object of the same type onto `self`.
fn append(&mut self, other: Self);

/// Returns whether the structure is considered empty.
fn is_empty(&self) -> bool;
}

impl Append for () {
fn append(&mut self, _other: Self) {}

fn is_empty(&self) -> bool {
true
}
}

impl<K: Ord, V> Append for BTreeMap<K, V> {
fn append(&mut self, mut other: Self) {
BTreeMap::append(self, &mut other)
}

fn is_empty(&self) -> bool {
BTreeMap::is_empty(self)
}
}

impl<T: Ord> Append for BTreeSet<T> {
fn append(&mut self, mut other: Self) {
BTreeSet::append(self, &mut other)
}

fn is_empty(&self) -> bool {
BTreeSet::is_empty(self)
}
}
7 changes: 7 additions & 0 deletions crates/chain/src/tx_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,6 +940,13 @@ impl<A: Ord> Append for Additions<A> {
.collect::<Vec<_>>(),
);
}

fn is_empty(&self) -> bool {
self.tx.is_empty()
&& self.txout.is_empty()
&& self.anchors.is_empty()
&& self.last_seen.is_empty()
}
}

impl<A> AsRef<TxGraph<A>> for TxGraph<A> {
Expand Down
106 changes: 106 additions & 0 deletions crates/file_store/src/entry_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use bincode::Options;
use std::{
fs::File,
io::{self, Seek},
marker::PhantomData,
};

use crate::bincode_options;

/// Iterator over entries in a file store.
///
/// Reads and returns an entry each time [`next`] is called. If an error occurs while reading the
/// iterator will yield a `Result::Err(_)` instead and then `None` for the next call to `next`.
///
/// [`next`]: Self::next
pub struct EntryIter<'a, V> {
db_file: &'a mut File,
types: PhantomData<V>,
start_pos: Option<u64>,
error_exit: bool,
}

impl<'a, V> EntryIter<'a, V> {
pub fn new(start_pos: u64, db_file: &'a mut File) -> Self {
Self {
db_file,
types: PhantomData,
start_pos: Some(start_pos),
error_exit: false,
}
}
}

impl<'a, V> Iterator for EntryIter<'a, V>
where
V: serde::de::DeserializeOwned,
{
type Item = Result<V, IterError>;

fn next(&mut self) -> Option<Self::Item> {
if self.error_exit {
return None;
}

if let Some(start_pos) = self.start_pos.take() {
if let Err(err) = self.db_file.seek(io::SeekFrom::Start(start_pos)) {
return Some(Err(err.into()));
}
}

let result = (|| {
let pos = self.db_file.stream_position()?;

match bincode_options().deserialize_from(&mut self.db_file) {
Ok(changeset) => Ok(Some(changeset)),
Err(e) => {
if let bincode::ErrorKind::Io(inner) = &*e {
if inner.kind() == io::ErrorKind::UnexpectedEof {
let eof = self.db_file.seek(io::SeekFrom::End(0))?;
if pos == eof {
return Ok(None);
}
}
}

self.db_file.seek(io::SeekFrom::Start(pos))?;
Err(IterError::Bincode(*e))
}
}
})();

let result = result.transpose();

if let Some(Err(_)) = &result {
self.error_exit = true;
}

result
}
}

impl From<io::Error> for IterError {
fn from(value: io::Error) -> Self {
IterError::Io(value)
}
}

/// Error type for [`EntryIter`].
#[derive(Debug)]
pub enum IterError {
/// Failure to read from the file.
Io(io::Error),
/// Failure to decode data from the file.
Bincode(bincode::ErrorKind),
}

impl core::fmt::Display for IterError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
IterError::Io(e) => write!(f, "io error trying to read entry {}", e),
IterError::Bincode(e) => write!(f, "bincode error while reading entry {}", e),
}
}
}

impl std::error::Error for IterError {}
Loading

0 comments on commit a427a6d

Please sign in to comment.