Commit 7654a88
Merge branch 'main' into flume-purge-bytes
rklaehn committed Aug 13, 2024 · 2 parents 9a9ea8f + bcc87a2
Showing 5 changed files with 253 additions and 64 deletions.
13 changes: 1 addition & 12 deletions iroh-blobs/examples/provide-bytes.rs
@@ -100,7 +100,7 @@ async fn main() -> Result<()> {
                         return;
                     }
                 };
-                iroh_blobs::provider::handle_connection(conn, db, MockEventSender, lp).await
+                iroh_blobs::provider::handle_connection(conn, db, Default::default(), lp).await
             });
         }
     });
@@ -114,14 +114,3 @@ async fn main() -> Result<()> {
         Err(e) => Err(anyhow::anyhow!("unable to listen for ctrl-c: {e}")),
     }
 }
-
-#[derive(Clone)]
-struct MockEventSender;
-
-use futures_lite::future::FutureExt;
-
-impl iroh_blobs::provider::EventSender for MockEventSender {
-    fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
-        async move {}.boxed()
-    }
-}
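
The example can drop its hand-rolled MockEventSender because EventSender is now a concrete struct (see the provider.rs diff below) whose Default implementation is the disabled sender. A minimal sketch of what the Default::default() argument resolves to; the helper name no_events is illustrative, not part of the example:

use iroh_blobs::provider::EventSender;

// Disabled sender: no CustomEventSender is attached, so events are
// dropped and the closures passed to send/try_send never even run.
fn no_events() -> EventSender {
    EventSender::default()
}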
170 changes: 136 additions & 34 deletions iroh-blobs/src/provider.rs
@@ -1,5 +1,6 @@
 //! The server side API
 use std::fmt::Debug;
+use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{Context, Result};
@@ -49,23 +50,28 @@ pub enum Event {
         /// The hash for which the client wants to receive data.
         hash: Hash,
     },
-    /// A request was received from a client.
-    CustomGetRequestReceived {
+    /// A sequence of hashes has been found and is being transferred.
+    TransferHashSeqStarted {
         /// A unique connection id.
         connection_id: u64,
         /// An identifier uniquely identifying this transfer request.
         request_id: u64,
-        /// The size of the custom get request.
-        len: usize,
+        /// The number of blobs in the sequence.
+        num_blobs: u64,
     },
-    /// A sequence of hashes has been found and is being transferred.
-    TransferHashSeqStarted {
+    /// A chunk of a blob was transferred.
+    ///
+    /// These events are sent with try_send, so you cannot assume that you
+    /// will receive all of them.
+    TransferProgress {
         /// A unique connection id.
         connection_id: u64,
         /// An identifier uniquely identifying this transfer request.
         request_id: u64,
-        /// The number of blobs in the sequence.
-        num_blobs: u64,
+        /// The hash for which we are transferring data.
+        hash: Hash,
+        /// Offset up to which we have transferred data.
+        end_offset: u64,
     },
     /// A blob in a sequence was transferred.
     TransferBlobCompleted {
@@ -179,18 +185,21 @@ pub async fn read_request(mut reader: RecvStream) -> Result<Request> {
 /// close the writer, and return with `Ok(SentStatus::NotFound)`.
 ///
 /// If the transfer does _not_ end in error, the buffer will be empty and the writer is gracefully closed.
-pub async fn transfer_collection<D: Map, E: EventSender>(
+pub async fn transfer_collection<D: Map>(
     request: GetRequest,
     // Store from which to fetch blobs.
     db: &D,
     // Response writer, containing the quinn stream.
-    writer: &mut ResponseWriter<E>,
+    writer: &mut ResponseWriter,
     // the collection to transfer
     mut outboard: impl Outboard,
     mut data: impl AsyncSliceReader,
     stats: &mut TransferStats,
 ) -> Result<SentStatus> {
     let hash = request.hash;
+    let events = writer.events.clone();
+    let request_id = writer.request_id();
+    let connection_id = writer.connection_id();
 
     // if the request is just for the root, we don't need to deserialize the collection
     let just_root = matches!(request.ranges.as_single(), Some((0, _)));
@@ -199,7 +208,7 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
         let (stream, num_blobs) = parse_hash_seq(&mut data).await?;
         writer
             .events
-            .send(Event::TransferHashSeqStarted {
+            .send(|| Event::TransferHashSeqStarted {
                 connection_id: writer.connection_id(),
                 request_id: writer.request_id(),
                 num_blobs,
@@ -210,6 +219,13 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
             None
         };
 
+    let mk_progress = |end_offset| Event::TransferProgress {
+        connection_id,
+        request_id,
+        hash,
+        end_offset,
+    };
+
     let mut prev = 0;
     for (offset, ranges) in request.ranges.iter_non_empty() {
         // create a tracking writer so we can get some stats for writing
@@ -218,11 +234,13 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
         debug!("writing ranges '{:?}' of sequence {}", ranges, hash);
         // wrap the data reader in a tracking reader so we can get some stats for reading
         let mut tracking_reader = TrackingSliceReader::new(&mut data);
+        let mut sending_reader =
+            SendingSliceReader::new(&mut tracking_reader, &events, mk_progress);
         // send the root
         tw.write(outboard.tree().size().to_le_bytes().as_slice())
             .await?;
         encode_ranges_validated(
-            &mut tracking_reader,
+            &mut sending_reader,
             &mut outboard,
             &ranges.to_chunk_ranges(),
             &mut tw,
@@ -243,7 +261,8 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
         }
         if let Some(hash) = c.next().await? {
             tokio::task::yield_now().await;
-            let (status, size, blob_read_stats) = send_blob(db, hash, ranges, &mut tw).await?;
+            let (status, size, blob_read_stats) =
+                send_blob(db, hash, ranges, &mut tw, events.clone(), mk_progress).await?;
             stats.send += tw.stats();
             stats.read += blob_read_stats;
             if SentStatus::NotFound == status {
@@ -253,7 +272,7 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
 
     writer
         .events
-        .send(Event::TransferBlobCompleted {
+        .send(|| Event::TransferBlobCompleted {
             connection_id: writer.connection_id(),
             request_id: writer.request_id(),
             hash,
@@ -273,17 +292,98 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
     Ok(SentStatus::Sent)
 }
 
-/// Trait for sending events.
-pub trait EventSender: Clone + Sync + Send + 'static {
-    /// Send an event.
+struct SendingSliceReader<'a, R, F> {
+    inner: R,
+    sender: &'a EventSender,
+    make_event: F,
+}
+
+impl<'a, R: AsyncSliceReader, F: Fn(u64) -> Event> SendingSliceReader<'a, R, F> {
+    fn new(inner: R, sender: &'a EventSender, make_event: F) -> Self {
+        Self {
+            inner,
+            sender,
+            make_event,
+        }
+    }
+}
+
+impl<'a, R: AsyncSliceReader, F: Fn(u64) -> Event> AsyncSliceReader
+    for SendingSliceReader<'a, R, F>
+{
+    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<bytes::Bytes> {
+        let res = self.inner.read_at(offset, len).await;
+        if let Ok(res) = res.as_ref() {
+            let end_offset = offset + res.len() as u64;
+            self.sender.try_send(|| (self.make_event)(end_offset));
+        }
+        res
+    }
+
+    async fn size(&mut self) -> std::io::Result<u64> {
+        self.inner.size().await
+    }
+}
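
SendingSliceReader (added above) is a small decorator: every read is forwarded to the inner reader and, on success, the end offset reached is reported through EventSender::try_send, so progress reporting can never block or fail the transfer. Below is a self-contained sketch of the same pattern, assuming iroh_io implements AsyncSliceReader for bytes::Bytes (as iroh-blobs' in-memory entries rely on); ProgressReader and on_progress are illustrative names, not crate API:

use bytes::Bytes;
use iroh_io::AsyncSliceReader;

struct ProgressReader<R> {
    inner: R,
    on_progress: Box<dyn Fn(u64) + Send>,
}

impl<R: AsyncSliceReader> AsyncSliceReader for ProgressReader<R> {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        let res = self.inner.read_at(offset, len).await;
        if let Ok(chunk) = &res {
            // Report how far into the blob this read reached.
            (self.on_progress)(offset + chunk.len() as u64);
        }
        res
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        self.inner.size().await
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut reader = ProgressReader {
        inner: Bytes::from_static(b"hello world"),
        on_progress: Box::new(|end| println!("read up to byte {end}")),
    };
    let chunk = reader.read_at(0, 5).await?;
    assert_eq!(&chunk[..], b"hello");
    Ok(())
}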

+/// Trait for sending blob events.
+pub trait CustomEventSender: std::fmt::Debug + Sync + Send + 'static {
+    /// Send an event and wait for it to be sent.
     fn send(&self, event: Event) -> BoxFuture<()>;
+
+    /// Try to send an event.
+    fn try_send(&self, event: Event);
 }

+/// A possibly disabled sender for events.
+#[derive(Debug, Clone, Default)]
+pub struct EventSender {
+    inner: Option<Arc<dyn CustomEventSender>>,
+}
+
+impl<T: CustomEventSender> From<T> for EventSender {
+    fn from(inner: T) -> Self {
+        Self {
+            inner: Some(Arc::new(inner)),
+        }
+    }
+}
+
+impl EventSender {
+    /// Create a new event sender.
+    pub fn new(inner: Option<Arc<dyn CustomEventSender>>) -> Self {
+        Self { inner }
+    }
+
+    /// Send an event.
+    ///
+    /// If the inner sender is not set, the function to produce the event will
+    /// not be called, so any cost associated with gathering information for
+    /// the event will not be incurred.
+    pub async fn send(&self, event: impl FnOnce() -> Event) {
+        if let Some(inner) = &self.inner {
+            let event = event();
+            inner.as_ref().send(event).await;
+        }
+    }
+
+    /// Try to send an event.
+    ///
+    /// This will just drop the event if it cannot be sent immediately, so it
+    /// is only appropriate for events that are not critical, such as
+    /// self-contained progress updates.
+    pub fn try_send(&self, event: impl FnOnce() -> Event) {
+        if let Some(inner) = &self.inner {
+            let event = event();
+            inner.as_ref().try_send(event);
+        }
+    }
+}
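
Putting the new pieces together: a CustomEventSender implementation does the actual delivery, and the blanket From impl wraps it into an EventSender; the closures given to send and try_send only run when an inner sender is attached. A hedged sketch using an unbounded tokio channel, assuming BoxFuture is futures_lite::future::Boxed as in the MockEventSender the example file just deleted; ChannelEventSender is an illustrative name, not crate API:

use futures_lite::future::FutureExt;
use iroh_blobs::provider::{CustomEventSender, Event, EventSender};
use tokio::sync::mpsc;

#[derive(Debug, Clone)]
struct ChannelEventSender(mpsc::UnboundedSender<Event>);

impl CustomEventSender for ChannelEventSender {
    fn send(&self, event: Event) -> futures_lite::future::Boxed<()> {
        let tx = self.0.clone();
        async move {
            // An unbounded channel never applies backpressure; if the
            // receiver is gone the event is simply dropped.
            let _ = tx.send(event);
        }
        .boxed()
    }

    fn try_send(&self, event: Event) {
        let _ = self.0.send(event);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let events: EventSender = ChannelEventSender(tx).into();

    // The closure only runs because an inner sender is attached.
    events
        .send(|| Event::ClientConnected { connection_id: 1 })
        .await;
    assert!(matches!(
        rx.recv().await,
        Some(Event::ClientConnected { connection_id: 1 })
    ));

    // With the default (disabled) sender the closure never runs.
    EventSender::default().try_send(|| unreachable!());
}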

 /// Handle a single connection.
-pub async fn handle_connection<D: Map, E: EventSender>(
+pub async fn handle_connection<D: Map>(
     connection: endpoint::Connection,
     db: D,
-    events: E,
+    events: EventSender,
     rt: LocalPoolHandle,
 ) {
     let remote_addr = connection.remote_address();
Expand All @@ -300,7 +400,9 @@ pub async fn handle_connection<D: Map, E: EventSender>(
events: events.clone(),
inner: writer,
};
events.send(Event::ClientConnected { connection_id }).await;
events
.send(|| Event::ClientConnected { connection_id })
.await;
let db = db.clone();
rt.spawn_detached(|| {
async move {
@@ -316,11 +418,7 @@ pub async fn handle_connection<D: Map, E: EventSender>(
         .await
 }
 
-async fn handle_stream<D: Map, E: EventSender>(
-    db: D,
-    reader: RecvStream,
-    writer: ResponseWriter<E>,
-) -> Result<()> {
+async fn handle_stream<D: Map>(db: D, reader: RecvStream, writer: ResponseWriter) -> Result<()> {
     // 1. Decode the request.
     debug!("reading request");
     let request = match read_request(reader).await {
@@ -337,16 +435,16 @@ async fn handle_stream<D: Map, E: EventSender>(
 }
 
 /// Handle a single standard get request.
-pub async fn handle_get<D: Map, E: EventSender>(
+pub async fn handle_get<D: Map>(
     db: D,
     request: GetRequest,
-    mut writer: ResponseWriter<E>,
+    mut writer: ResponseWriter,
 ) -> Result<()> {
     let hash = request.hash;
     debug!(%hash, "received request");
     writer
         .events
-        .send(Event::GetRequestReceived {
+        .send(|| Event::GetRequestReceived {
             hash,
             connection_id: writer.connection_id(),
             request_id: writer.request_id(),
@@ -397,13 +495,13 @@ pub async fn handle_get<D: Map, E: EventSender>(
 
 /// A helper struct that combines a quinn::SendStream with auxiliary information
 #[derive(Debug)]
-pub struct ResponseWriter<E> {
+pub struct ResponseWriter {
     inner: SendStream,
-    events: E,
+    events: EventSender,
     connection_id: u64,
 }
 
-impl<E: EventSender> ResponseWriter<E> {
+impl ResponseWriter {
     fn tracking_writer(&mut self) -> TrackingStreamWriter<TokioStreamWriter<&mut SendStream>> {
         TrackingStreamWriter::new(TokioStreamWriter(&mut self.inner))
     }
@@ -449,7 +547,7 @@ impl<E: EventSender> ResponseWriter<E> {
         info!("transfer completed for {}", hash);
         Self::print_stats(&stats);
         self.events
-            .send(Event::TransferCompleted {
+            .send(move || Event::TransferCompleted {
                 connection_id: self.connection_id(),
                 request_id: self.request_id(),
                 stats,
@@ -462,7 +560,7 @@ impl<E: EventSender> ResponseWriter<E> {
             Self::print_stats(stats);
         };
         self.events
-            .send(Event::TransferAborted {
+            .send(move || Event::TransferAborted {
                 connection_id: self.connection_id(),
                 request_id: self.request_id(),
                 stats,
@@ -486,15 +584,19 @@ pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
     hash: Hash,
     ranges: &RangeSpec,
     mut writer: W,
+    events: EventSender,
+    mk_progress: impl Fn(u64) -> Event,
 ) -> Result<(SentStatus, u64, SliceReaderStats)> {
     match db.get(&hash).await? {
         Some(entry) => {
             let outboard = entry.outboard().await?;
             let size = outboard.tree().size();
             let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?);
+            let mut sending_reader =
+                SendingSliceReader::new(&mut file_reader, &events, mk_progress);
             writer.write(size.to_le_bytes().as_slice()).await?;
             encode_ranges_validated(
-                &mut file_reader,
+                &mut sending_reader,
                 outboard,
                 &ranges.to_chunk_ranges(),
                 writer,