Skip to content

Commit

Permalink
feat: CarV1: Sink
Browse files Browse the repository at this point in the history
  • Loading branch information
aatifsyed committed Sep 14, 2023
1 parent f92b14e commit 4f8ce1e
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/db/car/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ fn get_roots_from_v1_header(reader: impl Read) -> io::Result<Vec<Cid>> {
}
}

fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
pub fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
match cid_error {
cid::Error::Io(io_error) => io_error,
other => io::Error::new(InvalidData, other),
Expand Down
104 changes: 104 additions & 0 deletions src/utils/car_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use bytes::{BufMut as _, Bytes, BytesMut};
use cid::Cid;
use futures::{Sink, SinkExt as _};
use pin_project_lite::pin_project;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::AsyncWrite;
use tokio_util::codec::FramedWrite;
use unsigned_varint::codec::UviBytes as VarintCodec;

use crate::db::car::plain::cid_error_to_io_error;

// TODO(aatifsyed): `car_stream` should not be a submodule of `db`
use super::db::car_stream::{Block, CarHeader};

pin_project! {
pub struct CarV1<W> {
#[pin]
inner: FramedWrite<W, VarintCodec>,
}}

impl<W> CarV1<W>
where
// TODO(aatifsyed): remove Unpin bound
W: AsyncWrite + Unpin,
{
pub async fn new(writer: W, roots: Vec<Cid>) -> anyhow::Result<Self> {
let mut inner = FramedWrite::new(writer, VarintCodec::default());
inner
.send(Bytes::from(fvm_ipld_encoding::to_vec(&CarHeader {
roots,
version: 1,
})?))
.await?;
Ok(Self { inner })
}
}

impl<W> Sink<Block> for CarV1<W>
where
W: AsyncWrite,
{
type Error = io::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: Block) -> io::Result<()> {
// TODO(aatifsyed): Should `Block::write` exist at all?
let mut encoded = BytesMut::new();
let Block { cid, data } = item;
cid.write_bytes((&mut encoded).writer())
.map_err(cid_error_to_io_error)?;
encoded.extend(data);

self.project().inner.start_send(encoded.freeze())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
}
}

#[cfg(test)]
mod tests {
use super::*;
use async_compression::tokio::write::ZstdEncoder;
use futures::{executor::block_on, stream, StreamExt as _};
use quickcheck::quickcheck;

quickcheck! {
fn zstd_always_valid(blocks: Vec<Block>) -> () {
block_on(async {
let mut car_zst = vec![];
stream::iter(blocks)
.map(io::Result::Ok)
.forward(
CarV1::new(ZstdEncoder::new(&mut car_zst), vec![])
.await
.unwrap()
)
.await
.unwrap();

// check it's a valid zstd archive
zstd::decode_all(car_zst.as_slice()).unwrap();
})
}
}

#[test]
#[should_panic = "incomplete frame"] // https://github.com/ChainSafe/forest/issues/3485
fn zstd_decode_all_detects_broken_archives() {
zstd::decode_all(include_bytes!("../../assets/actor_bundles.car.zst").as_slice()).unwrap();
}
}
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod net;
pub mod proofs_api;
pub mod stream;
pub mod version;
mod car_sink;

use futures::{
future::{pending, FusedFuture},
Expand Down

0 comments on commit 4f8ce1e

Please sign in to comment.