diff --git a/src/tool/subcommands/state_migration_cmd.rs b/src/tool/subcommands/state_migration_cmd.rs index 58a985f390b8..15df3e6c09f5 100644 --- a/src/tool/subcommands/state_migration_cmd.rs +++ b/src/tool/subcommands/state_migration_cmd.rs @@ -3,6 +3,7 @@ use crate::networks::{ActorBundleInfo, ACTOR_BUNDLES}; use crate::utils::db::car_stream::CarStream; +use crate::utils::db::car_stream::CarWriter; use crate::utils::db::car_util::merge_car_streams; use crate::utils::net::global_http_client; use anyhow::{Context as _, Result}; @@ -11,14 +12,13 @@ use cid::Cid; use clap::Subcommand; use futures::io::{BufReader, BufWriter}; use futures::{AsyncRead, AsyncWriteExt, StreamExt, TryStreamExt}; -use fvm_ipld_car::{CarHeader, CarReader}; +use fvm_ipld_car::CarReader; use itertools::Itertools; use once_cell::sync::Lazy; use reqwest::Url; use std::env; use std::path::{Path, PathBuf}; use std::{fs, io}; -use tokio_util::compat::TokioAsyncWriteCompatExt; const DEFAULT_BUNDLE_FILE_NAME: &str = "actor_bundles.car.zst"; @@ -68,24 +68,14 @@ async fn generate_actor_bundle() -> Result<()> { .cloned() .collect::>(); - let car_writer = CarHeader::from(all_roots); - - let mut zstd_encoder = ZstdEncoder::with_quality( + let zstd_encoder = ZstdEncoder::with_quality( tokio::fs::File::create(Path::new(DEFAULT_BUNDLE_FILE_NAME)).await?, async_compression::Level::Precise(zstd::zstd_safe::max_c_level()), - ) - .compat_write(); + ); - car_writer - .write_stream_async( - &mut zstd_encoder, - &mut std::pin::pin!(merge_car_streams(car_streams).map(|b| { - let b = b.expect("There should be no invalid blocks"); - (b.cid, b.data) - })), - ) + merge_car_streams(car_streams) + .forward(CarWriter::new_carv1(all_roots, zstd_encoder)?) .await?; - Ok(()) } diff --git a/src/utils/db/car_stream.rs b/src/utils/db/car_stream.rs index 21a5a046c05a..ef98003c3117 100644 --- a/src/utils/db/car_stream.rs +++ b/src/utils/db/car_stream.rs @@ -1,19 +1,22 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT use async_compression::tokio::bufread::ZstdDecoder; -use bytes::{Buf, Bytes}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use cid::{ multihash::{Code, MultihashDigest}, Cid, }; -use futures::{Stream, StreamExt}; +use futures::ready; +use futures::{sink::Sink, Stream, StreamExt}; +use fvm_ipld_encoding::to_vec; use integer_encoding::VarInt; use pin_project_lite::pin_project; use serde::{Deserialize, Serialize}; use std::io::{self, Cursor, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncSeekExt}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tokio_util::codec::Encoder; use tokio_util::codec::FramedRead; use tokio_util::either::Either; use unsigned_varint::codec::UviBytes; @@ -26,7 +29,7 @@ pub struct CarHeader { pub version: u64, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct CarBlock { pub cid: Cid, pub data: Vec, @@ -139,6 +142,54 @@ impl Stream for CarStream { } } +pin_project! { + pub struct CarWriter { + #[pin] + inner: W, + buffer: BytesMut, + } +} + +impl CarWriter { + pub fn new_carv1(roots: Vec, writer: W) -> anyhow::Result { + let car_header = CarHeader { roots, version: 1 }; + + let mut header_uvi_frame = BytesMut::new(); + UviBytes::default().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?; + + Ok(Self { + inner: writer, + buffer: header_uvi_frame, + }) + } +} + +impl Sink for CarWriter { + type Error = io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + while !this.buffer.is_empty() { + this = self.as_mut().project(); + let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?; + this.buffer.advance(bytes_written); + } + Poll::Ready(Ok(())) + } + fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> { + item.write(&mut self.project().buffer.writer()) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().poll_ready(cx))?; + self.project().inner.poll_flush(cx) + } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.as_mut().poll_ready(cx))?; + self.project().inner.poll_shutdown(cx) + } +} + async fn read_header(reader: &mut ReaderT) -> Option { let mut framed_reader = FramedRead::new(reader, UviBytes::::default()); let header = from_slice_with_fallback::(&framed_reader.next().await?.ok()?).ok()?; diff --git a/src/utils/db/car_util.rs b/src/utils/db/car_util.rs index c115b1e50937..e55903cff58a 100644 --- a/src/utils/db/car_util.rs +++ b/src/utils/db/car_util.rs @@ -26,11 +26,14 @@ pub fn dedup_block_stream( #[cfg(test)] mod tests { use super::*; + use crate::utils::db::car_stream::CarWriter; use ahash::HashSet; + use async_compression::tokio::write::ZstdEncoder; use cid::multihash; use cid::multihash::MultihashDigest; use cid::Cid; use futures::executor::{block_on, block_on_stream}; + use futures::{StreamExt, TryStreamExt}; use fvm_ipld_encoding::DAG_CBOR; use itertools::Itertools; use pretty_assertions::assert_eq; @@ -48,6 +51,10 @@ mod tests { impl Blocks { async fn into_forest_car_zst_bytes(self) -> Vec { + self.into_forest_car_zst_bytes_with_roots().await.1 + } + + async fn into_forest_car_zst_bytes_with_roots(self) -> (Vec, Vec) { let roots = vec![self.0[0].cid]; let frames = crate::db::car::forest::Encoder::compress_stream( 8000_usize.next_power_of_two(), @@ -55,10 +62,10 @@ mod tests { self.into_stream().map_err(anyhow::Error::from), ); let mut writer = vec![]; - crate::db::car::forest::Encoder::write(&mut writer, roots, frames) + crate::db::car::forest::Encoder::write(&mut writer, roots.clone(), frames) .await .unwrap(); - writer + (roots, writer) } fn into_stream(self) -> impl Stream> { @@ -104,6 +111,27 @@ mod tests { }) } + #[quickcheck] + fn car_writer_roundtrip(blocks1: Blocks) -> anyhow::Result<()> { + block_on(async move { + let (all_roots, car) = blocks1.clone().into_forest_car_zst_bytes_with_roots().await; + let reader = CarStream::new(std::io::Cursor::new(&car)).await?; + + let mut buff: Vec = vec![]; + let zstd_encoder = ZstdEncoder::new(&mut buff); + reader + .forward(CarWriter::new_carv1(all_roots, zstd_encoder)?) + .await?; + + let stream = CarStream::new(std::io::Cursor::new(buff)).await?; + let blocks2 = Blocks(stream.try_collect().await?); + + assert_eq!(blocks1.0, blocks2.0); + + Ok::<_, anyhow::Error>(()) + }) + } + #[quickcheck] fn dedup_block_stream_tests_a_a(a: Blocks) { // ∀A. A∪A = A