Skip to content

Commit

Permalink
Add outboard creation progress to the mem store
Browse files Browse the repository at this point in the history
just move compute_outboard from fs to util and use it from both fs and mem.
  • Loading branch information
rklaehn committed Aug 14, 2024
1 parent 74a527b commit 26e060c
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 69 deletions.
41 changes: 4 additions & 37 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,14 @@
//! errors when communicating with the actor.
use std::{
collections::{BTreeMap, BTreeSet},
io::{self, BufReader, Read},
io,
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::{Duration, SystemTime},
};

use bao_tree::io::{
fsm::Outboard,
outboard::PreOrderOutboard,
sync::{ReadAt, Size},
};
use bytes::Bytes;
Expand Down Expand Up @@ -102,17 +101,18 @@ use crate::{
bao_file::{BaoFileStorage, CompleteStorage},
fs::{
tables::BaoFilePart,
util::{overwrite_and_sync, read_and_remove, ProgressReader},
util::{overwrite_and_sync, read_and_remove},
},
},
util::{
compute_outboard,
progress::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, MemOrFile, TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
Tag, TempTag,
};
use tables::{ReadOnlyTables, ReadableTables, Tables};

Expand Down Expand Up @@ -2316,39 +2316,6 @@ fn export_file_copy(
Ok(())
}

/// Synchronously compute the outboard of a file, and return hash and outboard.
///
/// It is assumed that the file is not modified while this is running.
///
/// If it is modified while or after this is running, the outboard will be
/// invalid, so any attempt to compute a slice from it will fail.
///
/// If the size of the file is changed while this is running, an error will be
/// returned.
///
/// The computed outboard is without length prefix.
fn compute_outboard(
read: impl Read,
size: u64,
progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
) -> io::Result<(Hash, Option<Vec<u8>>)> {
use bao_tree::io::sync::CreateOutboard;

// wrap the reader in a progress reader, so we can report progress.
let reader = ProgressReader::new(read, progress);
// wrap the reader in a buffered reader, so we read in large chunks
// this reduces the number of io ops and also the number of progress reports
let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
let reader = BufReader::with_capacity(buf_size, reader);

let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
let root = ob.root.into();
let data = ob.data;
tracing::trace!(%root, "done");
let data = if !data.is_empty() { Some(data) } else { None };
Ok((root, data))
}

fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
for e in tables.blobs().iter()? {
let (k, v) = e?;
Expand Down
1 change: 1 addition & 0 deletions iroh-blobs/src/store/fs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::store::bao_file::test_support::{
};
use crate::store::{Map as _, MapEntryMut, MapMut, ReadableStore, Store as _};
use crate::util::raw_outboard;
use crate::IROH_BLOCK_SIZE;

macro_rules! assert_matches {
($expression:expr, $pattern:pat) => {
Expand Down
26 changes: 0 additions & 26 deletions iroh-blobs/src/store/fs/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,6 @@ use std::{
time::{Duration, Instant},
};

/// A reader that calls a callback with the number of bytes read after each read.
pub(crate) struct ProgressReader<R, F: Fn(u64) -> io::Result<()>> {
inner: R,
offset: u64,
cb: F,
}

impl<R: io::Read, F: Fn(u64) -> io::Result<()>> ProgressReader<R, F> {
pub fn new(inner: R, cb: F) -> Self {
Self {
inner,
offset: 0,
cb,
}
}
}

impl<R: io::Read, F: Fn(u64) -> io::Result<()>> io::Read for ProgressReader<R, F> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let read = self.inner.read(buf)?;
self.offset += read as u64;
(self.cb)(self.offset)?;
Ok(read)
}
}

/// overwrite a file with the given data.
///
/// This is almost like `std::fs::write`, but it does not truncate the file.
Expand Down
7 changes: 6 additions & 1 deletion iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ impl Store {
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> io::Result<TempTag> {
progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
let (storage, hash) = MutableMemStorage::complete(bytes);
let p2 = progress.clone();
let cb = move |offset| {
p2.try_send(ImportProgress::OutboardProgress { id, offset })
.ok();
};
let (storage, hash) = MutableMemStorage::complete(bytes, cb);
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
use super::Store;
let tag = self.temp_tag(HashAndFormat { hash, format });
Expand Down
14 changes: 11 additions & 3 deletions iroh-blobs/src/store/mutable_mem_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use bao_tree::{
use bytes::Bytes;

use crate::{
util::{copy_limited_slice, raw_outboard, SparseMemFile},
util::{compute_outboard, copy_limited_slice, SparseMemFile},
IROH_BLOCK_SIZE,
};

Expand Down Expand Up @@ -63,8 +63,16 @@ impl SizeInfo {

impl MutableMemStorage {
/// Create a new mutable mem storage from the given data
pub fn complete(bytes: Bytes) -> (Self, iroh_base::hash::Hash) {
let (outboard, hash) = raw_outboard(bytes.as_ref());
pub fn complete(
bytes: Bytes,
cb: impl Fn(u64) -> () + Send + Sync + 'static,
) -> (Self, iroh_base::hash::Hash) {
let (hash, outboard) = compute_outboard(&bytes[..], bytes.len() as u64, move |offset| {
cb(offset);
Ok(())
})
.unwrap();
let outboard = outboard.unwrap_or_default();
let res = Self {
data: bytes.to_vec().into(),
outboard: outboard.into(),
Expand Down
65 changes: 63 additions & 2 deletions iroh-blobs/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Utility functions and types.
use bao_tree::{io::outboard::PreOrderMemOutboard, BaoTree, ChunkRanges};
use bao_tree::{io::outboard::PreOrderOutboard, BaoTree, ChunkRanges};
use bytes::Bytes;
use derive_more::{Debug, Display, From, Into};
use range_collections::range_set::RangeSetRange;
use serde::{Deserialize, Serialize};
use std::{
borrow::Borrow,
fmt,
io::{BufReader, Read},
sync::{Arc, Weak},
time::SystemTime,
};
Expand Down Expand Up @@ -290,8 +291,68 @@ pub(crate) fn raw_outboard_size(size: u64) -> u64 {
BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size()
}

/// Synchronously compute the outboard of a file, and return hash and outboard.
///
/// It is assumed that the file is not modified while this is running.
///
/// If it is modified while or after this is running, the outboard will be
/// invalid, so any attempt to compute a slice from it will fail.
///
/// If the size of the file is changed while this is running, an error will be
/// returned.
///
/// The computed outboard is without length prefix.
pub(crate) fn compute_outboard(
read: impl Read,
size: u64,
progress: impl Fn(u64) -> std::io::Result<()> + Send + Sync + 'static,
) -> std::io::Result<(Hash, Option<Vec<u8>>)> {
use bao_tree::io::sync::CreateOutboard;

// wrap the reader in a progress reader, so we can report progress.
let reader = ProgressReader::new(read, progress);
// wrap the reader in a buffered reader, so we read in large chunks
// this reduces the number of io ops and also the number of progress reports
let buf_size = usize::try_from(size).unwrap_or(usize::MAX).min(1024 * 1024);
let reader = BufReader::with_capacity(buf_size, reader);

let ob = PreOrderOutboard::<Vec<u8>>::create_sized(reader, size, IROH_BLOCK_SIZE)?;
let root = ob.root.into();
let data = ob.data;
tracing::trace!(%root, "done");
let data = if !data.is_empty() { Some(data) } else { None };
Ok((root, data))
}

/// Compute raw outboard, without the size header.
#[cfg(test)]
pub(crate) fn raw_outboard(data: &[u8]) -> (Vec<u8>, Hash) {
let res = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
let res = bao_tree::io::outboard::PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
(res.data, res.root.into())
}

/// A reader that calls a callback with the number of bytes read after each read.
pub(crate) struct ProgressReader<R, F: Fn(u64) -> std::io::Result<()>> {
inner: R,
offset: u64,
cb: F,
}

impl<R: std::io::Read, F: Fn(u64) -> std::io::Result<()>> ProgressReader<R, F> {
pub fn new(inner: R, cb: F) -> Self {
Self {
inner,
offset: 0,
cb,
}
}
}

impl<R: std::io::Read, F: Fn(u64) -> std::io::Result<()>> std::io::Read for ProgressReader<R, F> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let read = self.inner.read(buf)?;
self.offset += read as u64;
(self.cb)(self.offset)?;
Ok(read)
}
}

0 comments on commit 26e060c

Please sign in to comment.