Add CarWriter sink #3461

Merged on Sep 18, 2023 (54 commits)
Showing changes from 2 commits

Commits
39d0e8d
Add basic scaffolding
elmattic Sep 5, 2023
4448dd4
Add a CarWriter type
elmattic Sep 6, 2023
044a01d
Use send instead of try_send
elmattic Sep 6, 2023
b641258
Use bounded channel instead
elmattic Sep 6, 2023
a933e3c
Rework CarWriter internals and start implementing trait methods
elmattic Sep 6, 2023
0857c76
Try replacing write_stream_async with Sink
elmattic Sep 6, 2023
ef4167f
Add writing of blocks in Sink
elmattic Sep 6, 2023
19e2d07
Add write of header
elmattic Sep 6, 2023
25301d0
Remove useless imports
elmattic Sep 6, 2023
a5a0054
Add missing varint
elmattic Sep 6, 2023
e8cc29a
Add missing varint for header
elmattic Sep 6, 2023
95f8331
Add code to get an uncompressed car for reference checking
elmattic Sep 6, 2023
2eafe93
Fix varint write part
elmattic Sep 6, 2023
1e3d91c
UviBytes::encode already handle varint
elmattic Sep 7, 2023
e5697de
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 7, 2023
f6322e0
Add flush in Sink close method
elmattic Sep 7, 2023
3cd233f
Add debug logs
elmattic Sep 7, 2023
0658847
poll_flush is already done internally in poll_shutdown
elmattic Sep 7, 2023
a100673
Remove unit value
elmattic Sep 7, 2023
d31d8c9
Move write/flush inside poll_flush and add debug comments
elmattic Sep 7, 2023
ce71fe3
fix: poll_ready should flush
elmattic Sep 7, 2023
14f50b0
Cleanup
elmattic Sep 7, 2023
de53271
Peer session rework
elmattic Sep 8, 2023
f91bbd4
Add missing header bytes
elmattic Sep 8, 2023
935b242
Make sure buffer is written before poll_shutdown
elmattic Sep 8, 2023
cf1d360
Add comment
elmattic Sep 8, 2023
609b0c6
Remove BufWriter in CarWriter
elmattic Sep 8, 2023
d23dd84
Add back compression
elmattic Sep 8, 2023
9fd67cb
Remove else block
elmattic Sep 8, 2023
d5815c0
Handle error cases
elmattic Sep 8, 2023
14b86fd
Remove newline
elmattic Sep 8, 2023
32ced0c
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 8, 2023
ccc5b0e
Apply clippy suggestion
elmattic Sep 8, 2023
3d6aa1f
Replace Vec field with BytesMut
elmattic Sep 8, 2023
6da6edb
Fix clippy error
elmattic Sep 8, 2023
86f7a7e
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 8, 2023
87f2dc1
Update src/utils/db/car_stream.rs
elmattic Sep 8, 2023
4ea0ad5
Fix build
elmattic Sep 8, 2023
6844445
Update src/tool/subcommands/state_migration_cmd.rs
elmattic Sep 8, 2023
8e55e0d
Fix build
elmattic Sep 8, 2023
b94fcee
Remove assert
elmattic Sep 8, 2023
9f149dd
Use Block write function
elmattic Sep 8, 2023
45e97fc
Use while loop optim
elmattic Sep 11, 2023
9ad0d40
Fix clippy error
elmattic Sep 11, 2023
8a100a1
Remove call to expect
elmattic Sep 11, 2023
e551127
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 11, 2023
92b8256
Update src/tool/subcommands/state_migration_cmd.rs
elmattic Sep 12, 2023
7e02e35
Add roundtrip test for CarWriter
elmattic Sep 12, 2023
0818db9
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 12, 2023
4d3b960
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 14, 2023
fb9c65c
Update CarWriter sink to use FramedWrite
elmattic Sep 14, 2023
eac5fdc
Add some debugging code
elmattic Sep 14, 2023
9f1b039
Revert to previous Sink implementation
elmattic Sep 14, 2023
8f56951
Merge branch 'main' into elmattic/car-writer-sink
elmattic Sep 18, 2023
64 changes: 63 additions & 1 deletion src/utils/db/car_stream.rs
@@ -6,7 +6,9 @@ use cid::{
multihash::{Code, MultihashDigest},
Cid,
};
-use futures::{Stream, StreamExt};
+use flume::r#async::RecvStream;
+use futures::{sink::Sink, Stream, StreamExt};
+use fvm_ipld_car::CarHeader as FvmCarHeader;
use integer_encoding::VarInt;
use pin_project_lite::pin_project;
use serde::{Deserialize, Serialize};
@@ -15,6 +17,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncSeekExt};
use tokio_util::codec::FramedRead;
+use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::either::Either;

use crate::utils::encoding::{from_slice_with_fallback, uvibytes::UviBytes};
@@ -137,6 +140,65 @@ impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
}
}

+pin_project! {
+    pub struct CarWriter {
+        #[pin]
+        sender: flume::Sender<(Cid, Vec<u8>)>,
+        pub inner: std::pin::Pin<Box<dyn std::future::Future<Output = std::io::Result<()>>>>,
+    }
+}
+
+impl CarWriter {
+    pub fn new_carv1(roots: Vec<Cid>, dst: tokio::fs::File) -> Self {
+        // Bounded channel: at most 5 blocks are buffered between the sink and the writer future.
+        let (sender, receiver): (
+            flume::Sender<(Cid, Vec<u8>)>,
+            flume::Receiver<(Cid, Vec<u8>)>,
+        ) = flume::bounded(5);
+        let car_writer = FvmCarHeader::from(roots);
+
+        Self {
+            sender,
+            // Future that drains the channel into `dst`; it makes progress only when polled.
+            inner: Box::pin(async move {
+                let mut file = dst.compat();
+                let mut stream: RecvStream<'_, (Cid, Vec<u8>)> = receiver.stream();
+                car_writer
+                    .write_stream_async(&mut file, &mut stream)
+                    .await
+                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
+            }),
+        }
+    }
+}
+
+impl Sink<Block> for CarWriter {
+    type Error = io::Error;
+
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if !self.sender.is_full() {
+            Poll::Ready(Ok(()))
+        } else if self.sender.is_disconnected() {
+            Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "channel dropped")))
+        } else {
+            // NOTE: no waker is registered via `cx` here, so nothing wakes this task once the channel drains.
+            Poll::Pending
+        }
+    }
+
+    fn start_send(self: Pin<&mut Self>, item: Block) -> Result<(), Self::Error> {
+        // Non-blocking send; a full or disconnected channel surfaces as an `io::Error`.
+        self.sender
+            .try_send((item.cid, item.data))
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        //self.project().writer.poll_flush(self, cx)
+        todo!()
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        todo!()
+    }
+}

async fn read_header<ReaderT: AsyncRead + Unpin>(reader: &mut ReaderT) -> Option<CarHeader> {
let mut framed_reader = FramedRead::new(reader, UviBytes::default());
let header = from_slice_with_fallback::<CarHeader>(&framed_reader.next().await?.ok()?).ok()?;
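Note on framing: `read_header` above reads the header through `UviBytes`, i.e. as a frame whose length is given by an unsigned varint prefix, and several commit titles in this PR ("Add missing varint", "Add missing varint for header") concern the same prefix on the write side. The following is a minimal, std-only sketch of that framing idea, not the code from this PR. The helper names `write_uvarint`, `read_uvarint`, `write_frame` and `read_frame` are hypothetical, and the payload is treated as opaque bytes (in a CAR file it would be the encoded header, or a CID followed by block data).

```rust
/// Append `value` to `out` as an unsigned LEB128 varint
/// (7 payload bits per byte, high bit set on every byte except the last).
pub fn write_uvarint(out: &mut Vec<u8>, mut value: u64) {
    while value >= 0x80 {
        out.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Decode an unsigned varint from the start of `input`.
/// Returns the value and the number of bytes consumed, or `None` if the
/// input ends before the final byte or the encoding exceeds 10 bytes.
/// This sketch does not reject over-long (non-minimal) encodings.
pub fn read_uvarint(input: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, &byte) in input.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Append one length-prefixed frame: varint(payload length) followed by the payload.
pub fn write_frame(out: &mut Vec<u8>, payload: &[u8]) {
    write_uvarint(out, payload.len() as u64);
    out.extend_from_slice(payload);
}

/// Split the first frame off `input`, returning `(payload, remaining bytes)`.
/// Returns `None` if the prefix is malformed or the payload is truncated.
pub fn read_frame(input: &[u8]) -> Option<(&[u8], &[u8])> {
    let (len, used) = read_uvarint(input)?;
    let len = usize::try_from(len).ok()?;
    let end = used.checked_add(len)?;
    let payload = input.get(used..end)?;
    Some((payload, &input[end..]))
}
```

Writing two frames back to back with `write_frame` and then calling `read_frame` twice returns the two payloads in order, which is the shape a roundtrip test for a writer of this kind would check.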
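Note on backpressure: in the diff above, `start_send` forwards each block with `try_send` on a bounded channel, and `poll_ready` distinguishes "full" from "disconnected". The sketch below illustrates that three-way outcome with the standard library's synchronous bounded channel rather than `flume`, so it is an analogy under stated assumptions and not the PR's implementation. The `Frame` alias and the `try_enqueue` helper are hypothetical names, and `u64` stands in for a `Cid`.

```rust
use std::io;
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical stand-in for the `(Cid, Vec<u8>)` pairs sent through the channel.
pub type Frame = (u64, Vec<u8>);

/// Try to enqueue `frame` without blocking.
///
/// * `Ok(None)`        : the frame was accepted.
/// * `Ok(Some(frame))` : the channel is full; the frame is handed back so the
///                       caller can retry later (the "pending" case).
/// * `Err(_)`          : the receiving side is gone; no retry can succeed.
pub fn try_enqueue(tx: &SyncSender<Frame>, frame: Frame) -> io::Result<Option<Frame>> {
    match tx.try_send(frame) {
        Ok(()) => Ok(None),
        Err(TrySendError::Full(frame)) => Ok(Some(frame)),
        Err(TrySendError::Disconnected(_)) => {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "channel dropped"))
        }
    }
}
```

With a channel created by `std::sync::mpsc::sync_channel(1)`, the first call returns `Ok(None)`, a second call before the receiver drains returns the frame back, and any call after the receiver is dropped returns an error. In an async `Sink`, the "full" case additionally needs a waker to be registered before returning `Poll::Pending`, which this synchronous sketch does not model.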