Skip to content

Commit

Permalink
feat: implement .era2 (state snapshots) + import/export from Trin Exe…
Browse files Browse the repository at this point in the history
…cution
  • Loading branch information
KolbyML committed Jul 22, 2024
1 parent 3b6a15e commit a39ec04
Show file tree
Hide file tree
Showing 18 changed files with 893 additions and 184 deletions.
12 changes: 11 additions & 1 deletion e2store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ E2store is a format originally developed by Nimbus as a framework for building o
## What is era?
era is a format for storing beacon chain data more information can be found here https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md#era-files

## What is era1
## What is era1?

era1 is a format for storing all of Ethereum's post merge blocks. It contains block headers, block bodies, and receipts for pre-merge block history which ranges block 0-15537394

## What is era2?

era2 is a format made to store full flat state snapshots, one of our first uses of this will be using to bootstrap Portal State Network bridges. Unlike `.era`/`.era1` era2 files will only store 1 block's worth of state per file. The reason for this choice is a snapshot of the state is quite large.

TODO: Add chart of snapshot size at every million block interval.

## What is the difference between `e2store/memory.rs` and `e2store/stream.rs`

`e2store/memory.rs` provides an api to load a full e2store file such as `.era`/`.era1` and manipulate it in memory. For smaller e2store files this approach works well. The issue comes when dealing with e2store files of much greater size loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in where you can stream the data you need from a e2store file as you need it. This will be required in `.era2` a format for storing full flat state snapshots.
122 changes: 7 additions & 115 deletions e2store/src/e2s.rs → e2store/src/e2store/memory.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use anyhow::anyhow;
use ssz_derive::{Decode, Encode};

const _SLOTS_PER_HISTORICAL_ROOT: usize = 8192;
const HEADER_SIZE: u16 = 8;
const VALUE_SIZE_LIMIT: usize = 1024 * 1024 * 50; // 50 MB
use super::types::{Entry, HEADER_SIZE};

pub struct E2StoreFile {
pub struct E2StoreMemory {
pub entries: Vec<Entry>,
}

impl TryFrom<E2StoreFile> for Vec<u8> {
impl TryFrom<E2StoreMemory> for Vec<u8> {
type Error = anyhow::Error;
fn try_from(e2store_file: E2StoreFile) -> Result<Vec<u8>, Self::Error> {
fn try_from(e2store_file: E2StoreMemory) -> Result<Vec<u8>, Self::Error> {
e2store_file.serialize()
}
}

#[allow(dead_code)]
impl E2StoreFile {
impl E2StoreMemory {
/// Serialize to a byte vector.
fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let length = self.entries.iter().map(|e| e.length() as u32).sum::<u32>() as usize;
Expand Down Expand Up @@ -68,111 +65,6 @@ impl E2StoreFile {
}
}

/// Represents an e2store `Entry`
#[derive(Default, Debug, Eq, PartialEq, Clone)]
pub struct Entry {
pub header: Header,
pub value: Vec<u8>,
}

#[allow(dead_code)]
impl Entry {
pub fn new(type_: u16, value: Vec<u8>) -> Self {
Self {
header: Header {
type_,
length: value.len() as u32,
reserved: 0,
},
value,
}
}

pub fn length(&self) -> usize {
HEADER_SIZE as usize + self.header.length as usize
}

/// Serialize to a byte vector.
fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let length = self.length();
let mut buf = vec![0; length];
self.write(&mut buf)?;
Ok(buf)
}

/// Write to a byte slice.
fn write(&self, buf: &mut [u8]) -> anyhow::Result<()> {
if self.length() != buf.len() {
return Err(anyhow!(
"found invalid buf length for entry: {} - expected {}",
buf.len(),
self.length()
));
}
if self.length() > VALUE_SIZE_LIMIT {
return Err(anyhow!(
"entry value size limit exceeded: {} - {}",
self.length(),
VALUE_SIZE_LIMIT
));
}
self.header.write(buf);
buf[8..].copy_from_slice(&self.value);
Ok(())
}

/// Deserialize from a byte slice.
pub fn deserialize(bytes: &[u8]) -> anyhow::Result<Self> {
let header = Header::deserialize(&bytes[0..8])?;
if header.length as usize + HEADER_SIZE as usize != bytes.len() {
return Err(anyhow!(
"found invalid buf length for entry: {} - expected {}",
bytes.len(),
header.length as usize + HEADER_SIZE as usize
));
}
Ok(Self {
header: Header::deserialize(&bytes[0..8])?,
value: bytes[8..].to_vec(),
})
}
}

/// Represents the header of an e2store `Entry`
#[derive(Clone, Debug, Decode, Encode, Default, Eq, PartialEq)]
pub struct Header {
pub type_: u16,
pub length: u32,
pub reserved: u16,
}

impl Header {
/// Write to a byte slice.
fn write(&self, buf: &mut [u8]) {
buf[0..2].copy_from_slice(&self.type_.to_le_bytes());
buf[2..6].copy_from_slice(&self.length.to_le_bytes());
buf[6..8].copy_from_slice(&self.reserved.to_le_bytes());
}

/// Deserialize from a byte slice.
fn deserialize(bytes: &[u8]) -> anyhow::Result<Self> {
if bytes.len() != HEADER_SIZE as usize {
return Err(anyhow!("invalid header size: {}", bytes.len()));
}
let type_ = u16::from_le_bytes([bytes[0], bytes[1]]);
let length = u32::from_le_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]);
let reserved = u16::from_le_bytes([bytes[6], bytes[7]]);
if reserved != 0 {
return Err(anyhow!("invalid reserved value: {} - expected 0", reserved));
}
Ok(Self {
type_,
length,
reserved,
})
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -207,7 +99,7 @@ mod test {
#[test]
fn test_entry_multiple() {
let expected = "0x2a00020000000000beef0900040000000000abcdabcd";
let file = E2StoreFile::deserialize(&hex_decode(expected).unwrap()).unwrap();
let file = E2StoreMemory::deserialize(&hex_decode(expected).unwrap()).unwrap();
assert_eq!(file.entries.len(), 2);
assert_eq!(file.entries[0].header.type_, 0x2a); // 42
assert_eq!(file.entries[0].header.length, 2);
Expand All @@ -226,6 +118,6 @@ mod test {
#[case("0xbeef010000000000")] // length exceeds buffer
fn test_entry_invalid_decoding(#[case] input: &str) {
let buf = hex_decode(input).unwrap();
assert!(E2StoreFile::deserialize(&buf).is_err());
assert!(E2StoreMemory::deserialize(&buf).is_err());
}
}
3 changes: 3 additions & 0 deletions e2store/src/e2store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod memory;
pub mod stream;
pub mod types;
37 changes: 37 additions & 0 deletions e2store/src/e2store/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::{
fs::File,
io::{Read, Write},
path::PathBuf,
};

use super::types::{Entry, Header};

/// e2s.rs was built to load full .era/.era2 files into memory and provide a simple API to access the data.
/// The issue for this is for larger files this wouldn't be feasible, as the entire file would need to be loaded into memory.
/// This is where e2store_file.rs comes in, it provides a way to read and write e2store files in a streaming fashion.
pub struct E2StoreStream {
pub e2store_file: File,
}

impl E2StoreStream {
pub fn new(e2store_path: &PathBuf) -> anyhow::Result<Self> {
let e2store_file = File::open(e2store_path)?;
Ok(Self { e2store_file })
}

pub fn next_entry(&mut self) -> anyhow::Result<Entry> {
let mut buf = vec![0; 8];
self.e2store_file.read_exact(&mut buf)?;
let header = Header::deserialize(&buf)?;
let mut value = vec![0; header.length as usize];
self.e2store_file.read_exact(&mut value)?;
Ok(Entry { header, value })
}

/// Append an entry to the e2store file.
pub fn append_entry(&mut self, entry: &Entry) -> anyhow::Result<()> {
let buf = entry.serialize()?;
self.e2store_file.write_all(&buf)?;
Ok(())
}
}
149 changes: 149 additions & 0 deletions e2store/src/e2store/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use anyhow::{anyhow, ensure};
use ssz_derive::{Decode, Encode};

pub const HEADER_SIZE: u16 = 8;
const VALUE_SIZE_LIMIT: usize = 1024 * 1024 * 50; // 50 MB

/// Represents an e2store `Entry`
#[derive(Default, Debug, Eq, PartialEq, Clone)]
pub struct Entry {
pub header: Header,
pub value: Vec<u8>,
}

#[allow(dead_code)]
impl Entry {
pub fn new(type_: u16, value: Vec<u8>) -> Self {
Self {
header: Header {
type_,
length: value.len() as u32,
reserved: 0,
},
value,
}
}

pub fn length(&self) -> usize {
HEADER_SIZE as usize + self.header.length as usize
}

/// Serialize to a byte vector.
pub fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let length = self.length();
let mut buf = vec![0; length];
self.write(&mut buf)?;
Ok(buf)
}

/// Write to a byte slice.
pub fn write(&self, buf: &mut [u8]) -> anyhow::Result<()> {
if self.length() != buf.len() {
return Err(anyhow!(
"found invalid buf length for entry: {} - expected {}",
buf.len(),
self.length()
));
}
if self.length() > VALUE_SIZE_LIMIT {
return Err(anyhow!(
"entry value size limit exceeded: {} - {}",
self.length(),
VALUE_SIZE_LIMIT
));
}
self.header.write(buf);
buf[8..].copy_from_slice(&self.value);
Ok(())
}

/// Deserialize from a byte slice.
pub fn deserialize(bytes: &[u8]) -> anyhow::Result<Self> {
let header = Header::deserialize(&bytes[0..8])?;
if header.length as usize + HEADER_SIZE as usize != bytes.len() {
return Err(anyhow!(
"found invalid buf length for entry: {} - expected {}",
bytes.len(),
header.length as usize + HEADER_SIZE as usize
));
}
Ok(Self {
header: Header::deserialize(&bytes[0..8])?,
value: bytes[8..].to_vec(),
})
}
}

/// Represents the header of an e2store `Entry`
#[derive(Clone, Debug, Decode, Encode, Default, Eq, PartialEq)]
pub struct Header {
pub type_: u16,
pub length: u32,
pub reserved: u16,
}

impl Header {
/// Write to a byte slice.
fn write(&self, buf: &mut [u8]) {
buf[0..2].copy_from_slice(&self.type_.to_le_bytes());
buf[2..6].copy_from_slice(&self.length.to_le_bytes());
buf[6..8].copy_from_slice(&self.reserved.to_le_bytes());
}

/// Deserialize from a byte slice.
pub fn deserialize(bytes: &[u8]) -> anyhow::Result<Self> {
if bytes.len() != HEADER_SIZE as usize {
return Err(anyhow!("invalid header size: {}", bytes.len()));
}
let type_ = u16::from_le_bytes([bytes[0], bytes[1]]);
let length = u32::from_le_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]);
let reserved = u16::from_le_bytes([bytes[6], bytes[7]]);
if reserved != 0 {
return Err(anyhow!("invalid reserved value: {} - expected 0", reserved));
}
Ok(Self {
type_,
length,
reserved,
})
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VersionEntry {
version: Entry,
}

impl TryFrom<&Entry> for VersionEntry {
type Error = anyhow::Error;

fn try_from(entry: &Entry) -> anyhow::Result<Self> {
ensure!(
entry.header.type_ == 0x3265,
"invalid version entry: incorrect header type"
);
ensure!(
entry.header.length == 0,
"invalid version entry: incorrect header length"
);
ensure!(
entry.header.reserved == 0,
"invalid version entry: incorrect header reserved bytes"
);
ensure!(
entry.value.is_empty(),
"invalid version entry: non-empty value"
);
Ok(Self {
version: entry.clone(),
})
}
}

impl TryInto<Entry> for VersionEntry {
type Error = anyhow::Error;

fn try_into(self) -> anyhow::Result<Entry> {
Ok(self.version)
}
}
Loading

0 comments on commit a39ec04

Please sign in to comment.