diff --git a/Cargo.lock b/Cargo.lock index c14763dff..3ab3e6750 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1938,8 +1938,8 @@ dependencies = [ "rstest", "scraper", "snap", - "tempfile", "tokio", + "trin-utils", ] [[package]] diff --git a/e2store/Cargo.toml b/e2store/Cargo.toml index 1b8f1bad3..d1f69b85a 100644 --- a/e2store/Cargo.toml +++ b/e2store/Cargo.toml @@ -25,5 +25,5 @@ snap.workspace = true [dev-dependencies] rstest.workspace = true -tempfile.workspace = true tokio.workspace = true +trin-utils.workspace = true diff --git a/e2store/README.md b/e2store/README.md index 3a680a403..c260b0ba4 100644 --- a/e2store/README.md +++ b/e2store/README.md @@ -23,4 +23,4 @@ TODO: Add chart of snapshot size at every million block interval. ## What is the difference between `e2store/memory.rs` and `e2store/stream.rs` -`e2store/memory.rs` provides an api to load a full e2store file such as `.era`/`.era1` and manipulate it in memory. For smaller e2store files this approach works well. The issue comes when dealing with e2store files of much greater size loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in where you can stream the data you need from a e2store file as you need it. This will be required in `.era2` a format for storing full flat state snapshots. +`e2store/memory.rs` provides an api to load a full e2store file such as `.era`/`.era1` and manipulate it in memory. For smaller e2store files this approach works well. The issue comes when dealing with e2store files of much greater size loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in where you can stream the data you need from a e2store file as you need it. This is required for `.era2` format for storing full flat state snapshots. diff --git a/e2store/src/e2store/stream.rs b/e2store/src/e2store/stream.rs index 446667e8e..c3d6dd6dd 100644 --- a/e2store/src/e2store/stream.rs +++ b/e2store/src/e2store/stream.rs @@ -1,51 +1,69 @@ use std::{ fs::File, - io::{Read, Write}, - path::PathBuf, + io::{BufReader, BufWriter, Read, Write}, + path::Path, }; use super::types::{Entry, Header}; -/// e2store/memory.rs was built to load full .era/.era2 files into memory and provide a simple API -/// to access the data. The issue for this is for larger files this wouldn't be feasible, as the -/// entire file would need to be loaded into memory. This is where e2store_file.rs comes in, it -/// provides a way to read and write e2store files in a streaming fashion. -pub struct E2StoreStream { - pub e2store_file: File, +/// Streaming reader for e2store files. +pub struct E2StoreStreamReader { + reader: BufReader, } -impl E2StoreStream { - pub fn open(e2store_path: &PathBuf) -> anyhow::Result { - let e2store_file = File::open(e2store_path)?; - Ok(Self { e2store_file }) - } - - pub fn create(e2store_path: &PathBuf) -> anyhow::Result { - let e2store_file = File::create(e2store_path)?; - Ok(Self { e2store_file }) +impl E2StoreStreamReader { + pub fn new(reader: BufReader) -> anyhow::Result { + Ok(Self { reader }) } pub fn next_entry(&mut self) -> anyhow::Result { let mut buf = vec![0; 8]; - self.e2store_file.read_exact(&mut buf)?; + self.reader.read_exact(&mut buf)?; let header = Header::deserialize(&buf)?; let mut value = vec![0; header.length as usize]; - self.e2store_file.read_exact(&mut value)?; + self.reader.read_exact(&mut value)?; Ok(Entry { header, value }) } +} + +impl E2StoreStreamReader { + pub fn open(path: &Path) -> anyhow::Result { + Self::new(BufReader::new(File::open(path)?)) + } +} + +/// Streaming writer for e2store files. +pub struct E2StoreStreamWriter { + writer: BufWriter, +} + +impl E2StoreStreamWriter { + pub fn new(writer: BufWriter) -> anyhow::Result { + Ok(Self { writer }) + } /// Append an entry to the e2store file. pub fn append_entry(&mut self, entry: &Entry) -> anyhow::Result<()> { let buf = entry.serialize()?; - self.e2store_file.write_all(&buf)?; + self.writer.write_all(&buf)?; Ok(()) } + + pub fn flush(&mut self) -> anyhow::Result<()> { + Ok(self.writer.flush()?) + } +} + +impl E2StoreStreamWriter { + pub fn create(path: &Path) -> anyhow::Result { + Self::new(BufWriter::new(File::create(path)?)) + } } #[cfg(test)] mod tests { use rand::Rng; - use tempfile::TempDir; + use trin_utils::dir::create_temp_test_dir; use crate::e2store::types::VersionEntry; @@ -55,29 +73,29 @@ mod tests { fn test_e2store_stream_write_and_read() -> anyhow::Result<()> { // setup let mut rng = rand::thread_rng(); - let tmp_dir = TempDir::new()?; + let tmp_dir = create_temp_test_dir()?; let random_number: u16 = rng.gen(); let tmp_path = tmp_dir - .as_ref() - .to_path_buf() + .path() .join(format!("{}.e2store_stream_test", random_number)); // create a new e2store file and write some data to it - let mut e2store_write_stream = E2StoreStream::create(&tmp_path)?; + let mut e2store_stream_writer = E2StoreStreamWriter::create(&tmp_path)?; let version = VersionEntry::default(); - e2store_write_stream.append_entry(&version.clone().into())?; + e2store_stream_writer.append_entry(&version.clone().into())?; let value: Vec = (0..100).map(|_| rng.gen_range(0..20)).collect(); let entry = Entry::new(0, value); - e2store_write_stream.append_entry(&entry)?; - drop(e2store_write_stream); + e2store_stream_writer.append_entry(&entry)?; + e2store_stream_writer.flush()?; + drop(e2store_stream_writer); // read results and see if they match - let mut e2store_read_stream = E2StoreStream::open(&tmp_path)?; - let read_version_entry = VersionEntry::try_from(&e2store_read_stream.next_entry()?)?; + let mut e2store_stream_reader = E2StoreStreamReader::open(&tmp_path)?; + let read_version_entry = VersionEntry::try_from(&e2store_stream_reader.next_entry()?)?; assert_eq!(version, read_version_entry); - let read_entry = e2store_read_stream.next_entry()?; + let read_entry = e2store_stream_reader.next_entry()?; assert_eq!(entry, read_entry); // cleanup diff --git a/e2store/src/era2.rs b/e2store/src/era2.rs index 77f82afdd..64ecf03f6 100644 --- a/e2store/src/era2.rs +++ b/e2store/src/era2.rs @@ -1,5 +1,32 @@ +//! The format for storing full flat state snapshots. +//! +//! Filename: +//! +//! ```text +//! --.era2 +//! ``` +//! +//! Type definitions: +//! +//! ```text +//! era2 := Version | CompressedHeader | account* +//! account := CompressedAccount | CompressedStorage* +//! +//! Version = { type: 0x3265, data: nil } +//! CompressedHeader = { type: 0x03, data: snappyFramed(rlp(header)) } +//! CompressedAccount = { type: 0x08, data: snappyFramed(rlp(Account)) } +//! CompressedStorage = { type: 0x09, data: snappyFramed(rlp(Vec)) } +//! +//! Account = { address_hash, AccountState, raw_bytecode, storage_entry_count } +//! AccountState = { nonce, balance, storage_root, code_hash } +//! StorageItem = { storage_index_hash, value } +//! ``` +//! +//! CompressedStorage can have a max of 10 million storage items, records must be filled before +//! creating a new one, and must be sorted by storage_index_hash across all entries. + use std::{ - fs, + fs::{self, File}, io::{ErrorKind, Read, Write}, ops::Deref, path::{Path, PathBuf}, @@ -12,7 +39,7 @@ use ethportal_api::types::{execution::header::Header, state_trie::account_state: use crate::{ e2store::{ - stream::E2StoreStream, + stream::{E2StoreStreamReader, E2StoreStreamWriter}, types::{Entry, VersionEntry}, }, types::HeaderEntry, @@ -21,52 +48,22 @@ use crate::{ pub const MAX_STORAGE_ITEMS: usize = 10_000_000; -// --.era2 -// -// era2 := Version | CompressedHeader | account* -// account := CompressedAccount | CompressedStorage* -// ----- -// Version = { type: 0x3265, data: nil } -// CompressedHeader = { type: 0x03, data: snappyFramed(rlp(header)) } -// CompressedAccount = { type: 0x08, data: snappyFramed(rlp(address_hash, rlp(nonce, -// balance, storage_root, code_hash), raw_bytecode, storage_entry_count)) } -// CompressedStorage = { type: 0x09, data: snappyFramed(rlp(Vec)) } -// ----- -// CompressedStorage can have a max of 10 million storage items, records must be filled before -// creating a new one, and must be sorted by storage_index_hash across all entries. - -/// Represents an era2 `Era2` state snapshot. -/// Unlike era1, not all fields will be stored in the struct, account's will be streamed from an -/// iterator as needed. -pub struct Era2 { +/// The `Era2` streaming writer. +/// +/// Unlike [crate::era::Era] and [crate::era1::Era1], the `Era2` files are too big to be held in +/// memory. +pub struct Era2Writer { pub version: VersionEntry, pub header: HeaderEntry, - /// e2store_stream, manages the interactions between the era2 state snapshot - e2store_stream: E2StoreStream, + writer: E2StoreStreamWriter, pending_storage_entries: u32, path: PathBuf, } -impl Era2 { - pub fn open(path: PathBuf) -> anyhow::Result { - let mut e2store_stream = E2StoreStream::open(&path)?; - - let version = VersionEntry::try_from(&e2store_stream.next_entry()?)?; - let header = HeaderEntry::try_from(&e2store_stream.next_entry()?)?; - - Ok(Self { - version, - header, - e2store_stream, - pending_storage_entries: 0, - path, - }) - } - - pub fn create(path: PathBuf, header: Header) -> anyhow::Result { - fs::create_dir_all(&path)?; +impl Era2Writer { + pub fn create(path: &Path, header: Header) -> anyhow::Result { + fs::create_dir_all(path)?; ensure!(path.is_dir(), "era2 path is not a directory: {:?}", path); let path = path.join(format!( "mainnet-{:010}-{}.era2", @@ -74,18 +71,18 @@ impl Era2 { hex::encode(&header.state_root.as_slice()[..4]) )); ensure!(!path.exists(), "era2 file already exists: {:?}", path); - let mut e2store_stream = E2StoreStream::create(&path)?; + let mut writer = E2StoreStreamWriter::create(&path)?; let version = VersionEntry::default(); - e2store_stream.append_entry(&version.clone().into())?; + writer.append_entry(&version.clone().into())?; let header = HeaderEntry { header }; - e2store_stream.append_entry(&header.clone().try_into()?)?; + writer.append_entry(&header.clone().try_into()?)?; Ok(Self { version, header, - e2store_stream, + writer, pending_storage_entries: 0, path, }) @@ -105,7 +102,7 @@ impl Era2 { self.pending_storage_entries = account.storage_count; let entry: Entry = account.clone().try_into()?; - self.e2store_stream.append_entry(&entry)?; + self.writer.append_entry(&entry)?; entry.value.len() } AccountOrStorageEntry::Storage(storage) => { @@ -123,22 +120,60 @@ impl Era2 { self.pending_storage_entries -= 1; let entry: Entry = storage.clone().try_into()?; - self.e2store_stream.append_entry(&entry)?; + self.writer.append_entry(&entry)?; entry.value.len() } }; Ok(size) } + + pub fn flush(&mut self) -> anyhow::Result<()> { + self.writer.flush() + } +} + +/// The `Era2` streaming reader. +/// +/// Unlike [crate::era::Era] and [crate::era1::Era1], the `Era2` files are too big to be held in +/// memory. +pub struct Era2Reader { + pub version: VersionEntry, + pub header: HeaderEntry, + + reader: E2StoreStreamReader, + pending_storage_entries: u32, + path: PathBuf, +} + +impl Era2Reader { + pub fn open(path: &Path) -> anyhow::Result { + let mut reader = E2StoreStreamReader::open(path)?; + + let version = VersionEntry::try_from(&reader.next_entry()?)?; + let header = HeaderEntry::try_from(&reader.next_entry()?)?; + + Ok(Self { + version, + header, + reader, + pending_storage_entries: 0, + path: path.to_path_buf(), + }) + } + + pub fn path(&self) -> &Path { + self.path.as_path() + } } -impl Iterator for Era2 { +impl Iterator for Era2Reader { type Item = AccountOrStorageEntry; fn next(&mut self) -> Option { if self.pending_storage_entries > 0 { self.pending_storage_entries -= 1; - let raw_storage_entry = match self.e2store_stream.next_entry() { + let raw_storage_entry = match self.reader.next_entry() { Ok(raw_storage_entry) => raw_storage_entry, Err(err) => panic!("Failed to read next storage entry: {:?}", err), }; @@ -150,7 +185,7 @@ impl Iterator for Era2 { return Some(AccountOrStorageEntry::Storage(storage_entry)); } - let raw_account_entry = match self.e2store_stream.next_entry() { + let raw_account_entry = match self.reader.next_entry() { Ok(raw_account_entry) => raw_account_entry, Err(err) => match err { // If we read to the end of the error file we should get this @@ -280,7 +315,7 @@ impl TryFrom for Entry { #[cfg(test)] mod tests { use alloy_primitives::{Address, Bloom, B64}; - use tempfile::TempDir; + use trin_utils::dir::create_temp_test_dir; use crate::e2store::types::VersionEntry; @@ -289,8 +324,7 @@ mod tests { #[test] fn test_era2_stream_write_and_read() -> anyhow::Result<()> { // setup - let tmp_dir = TempDir::new()?; - let tmp_path = tmp_dir.as_ref().to_path_buf(); + let tmp_dir = create_temp_test_dir()?; // create fake execution block header let header = Header { @@ -317,14 +351,14 @@ mod tests { }; // create a new e2store file and write some data to it - let mut era2_write_file = Era2::create(tmp_path.clone(), header.clone())?; + let mut era2_writer = Era2Writer::create(tmp_dir.path(), header.clone())?; - let tmp_path = tmp_path.join(format!( + let era2_path = tmp_dir.path().join(format!( "mainnet-{:010}-{}.era2", header.number, hex::encode(&header.state_root.as_slice()[..4]) )); - assert_eq!(era2_write_file.path(), tmp_path.as_path()); + assert_eq!(era2_writer.path(), era2_path); let account = AccountOrStorageEntry::Account(AccountEntry { address_hash: B256::default(), @@ -333,35 +367,37 @@ mod tests { storage_count: 1, }); - assert_eq!(era2_write_file.pending_storage_entries, 0); - let size = era2_write_file.append_entry(&account)?; + assert_eq!(era2_writer.pending_storage_entries, 0); + let size = era2_writer.append_entry(&account)?; assert_eq!(size, 101); - assert_eq!(era2_write_file.pending_storage_entries, 1); + assert_eq!(era2_writer.pending_storage_entries, 1); let storage = AccountOrStorageEntry::Storage(StorageEntry(vec![StorageItem { storage_index_hash: B256::default(), value: U256::default(), }])); - let size = era2_write_file.append_entry(&storage)?; + let size = era2_writer.append_entry(&storage)?; assert_eq!(size, 29); - assert_eq!(era2_write_file.pending_storage_entries, 0); + assert_eq!(era2_writer.pending_storage_entries, 0); + era2_writer.flush()?; + drop(era2_writer); // read results and see if they match - let mut era2_read_file = Era2::open(tmp_path.clone())?; - assert_eq!(era2_read_file.path(), tmp_path.as_path()); + let mut era2_reader = Era2Reader::open(&era2_path)?; + assert_eq!(era2_reader.path(), &era2_path); let default_version_entry = VersionEntry::default(); - assert_eq!(era2_read_file.version, default_version_entry); - assert_eq!(era2_read_file.header, HeaderEntry { header }); - assert_eq!(era2_read_file.pending_storage_entries, 0); - let read_account_tuple = era2_read_file.next().unwrap(); + assert_eq!(era2_reader.version, default_version_entry); + assert_eq!(era2_reader.header, HeaderEntry { header }); + assert_eq!(era2_reader.pending_storage_entries, 0); + let read_account_tuple = era2_reader.next().unwrap(); assert_eq!(account, read_account_tuple); - assert_eq!(era2_read_file.pending_storage_entries, 1); + assert_eq!(era2_reader.pending_storage_entries, 1); - let read_storage_tuple = era2_read_file.next().unwrap(); + let read_storage_tuple = era2_reader.next().unwrap(); assert_eq!(storage, read_storage_tuple); - assert_eq!(era2_read_file.pending_storage_entries, 0); + assert_eq!(era2_reader.pending_storage_entries, 0); // cleanup tmp_dir.close()?; diff --git a/trin-execution/src/subcommands/era2/export.rs b/trin-execution/src/subcommands/era2/export.rs index 7ddffb3ba..c7d1521a3 100644 --- a/trin-execution/src/subcommands/era2/export.rs +++ b/trin-execution/src/subcommands/era2/export.rs @@ -7,7 +7,7 @@ use alloy_consensus::EMPTY_ROOT_HASH; use alloy_rlp::Decodable; use anyhow::ensure; use e2store::era2::{ - AccountEntry, AccountOrStorageEntry, Era2, StorageEntry, StorageItem, MAX_STORAGE_ITEMS, + AccountEntry, AccountOrStorageEntry, Era2Writer, StorageEntry, StorageItem, MAX_STORAGE_ITEMS, }; use eth_trie::{EthTrie, Trie}; use ethportal_api::{types::state_trie::account_state::AccountState, Header}; @@ -69,7 +69,7 @@ impl StateExporter { "Exporting state from block number: {} with state root: {}", self.header.number, self.header.state_root ); - let mut era2 = Era2::create(self.config.path_to_era2.clone(), self.header.clone())?; + let mut era2 = Era2Writer::create(&self.config.path_to_era2, self.header.clone())?; info!("Era2 initiated"); info!("Trie leaf iterator initiated"); let mut accounts_exported = 0; @@ -128,6 +128,8 @@ impl StateExporter { } } + era2.flush()?; + info!("Era2 snapshot exported"); Ok(era2.path().to_path_buf()) diff --git a/trin-execution/src/subcommands/era2/import.rs b/trin-execution/src/subcommands/era2/import.rs index 269916069..564d3653f 100644 --- a/trin-execution/src/subcommands/era2/import.rs +++ b/trin-execution/src/subcommands/era2/import.rs @@ -1,7 +1,7 @@ use std::{path::Path, sync::Arc}; use anyhow::{ensure, Error}; -use e2store::era2::{AccountEntry, AccountOrStorageEntry, Era2, StorageItem}; +use e2store::era2::{AccountEntry, AccountOrStorageEntry, Era2Reader, StorageItem}; use eth_trie::{EthTrie, Trie}; use ethportal_api::Header; use revm_primitives::{keccak256, B256, U256}; @@ -56,7 +56,7 @@ impl StateImporter { pub fn import_state(&self) -> anyhow::Result
{ info!("Importing state from .era2 file"); - let mut era2 = Era2::open(self.config.path_to_era2.clone())?; + let mut era2 = Era2Reader::open(&self.config.path_to_era2)?; info!("Era2 reader initiated"); let mut accounts_imported = 0; while let Some(account) = era2.next() {