feat(iroh)!: Blob batch PR, attempt 3 (#2545)
## Description

This is the third attempt to add a batch API for adding blobs. The
previous one was #2339.

The basic idea is the following: all changes to the store happen in the
context of a _batch_. All write operations within a batch produce temp
tags. These temp tags are scoped to the batch and keep the data alive as
long as the batch exists.

At some point, the API user has to upgrade one or more temp tags to
permanent tags.

In the long term, all non-batch operations would be implemented in terms
of batch operations.
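To make the intended semantics concrete, here is a self-contained toy sketch (the names `Store`, `Batch`, and `TempTag` below are illustrative stand-ins, not the actual iroh types): temp tags pin their hash while alive, dropping the batch drops its tags, and only hashes upgraded to permanent tags stay alive afterwards.

```rust
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;

type Hash = u64;

/// Toy store: tracks which hashes are pinned by temp tags or permanent tags.
#[derive(Default)]
struct Store {
    temp_pins: HashMap<Hash, usize>,
    permanent: HashSet<Hash>,
}

impl Store {
    fn is_live(&self, h: Hash) -> bool {
        self.permanent.contains(&h) || self.temp_pins.get(&h).copied().unwrap_or(0) > 0
    }
}

/// A temp tag pins its hash for as long as it is alive.
struct TempTag {
    hash: Hash,
    store: Rc<RefCell<Store>>,
}

impl Drop for TempTag {
    fn drop(&mut self) {
        // Dropping the tag (e.g. when its batch is dropped) un-pins the hash.
        if let Some(n) = self.store.borrow_mut().temp_pins.get_mut(&self.hash) {
            *n = n.saturating_sub(1);
        }
    }
}

/// All writes happen in the context of a batch; each write yields a temp tag
/// that is scoped to the batch.
struct Batch {
    store: Rc<RefCell<Store>>,
    tags: Vec<TempTag>,
}

impl Batch {
    fn new(store: Rc<RefCell<Store>>) -> Self {
        Batch { store, tags: Vec::new() }
    }

    /// "Write" some data: pin its hash with a batch-scoped temp tag.
    fn add(&mut self, hash: Hash) {
        *self.store.borrow_mut().temp_pins.entry(hash).or_insert(0) += 1;
        let store = self.store.clone();
        self.tags.push(TempTag { hash, store });
    }

    /// Upgrade a temp-tagged hash to a permanent tag that outlives the batch.
    fn persist(&mut self, hash: Hash) {
        self.store.borrow_mut().permanent.insert(hash);
    }
}
```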

In a second step, the following RPC calls would be replaced by their
batch equivalents:
- `AddStream`
- `AddPath`
- `CreateCollection`

Replacing the third is particularly nice, since the notion of a
collection (a special kind of hashseq) then no longer has to exist in
the node code at all.
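For context, a hashseq is nothing more than the concatenation of fixed-size hashes, which is why a collection can be assembled entirely on the client side. A minimal sketch, with a plain 32-byte array standing in for iroh's `Hash` type:

```rust
/// A hashseq is just n concatenated fixed-size hashes; a 32-byte array
/// stands in for iroh's `Hash` type here.
type Hash = [u8; 32];

fn encode_hash_seq(hashes: &[Hash]) -> Vec<u8> {
    hashes.iter().flat_map(|h| h.iter().copied()).collect()
}

fn decode_hash_seq(bytes: &[u8]) -> Option<Vec<Hash>> {
    if bytes.len() % 32 != 0 {
        return None; // not a whole number of hashes
    }
    Some(
        bytes
            .chunks_exact(32)
            .map(|c| <Hash>::try_from(c).expect("chunk is exactly 32 bytes"))
            .collect(),
    )
}
```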

## Breaking Changes

- `iroh::client::blobs::BlobStatus` has a new variant, `NotFound`
- `iroh::client::blobs::BlobStatus::Partial`: `size` is now a
`BaoBlobSize` instead of a `u64`

All other public changes are additions of new APIs.
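For illustration, the new shapes look roughly like the following self-contained mirror; the real definitions live in iroh and iroh-blobs, so treat this as a sketch of the match arms callers now need, not the actual types:

```rust
/// Mirror of `BaoBlobSize`, for illustration only.
#[derive(Debug, PartialEq, Eq)]
enum BaoBlobSize {
    /// Size reported by a peer but not yet verified.
    Unverified(u64),
    /// Size proven by verified outboard data.
    Verified(u64),
}

impl BaoBlobSize {
    fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(n) | BaoBlobSize::Verified(n) => *n,
        }
    }
}

/// Mirror of the new `BlobStatus`, for illustration only.
#[derive(Debug, PartialEq, Eq)]
enum BlobStatus {
    /// New variant: the blob is not stored at all.
    NotFound,
    /// The size of a partial blob may still be unverified.
    Partial { size: BaoBlobSize },
    Complete { size: u64 },
}

fn describe(status: &BlobStatus) -> String {
    match status {
        BlobStatus::NotFound => "not found".to_string(),
        BlobStatus::Partial { size } => format!("partial, {} bytes so far", size.value()),
        BlobStatus::Complete { size } => format!("complete, {} bytes", size),
    }
}
```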

## Notes & open questions

Note: the previous version had an optimisation that avoided storing
multiple `TempTag`s for the same hash. I removed it to keep things
simple; we can add it back later.
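That removed optimisation boils down to keeping one refcount per hash instead of one stored `TempTag` per write. A minimal sketch of the idea, with hypothetical names:

```rust
use std::collections::HashMap;

type Hash = [u8; 32];

/// Hypothetical per-batch bookkeeping: one refcount per hash instead of
/// storing a separate `TempTag` for every write of the same data.
#[derive(Default)]
struct BatchPins {
    counts: HashMap<Hash, u64>,
}

impl BatchPins {
    /// Returns true if the hash was not yet pinned (a tag must be stored).
    fn pin(&mut self, hash: Hash) -> bool {
        let count = self.counts.entry(hash).or_insert(0);
        *count += 1;
        *count == 1
    }

    /// Returns true if this released the last pin (the tag can be dropped).
    fn unpin(&mut self, hash: Hash) -> bool {
        match self.counts.get_mut(&hash) {
            Some(c) if *c > 1 => {
                *c -= 1;
                false
            }
            Some(_) => {
                self.counts.remove(&hash);
                true
            }
            None => false,
        }
    }
}
```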

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.

---------

Co-authored-by: Philipp Krüger <[email protected]>
rklaehn and matheus23 authored Aug 15, 2024
1 parent 168fa5b commit 9a55122
Showing 16 changed files with 1,235 additions and 38 deletions.
24 changes: 24 additions & 0 deletions iroh-blobs/src/provider.rs
@@ -159,6 +159,30 @@ pub enum AddProgress {
Abort(RpcError),
}

/// Progress updates for the batch add operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchAddPathProgress {
/// An item was found with the given size
Found {
/// The size of the entry in bytes.
size: u64,
},
/// We got progress ingesting the item.
Progress {
/// The offset of the progress, in bytes.
offset: u64,
},
/// We are done, and the hash is `hash`.
Done {
/// The hash of the entry.
hash: Hash,
},
/// We got an error and need to abort.
///
/// This will be the last message in the stream.
Abort(RpcError),
}

/// Read the request from the getter.
///
/// Will fail if there is an error while reading, if the reader
11 changes: 4 additions & 7 deletions iroh-blobs/src/store/fs.rs
@@ -757,13 +757,6 @@ impl Store {
Ok(self.0.dump().await?)
}

/// Ensure that all operations before the sync are processed and persisted.
///
/// This is done by closing any open write transaction.
pub async fn sync(&self) -> io::Result<()> {
Ok(self.0.sync().await?)
}

/// Import from a v0 or v1 flat store, for backwards compatibility.
#[deprecated(
since = "0.23.0",
@@ -1419,6 +1412,10 @@ impl super::Store for Store {
self.0.temp.temp_tag(value)
}

async fn sync(&self) -> io::Result<()> {
Ok(self.0.sync().await?)
}

async fn shutdown(&self) {
self.0.shutdown().await;
}
4 changes: 4 additions & 0 deletions iroh-blobs/src/store/mem.rs
@@ -243,6 +243,10 @@ impl super::Store for Store {
}

async fn shutdown(&self) {}

async fn sync(&self) -> io::Result<()> {
Ok(())
}
}

#[derive(Debug, Default)]
4 changes: 4 additions & 0 deletions iroh-blobs/src/store/readonly_mem.rs
@@ -333,4 +333,8 @@ impl super::Store for Store {
}

async fn shutdown(&self) {}

async fn sync(&self) -> io::Result<()> {
Ok(())
}
}
5 changes: 4 additions & 1 deletion iroh-blobs/src/store/traits.rs
@@ -400,6 +400,9 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
/// Shutdown the store.
fn shutdown(&self) -> impl Future<Output = ()> + Send;

/// Sync the store.
fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;

/// Validate the database
///
/// This will check that the file and outboard content is correct for all complete
@@ -703,7 +706,7 @@ pub enum ImportProgress {
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `Reference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
/// This mode will copy the file into the database before hashing.
///
5 changes: 5 additions & 0 deletions iroh-blobs/src/util.rs
@@ -208,6 +208,11 @@ impl TempTag {
self.inner.format
}

/// The hash and format of the pinned item
pub fn hash_and_format(&self) -> HashAndFormat {
self.inner
}

/// Keep the item alive until the end of the process
pub fn leak(mut self) {
// set the liveness tracker to None, so that the refcount is not decreased
9 changes: 7 additions & 2 deletions iroh-cli/src/commands/blobs.rs
@@ -370,10 +370,15 @@ impl BlobCommands {

let (blob_status, size) = match (status, format) {
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => ("incomplete blob", size),
(BlobStatus::Partial { size }, BlobFormat::Raw) => {
("incomplete blob", size.value())
}
(BlobStatus::Complete { size }, BlobFormat::HashSeq) => ("collection", size),
(BlobStatus::Partial { size }, BlobFormat::HashSeq) => {
("incomplete collection", size)
("incomplete collection", size.value())
}
(BlobStatus::NotFound, _) => {
return Err(anyhow!("blob is missing"));
}
};
println!(
3 changes: 3 additions & 0 deletions iroh/src/client.rs
@@ -23,6 +23,9 @@ pub mod gossip;
pub mod node;
pub mod tags;

/// Iroh rpc connection - boxed so that we can have a concrete type.
pub(crate) type RpcConnection = quic_rpc::transport::boxed::Connection<RpcService>;

// Keep this type exposed, otherwise every occurrence of `RpcClient` in the API
// will show up as `RpcClient<RpcService, Connection<RpcService>>` in the docs.
/// Iroh rpc client - boxed so that we can have a concrete type.
67 changes: 46 additions & 21 deletions iroh/src/client/blobs.rs
@@ -55,7 +55,7 @@ use std::{
task::{Context, Poll},
};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
@@ -65,7 +65,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::{Collection, SimpleStore},
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
util::SetTagOption,
BlobFormat, Hash, Tag,
};
@@ -77,12 +77,14 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;
mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};

use crate::rpc_protocol::blobs::{
AddPathRequest, AddStreamRequest, AddStreamUpdate, ConsistencyCheckRequest,
CreateCollectionRequest, CreateCollectionResponse, DeleteRequest, DownloadRequest,
ExportRequest, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse,
ValidateRequest,
AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse,
BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse,
DeleteRequest, DownloadRequest, ExportRequest, ListIncompleteRequest, ListRequest,
ReadAtRequest, ReadAtResponse, ValidateRequest,
};
use crate::rpc_protocol::node::StatusRequest;

@@ -102,6 +104,38 @@ impl<'a> From<&'a Iroh> for &'a RpcClient {
}

impl Client {
/// Get the status of a blob on the node.
///
/// This distinguishes between complete, partial, and missing blobs.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
let status = self.rpc.rpc(BlobStatusRequest { hash }).await??;
Ok(status.0)
}

/// Check if a blob is completely stored on the node.
///
/// This is just a convenience wrapper around `status` that returns a boolean.
pub async fn has(&self, hash: Hash) -> Result<bool> {
match self.status(hash).await {
Ok(BlobStatus::Complete { .. }) => Ok(true),
Ok(_) => Ok(false),
Err(err) => Err(err),
}
}

/// Create a new batch for adding data.
///
/// A batch is a context in which temp tags are created and data is added to the node. Temp tags
/// are automatically deleted when the batch is dropped, leading to the data being garbage collected
/// unless a permanent tag is created for it.
pub async fn batch(&self) -> Result<Batch> {
let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
let rpc = self.rpc.clone();
Ok(Batch::new(batch, rpc, updates, 1024))
}

/// Stream the contents of a single blob.
///
/// Returns a [`Reader`], which can report the size of the blob before reading it.
@@ -424,17 +458,6 @@ impl Client {
Ok(ticket)
}

/// Get the status of a blob.
pub async fn status(&self, hash: Hash) -> Result<BlobStatus> {
// TODO: this could be implemented more efficiently
let reader = self.read(hash).await?;
if reader.is_complete {
Ok(BlobStatus::Complete { size: reader.size })
} else {
Ok(BlobStatus::Partial { size: reader.size })
}
}

fn tags_client(&self) -> tags::Client {
tags::Client {
rpc: self.rpc.clone(),
@@ -449,9 +472,10 @@ impl SimpleStore for Client {
}

/// Whether to wrap the added data in a collection.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub enum WrapOption {
/// Do not wrap the file or directory.
#[default]
NoWrap,
/// Wrap the file or directory in a collection.
Wrap {
@@ -461,12 +485,14 @@ pub enum WrapOption {
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The blob is not stored at all.
NotFound,
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
size: u64,
size: BaoBlobSize,
},
/// The blob is stored completely.
Complete {
@@ -943,7 +969,6 @@ pub enum DownloadMode {
mod tests {
use super::*;

use anyhow::Context as _;
use iroh_blobs::hashseq::HashSeq;
use iroh_net::NodeId;
use rand::RngCore;