store pmmr cleanup (#2075)
* introduce DataFile<T> to hide the LEN handling
use slices rather than vec<u8> where possible

* cleanup

* use DataFile<Hash> and not HashFile

* fix core tests
antiochp authored Dec 5, 2018
1 parent 45bfe14 commit 8d8f533
Showing 6 changed files with 83 additions and 90 deletions.
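The heart of the cleanup is a single generic wrapper that owns the element-size arithmetic, so no call site multiplies or divides by `LEN` by hand. Below is a minimal standalone sketch of that pattern, not grin's actual types: `FixedLen`, the in-memory byte store, and the guard behavior are illustrative stand-ins for grin's `FixedLength` and `AppendOnlyFile`.

```rust
use std::marker::PhantomData;

/// Stand-in for grin's FixedLength: a fixed serialized size in bytes.
trait FixedLen {
    const LEN: usize;
}

/// Illustrative 32-byte element, playing the role of Hash.
struct Hash([u8; 32]);
impl FixedLen for Hash {
    const LEN: usize = 32;
}

/// Element-addressed view over a raw append-only byte store.
/// PhantomData pins the element type without storing one.
struct DataFile<T> {
    bytes: Vec<u8>, // stands in for the real file-backed AppendOnlyFile
    _marker: PhantomData<T>,
}

impl<T: FixedLen> DataFile<T> {
    fn new() -> Self {
        DataFile { bytes: vec![], _marker: PhantomData }
    }

    /// Size in elements, not bytes: the LEN division lives here only.
    fn size(&self) -> u64 {
        (self.bytes.len() / T::LEN) as u64
    }

    /// Borrow the raw bytes of the element at a 1-based position,
    /// or None when the position is past the end of the file.
    fn read(&self, position: u64) -> Option<&[u8]> {
        let offset = (position as usize).checked_sub(1)? * T::LEN;
        self.bytes.get(offset..offset + T::LEN)
    }
}

fn main() {
    let mut file: DataFile<Hash> = DataFile::new();
    file.bytes.extend_from_slice(&[0u8; 64]); // two 32-byte elements
    assert_eq!(file.size(), 2);
    assert!(file.read(2).is_some() && file.read(3).is_none());
}
```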
2 changes: 1 addition & 1 deletion core/src/core/pmmr/backend.rs
@@ -59,7 +59,7 @@ pub trait Backend<T: PMMRable> {
/// Returns the data file path. This is a bit of a hack that doesn't
/// sit well with the design, but TxKernels have to be summed and the
/// fastest way to do that is to allow direct access to the file.
fn get_data_file_path(&self) -> String;
fn get_data_file_path(&self) -> &str;

/// Also a bit of a hack...
/// Saves a snapshot of the rewound utxo file with the block hash as
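The `String` → `&str` change above recurs throughout this commit: getters that used to allocate a fresh `String` on every call now hand out a borrow tied to `&self`. A tiny illustration with a hypothetical struct (not grin code):

```rust
struct Backend {
    path: String,
}

impl Backend {
    /// Before: clones the whole path on every call.
    fn path_owned(&self) -> String {
        self.path.clone()
    }

    /// After: borrows; no allocation, lifetime tied to &self.
    fn path_borrowed(&self) -> &str {
        &self.path
    }
}

fn main() {
    let b = Backend { path: "pmmr_data.bin".to_string() };
    assert_eq!(b.path_owned(), b.path_borrowed());
}
```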
2 changes: 1 addition & 1 deletion core/src/core/pmmr/pmmr.rs
@@ -369,7 +369,7 @@ where
}

/// Return the path of the data file (needed to sum kernels efficiently)
pub fn data_file_path(&self) -> String {
pub fn data_file_path(&self) -> &str {
self.backend.get_data_file_path()
}

2 changes: 1 addition & 1 deletion core/src/ser.rs
@@ -291,7 +291,7 @@ pub fn serialize<W: Writeable>(sink: &mut Write, thing: &W) -> Result<(), Error>
/// Utility function to serialize a writeable directly in memory using a
/// Vec<u8>.
pub fn ser_vec<W: Writeable>(thing: &W) -> Result<Vec<u8>, Error> {
let mut vec = Vec::new();
let mut vec = vec![];
serialize(&mut vec, thing)?;
Ok(vec)
}
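The `vec![]` swap here is purely stylistic, but it is worth noting why passing an empty vector to `serialize` works at all: the standard library implements `std::io::Write` for `Vec<u8>`, so the vector is a valid in-memory sink. A std-only sketch of that mechanism, with grin's `Writeable`/`serialize` replaced by a plain byte write:

```rust
use std::io::Write;

/// Serialize a payload into an in-memory Vec<u8>. Works because the
/// standard library implements io::Write for Vec<u8>.
fn ser_vec_sketch(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut vec = vec![];
    vec.write_all(payload)?; // stands in for serialize(&mut vec, thing)?
    Ok(vec)
}

fn main() -> std::io::Result<()> {
    assert_eq!(ser_vec_sketch(b"kernel")?, b"kernel");
    Ok(())
}
```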
4 changes: 2 additions & 2 deletions core/tests/vec_backend/mod.rs
@@ -118,8 +118,8 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
Ok(())
}

fn get_data_file_path(&self) -> String {
"".to_string()
fn get_data_file_path(&self) -> &str {
""
}

fn dump_stats(&self) {}
61 changes: 22 additions & 39 deletions store/src/pmmr.rs
@@ -13,17 +13,17 @@

//! Implementation of the persistent Backend for the prunable MMR tree.

use std::{fs, io, marker};
use std::{fs, io};

use croaring::Bitmap;

use core::core::hash::{Hash, Hashed};
use core::core::pmmr::{self, family, Backend};
use core::core::BlockHeader;
use core::ser::{self, FixedLength, PMMRable};
use core::ser::PMMRable;
use leaf_set::LeafSet;
use prune_list::PruneList;
use types::{prune_noop, AppendOnlyFile, HashFile};
use types::{prune_noop, DataFile};

const PMMR_HASH_FILE: &str = "pmmr_hash.bin";
const PMMR_DATA_FILE: &str = "pmmr_data.bin";
@@ -52,11 +52,10 @@ pub const PMMR_FILES: [&str; 4] = [
pub struct PMMRBackend<T: PMMRable> {
data_dir: String,
prunable: bool,
hash_file: HashFile,
data_file: AppendOnlyFile,
hash_file: DataFile<Hash>,
data_file: DataFile<T::E>,
leaf_set: LeafSet,
prune_list: PruneList,
_marker: marker::PhantomData<T>,
}

impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
@@ -69,8 +68,11 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
let position = self.hash_file.size_unsync() + shift + 1;
self.leaf_set.add(position);
}

self.data_file
.append(&mut ser::ser_vec(&data.as_elmt()).unwrap());
.append(&data.as_elmt())
.map_err(|e| format!("Failed to append data to file. {}", e))?;

for h in &hashes {
self.hash_file
.append(h)
@@ -91,22 +93,9 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
if self.is_compacted(position) {
return None;
}
let flatfile_pos = pmmr::n_leaves(position);
let shift = self.prune_list.get_leaf_shift(position);
let pos = pmmr::n_leaves(position) - 1;

// Must be on disk, doing a read at the correct position
let file_offset = ((pos - shift) as usize) * T::E::LEN;
let data = self.data_file.read(file_offset, T::E::LEN);
match ser::deserialize(&mut &data[..]) {
Ok(h) => Some(h),
Err(e) => {
error!(
"Corrupted storage, could not read an entry from data store: {:?}",
e
);
None
}
}
self.data_file.read(flatfile_pos - shift)
}

/// Get the hash at pos.
@@ -143,10 +132,9 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
self.hash_file.rewind(position - shift);

// Rewind the data file accounting for pruned/compacted pos
let leaf_shift = self.prune_list.get_leaf_shift(position);
let flatfile_pos = pmmr::n_leaves(position);
let file_pos = (flatfile_pos - leaf_shift) * T::E::LEN as u64;
self.data_file.rewind(file_pos);
let leaf_shift = self.prune_list.get_leaf_shift(position);
self.data_file.rewind(flatfile_pos - leaf_shift);

Ok(())
}
@@ -159,7 +147,7 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
}

/// Return data file path
fn get_data_file_path(&self) -> String {
fn get_data_file_path(&self) -> &str {
self.data_file.path()
}

@@ -190,8 +178,8 @@ impl<T: PMMRable> PMMRBackend<T> {
prunable: bool,
header: Option<&BlockHeader>,
) -> io::Result<PMMRBackend<T>> {
let hash_file = HashFile::open(&format!("{}/{}", data_dir, PMMR_HASH_FILE))?;
let data_file = AppendOnlyFile::open(&format!("{}/{}", data_dir, PMMR_DATA_FILE))?;
let hash_file = DataFile::open(&format!("{}/{}", data_dir, PMMR_HASH_FILE))?;
let data_file = DataFile::open(&format!("{}/{}", data_dir, PMMR_DATA_FILE))?;

let leaf_set_path = format!("{}/{}", data_dir, PMMR_LEAF_FILE);

@@ -212,7 +200,6 @@ impl<T: PMMRable> PMMRBackend<T> {
data_file,
leaf_set,
prune_list,
_marker: marker::PhantomData,
})
}

@@ -239,7 +226,7 @@ impl<T: PMMRable> PMMRBackend<T> {
/// Number of elements in the underlying stored data. Extremely dependent on
/// pruning and compaction.
pub fn data_size(&self) -> u64 {
self.data_file.size() / T::E::LEN as u64
self.data_file.size()
}

/// Size of the underlying hashed data. Extremely dependent on pruning
@@ -320,15 +307,11 @@ impl<T: PMMRable> PMMRBackend<T> {
let off_to_rm = map_vec!(leaf_pos_to_rm, |&pos| {
let flat_pos = pmmr::n_leaves(pos);
let shift = self.prune_list.get_leaf_shift(pos);
(flat_pos - 1 - shift) * T::E::LEN as u64
(flat_pos - 1 - shift)
});

self.data_file.save_prune(
tmp_prune_file_data.clone(),
&off_to_rm,
T::E::LEN as u64,
prune_cb,
)?;
self.data_file
.save_prune(tmp_prune_file_data.clone(), &off_to_rm, prune_cb)?;
}

// 3. Update the prune list and write to disk.
@@ -344,14 +327,14 @@ impl<T: PMMRable> PMMRBackend<T> {
tmp_prune_file_hash.clone(),
format!("{}/{}", self.data_dir, PMMR_HASH_FILE),
)?;
self.hash_file = HashFile::open(&format!("{}/{}", self.data_dir, PMMR_HASH_FILE))?;
self.hash_file = DataFile::open(&format!("{}/{}", self.data_dir, PMMR_HASH_FILE))?;

// 5. Rename the compact copy of the data file and reopen it.
fs::rename(
tmp_prune_file_data.clone(),
format!("{}/{}", self.data_dir, PMMR_DATA_FILE),
)?;
self.data_file = AppendOnlyFile::open(&format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;
self.data_file = DataFile::open(&format!("{}/{}", self.data_dir, PMMR_DATA_FILE))?;

// 6. Write the leaf_set to disk.
// Optimize the bitmap storage in the process.
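The `get_data` and `rewind` hunks above show the payoff: call sites now compute a flat element index (`n_leaves(pos) - shift`) and let `DataFile<T>` turn it into a byte offset, instead of multiplying by `T::E::LEN` themselves. Under the diff's conventions (1-based positions, fixed `LEN`-sized elements) the two formulations agree, as this sketch checks; `ELEM_LEN` and the sample inputs are illustrative:

```rust
const ELEM_LEN: u64 = 32; // e.g. Hash::LEN; illustrative constant

/// Before this commit: every call site did the byte math itself.
fn file_offset_old(flatfile_pos: u64, leaf_shift: u64) -> u64 {
    (flatfile_pos - 1 - leaf_shift) * ELEM_LEN
}

/// After: call sites only produce a 1-based element position...
fn element_position(flatfile_pos: u64, leaf_shift: u64) -> u64 {
    flatfile_pos - leaf_shift
}

/// ...and DataFile<T>::read derives the byte offset internally.
fn file_offset_new(position: u64) -> u64 {
    (position - 1) * ELEM_LEN
}

fn main() {
    let (flat, shift) = (10, 3);
    assert_eq!(
        file_offset_old(flat, shift),
        file_offset_new(element_position(flat, shift))
    );
}
```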
102 changes: 56 additions & 46 deletions store/src/types.rs
@@ -16,47 +16,54 @@ use memmap;

use std::fs::{self, File, OpenOptions};
use std::io::{self, BufWriter, ErrorKind, Read, Write};
use std::marker;

use core::core::hash::Hash;
use core::ser::{self, FixedLength};
use core::ser::{self, FixedLength, Readable, Writeable};

/// A no-op function for doing nothing with some pruned data.
pub fn prune_noop(_pruned_data: &[u8]) {}

/// Hash file (MMR) wrapper around an append only file.
pub struct HashFile {
/// Data file (MMR) wrapper around an append only file.
pub struct DataFile<T> {
file: AppendOnlyFile,
_marker: marker::PhantomData<T>,
}

impl HashFile {
/// Open (or create) a hash file at the provided path on disk.
pub fn open(path: &str) -> io::Result<HashFile> {
impl<T> DataFile<T>
where
T: FixedLength + Readable + Writeable,
{
/// Open (or create) a file at the provided path on disk.
pub fn open(path: &str) -> io::Result<DataFile<T>> {
let file = AppendOnlyFile::open(path)?;
Ok(HashFile { file })
Ok(DataFile {
file,
_marker: marker::PhantomData,
})
}

/// Append a hash to this hash file.
/// Append an element to the file.
/// Will not be written to disk until flush() is subsequently called.
/// Alternatively discard() may be called to discard any pending changes.
pub fn append(&mut self, hash: &Hash) -> io::Result<()> {
let mut bytes = ser::ser_vec(hash).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
pub fn append(&mut self, data: &T) -> io::Result<()> {
let mut bytes = ser::ser_vec(data).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.file.append(&mut bytes);
Ok(())
}

/// Read a hash from the hash file by position.
pub fn read(&self, position: u64) -> Option<Hash> {
/// Read an element from the file by position.
pub fn read(&self, position: u64) -> Option<T> {
// The MMR starts at 1, our binary backend starts at 0.
let pos = position - 1;

// Must be on disk, doing a read at the correct position
let file_offset = (pos as usize) * Hash::LEN;
let data = self.file.read(file_offset, Hash::LEN);
let file_offset = (pos as usize) * T::LEN;
let data = self.file.read(file_offset, T::LEN);
match ser::deserialize(&mut &data[..]) {
Ok(h) => Some(h),
Ok(x) => Some(x),
Err(e) => {
error!(
"Corrupted storage, could not read an entry from hash file: {:?}",
"Corrupted storage, could not read an entry from data file: {:?}",
e
);
None
@@ -66,40 +73,45 @@ impl HashFile {

/// Rewind the backend file to the specified position.
pub fn rewind(&mut self, position: u64) {
self.file.rewind(position * Hash::LEN as u64)
self.file.rewind(position * T::LEN as u64)
}

/// Flush unsynced changes to the hash file to disk.
/// Flush unsynced changes to the file to disk.
pub fn flush(&mut self) -> io::Result<()> {
self.file.flush()
}

/// Discard any unsynced changes to the hash file.
/// Discard any unsynced changes to the file.
pub fn discard(&mut self) {
self.file.discard()
}

/// Size of the hash file in number of hashes (not bytes).
/// Size of the file in number of elements (not bytes).
pub fn size(&self) -> u64 {
self.file.size() / Hash::LEN as u64
self.file.size() / T::LEN as u64
}

/// Size of the unsync'd hash file, in hashes (not bytes).
/// Size of the unsync'd file, in elements (not bytes).
pub fn size_unsync(&self) -> u64 {
self.file.size_unsync() / Hash::LEN as u64
self.file.size_unsync() / T::LEN as u64
}

/// Rewrite the hash file out to disk, pruning removed hashes.
pub fn save_prune<T>(&self, target: String, prune_offs: &[u64], prune_cb: T) -> io::Result<()>
/// Path of the underlying file
pub fn path(&self) -> &str {
self.file.path()
}

/// Write the file out to disk, pruning removed elements.
pub fn save_prune<F>(&self, target: String, prune_offs: &[u64], prune_cb: F) -> io::Result<()>
where
T: Fn(&[u8]),
F: Fn(&[u8]),
{
let prune_offs = prune_offs
.iter()
.map(|x| x * Hash::LEN as u64)
.map(|x| x * T::LEN as u64)
.collect::<Vec<_>>();
self.file
.save_prune(target, prune_offs.as_slice(), Hash::LEN as u64, prune_cb)
.save_prune(target, prune_offs.as_slice(), T::LEN as u64, prune_cb)
}
}

@@ -147,8 +159,8 @@ impl AppendOnlyFile {

/// Append data to the file. Until the append-only file is synced, data is
/// only written to memory.
pub fn append(&mut self, buf: &mut Vec<u8>) {
self.buffer.append(buf);
pub fn append(&mut self, bytes: &mut [u8]) {
self.buffer.extend_from_slice(bytes);
}

/// Rewinds the data file back to a lower position. The new position needs
@@ -219,31 +231,29 @@ impl AppendOnlyFile {

/// Read length bytes of data at offset from the file.
/// Leverages the memory map.
pub fn read(&self, offset: usize, length: usize) -> Vec<u8> {
pub fn read(&self, offset: usize, length: usize) -> &[u8] {
if offset >= self.buffer_start {
let buffer_offset = offset - self.buffer_start;
return self.read_from_buffer(buffer_offset, length);
}
if self.mmap.is_none() {
return vec![];
}
let mmap = self.mmap.as_ref().unwrap();

if mmap.len() < (offset + length) {
return vec![];
if let Some(mmap) = &self.mmap {
if mmap.len() < (offset + length) {
return &mmap[..0];
}
&mmap[offset..(offset + length)]
} else {
return &self.buffer[..0];
}

(&mmap[offset..(offset + length)]).to_vec()
}

// Read length bytes from the buffer, from offset.
// Return an empty slice if we do not have enough bytes in the buffer
// to satisfy a full read.
fn read_from_buffer(&self, offset: usize, length: usize) -> Vec<u8> {
fn read_from_buffer(&self, offset: usize, length: usize) -> &[u8] {
if self.buffer.len() < (offset + length) {
vec![]
&self.buffer[..0]
} else {
self.buffer[offset..(offset + length)].to_vec()
&self.buffer[offset..(offset + length)]
}
}

@@ -313,7 +323,7 @@ impl AppendOnlyFile {
}

/// Path of the underlying file
pub fn path(&self) -> String {
self.path.clone()
pub fn path(&self) -> &str {
&self.path
}
}
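The `read` and `read_from_buffer` changes above are the "use slices rather than vec<u8>" half of the commit: rather than copying bytes out of the mmap or the pending-write buffer into a fresh `Vec<u8>`, the file hands back a borrow, with an empty slice replacing the empty vec as the out-of-range sentinel. A compact std-only sketch of the borrowing pattern (the struct is a stand-in, not `AppendOnlyFile`):

```rust
/// Zero-copy reads: borrow a window of the backing storage instead of
/// allocating a Vec<u8> per call.
struct Store {
    data: Vec<u8>, // stands in for the mmap plus in-memory buffer
}

impl Store {
    fn read(&self, offset: usize, length: usize) -> &[u8] {
        if self.data.len() < offset + length {
            &self.data[..0] // out of range: empty slice, no allocation
        } else {
            &self.data[offset..offset + length]
        }
    }
}

fn main() {
    let store = Store { data: vec![1, 2, 3, 4] };
    assert_eq!(store.read(1, 2), &[2, 3]);
    assert!(store.read(3, 2).is_empty());
}
```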
