Skip to content

Commit

Permalink
feat: split E2StoreStream into reader and writer (#1509)
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev authored Oct 8, 2024
1 parent 40868b8 commit 28b7efd
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 108 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ snap.workspace = true

[dev-dependencies]
rstest.workspace = true
tempfile.workspace = true
tokio.workspace = true
trin-utils.workspace = true
2 changes: 1 addition & 1 deletion e2store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ TODO: Add chart of snapshot size at every million block interval.

## What is the difference between `e2store/memory.rs` and `e2store/stream.rs`

`e2store/memory.rs` provides an API to load a full e2store file such as `.era`/`.era1` and manipulate it in memory. For smaller e2store files this approach works well. The issue comes when dealing with much larger e2store files, where loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in: it lets you stream the data you need from an e2store file as you need it. This will be required in `.era2`, a format for storing full flat state snapshots.
`e2store/memory.rs` provides an API to load a full e2store file such as `.era`/`.era1` and manipulate it in memory. For smaller e2store files this approach works well. The issue comes when dealing with much larger e2store files, where loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in: it lets you stream the data you need from an e2store file as you need it. This is required for the `.era2` format, which stores full flat state snapshots.
80 changes: 49 additions & 31 deletions e2store/src/e2store/stream.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,69 @@
use std::{
fs::File,
io::{Read, Write},
path::PathBuf,
io::{BufReader, BufWriter, Read, Write},
path::Path,
};

use super::types::{Entry, Header};

/// e2store/memory.rs was built to load full .era/.era1 files into memory and provide a simple API
/// to access the data. For larger files this isn't feasible, as the entire file would need to be
/// loaded into memory. This is where e2store/stream.rs comes in: it provides a way to read and
/// write e2store files in a streaming fashion.
pub struct E2StoreStream {
pub e2store_file: File,
/// Streaming reader for e2store files.
///
/// Entries are pulled one at a time through a [`BufReader`] wrapped around the source `R`,
/// so the whole file never has to be held in memory.
pub struct E2StoreStreamReader<R> {
// Buffered source that entries are read from.
reader: BufReader<R>,
}

impl E2StoreStream {
pub fn open(e2store_path: &PathBuf) -> anyhow::Result<Self> {
let e2store_file = File::open(e2store_path)?;
Ok(Self { e2store_file })
}

pub fn create(e2store_path: &PathBuf) -> anyhow::Result<Self> {
let e2store_file = File::create(e2store_path)?;
Ok(Self { e2store_file })
impl<R: Read> E2StoreStreamReader<R> {
/// Creates a streaming reader on top of an already-buffered source.
pub fn new(reader: BufReader<R>) -> anyhow::Result<Self> {
Ok(Self { reader })
}

/// Reads the next entry from the stream.
///
/// An entry is read as an 8-byte header followed by `header.length` bytes of value.
///
/// # Errors
///
/// Returns an error if the source cannot supply a full header or a full value
/// (`read_exact` fails), or if the header bytes cannot be deserialized.
pub fn next_entry(&mut self) -> anyhow::Result<Entry> {
// The header is read into a fixed 8-byte buffer.
let mut buf = vec![0; 8];
self.e2store_file.read_exact(&mut buf)?;
self.reader.read_exact(&mut buf)?;
let header = Header::deserialize(&buf)?;
// The header's length field gives the size of the value that follows.
let mut value = vec![0; header.length as usize];
self.e2store_file.read_exact(&mut value)?;
self.reader.read_exact(&mut value)?;
Ok(Entry { header, value })
}
}

impl E2StoreStreamReader<File> {
    /// Opens the e2store file at `path` for streaming reads.
    ///
    /// Accepts anything that can be viewed as a [`Path`] (`&Path`, `&PathBuf`, `&str`, ...),
    /// matching what [`File::open`] itself accepts. The file is wrapped in a [`BufReader`]
    /// before being handed to [`Self::new`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened.
    pub fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        Self::new(BufReader::new(File::open(path)?))
    }
}

/// Writes e2store entries to an underlying sink in a streaming fashion.
///
/// All output goes through a [`BufWriter`] wrapped around the sink `W`.
pub struct E2StoreStreamWriter<W>
where
    W: Write,
{
    /// Buffered sink that receives the written bytes.
    writer: BufWriter<W>,
}

impl<W: Write> E2StoreStreamWriter<W> {
/// Creates a streaming writer on top of an already-buffered sink.
pub fn new(writer: BufWriter<W>) -> anyhow::Result<Self> {
Ok(Self { writer })
}

/// Append an entry to the e2store file.
///
/// The entry is serialized and the resulting bytes are written in full.
///
/// # Errors
///
/// Returns an error if serialization or the write fails.
pub fn append_entry(&mut self, entry: &Entry) -> anyhow::Result<()> {
let buf = entry.serialize()?;
self.e2store_file.write_all(&buf)?;
self.writer.write_all(&buf)?;
Ok(())
}

/// Flushes the buffered writer so that pending bytes reach the underlying sink.
///
/// # Errors
///
/// Returns an error if the flush fails.
pub fn flush(&mut self) -> anyhow::Result<()> {
Ok(self.writer.flush()?)
}
}

impl E2StoreStreamWriter<File> {
    /// Creates the e2store file at `path` for streaming writes.
    ///
    /// Accepts anything that can be viewed as a [`Path`] (`&Path`, `&PathBuf`, `&str`, ...),
    /// matching what [`File::create`] itself accepts. The file is wrapped in a [`BufWriter`]
    /// before being handed to [`Self::new`].
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be created.
    pub fn create(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        Self::new(BufWriter::new(File::create(path)?))
    }
}

#[cfg(test)]
mod tests {
use rand::Rng;
use tempfile::TempDir;
use trin_utils::dir::create_temp_test_dir;

use crate::e2store::types::VersionEntry;

Expand All @@ -55,29 +73,29 @@ mod tests {
fn test_e2store_stream_write_and_read() -> anyhow::Result<()> {
// setup
let mut rng = rand::thread_rng();
let tmp_dir = TempDir::new()?;
let tmp_dir = create_temp_test_dir()?;
let random_number: u16 = rng.gen();
let tmp_path = tmp_dir
.as_ref()
.to_path_buf()
.path()
.join(format!("{}.e2store_stream_test", random_number));

// create a new e2store file and write some data to it
let mut e2store_write_stream = E2StoreStream::create(&tmp_path)?;
let mut e2store_stream_writer = E2StoreStreamWriter::create(&tmp_path)?;

let version = VersionEntry::default();
e2store_write_stream.append_entry(&version.clone().into())?;
e2store_stream_writer.append_entry(&version.clone().into())?;

let value: Vec<u8> = (0..100).map(|_| rng.gen_range(0..20)).collect();
let entry = Entry::new(0, value);
e2store_write_stream.append_entry(&entry)?;
drop(e2store_write_stream);
e2store_stream_writer.append_entry(&entry)?;
e2store_stream_writer.flush()?;
drop(e2store_stream_writer);

// read results and see if they match
let mut e2store_read_stream = E2StoreStream::open(&tmp_path)?;
let read_version_entry = VersionEntry::try_from(&e2store_read_stream.next_entry()?)?;
let mut e2store_stream_reader = E2StoreStreamReader::open(&tmp_path)?;
let read_version_entry = VersionEntry::try_from(&e2store_stream_reader.next_entry()?)?;
assert_eq!(version, read_version_entry);
let read_entry = e2store_read_stream.next_entry()?;
let read_entry = e2store_stream_reader.next_entry()?;
assert_eq!(entry, read_entry);

// cleanup
Expand Down
Loading

0 comments on commit 28b7efd

Please sign in to comment.