diff --git a/README.md b/README.md
index 9292eb07c5b..8e0261280e6 100644
--- a/README.md
+++ b/README.md
@@ -74,7 +74,7 @@ venv/bin/python parquet_integration/write_parquet.py
 * Reading parquet is 10-20x faster (single core) and deserialization is parallelizable
 * Writing parquet is 3-10x faster (single core) and serialization is parallelizable
 * parquet IO has no `unsafe`
-* parquet IO supports `async` read
+* parquet IO supports `async`
 
 ### Others
 
diff --git a/src/io/parquet/write/stream.rs b/src/io/parquet/write/stream.rs
index 35972cab0b3..e8ee92e5d0b 100644
--- a/src/io/parquet/write/stream.rs
+++ b/src/io/parquet/write/stream.rs
@@ -4,6 +4,7 @@ use parquet2::write::RowGroupIter;
 use parquet2::{
     metadata::{KeyValue, SchemaDescriptor},
     write::stream::write_stream as parquet_write_stream,
+    write::stream::write_stream_stream as parquet_write_stream_stream,
 };
 
 use crate::datatypes::*;
@@ -43,3 +44,35 @@ where
     )
     .await?)
 }
+
+/// Async writes
+pub async fn write_stream_stream<'a, W, I>(
+    writer: &mut W,
+    row_groups: I,
+    schema: &Schema,
+    parquet_schema: SchemaDescriptor,
+    options: WriteOptions,
+    key_value_metadata: Option<Vec<KeyValue>>,
+) -> Result<u64>
+where
+    W: futures::io::AsyncWrite + Unpin + Send,
+    I: Stream<Item = Result<RowGroupIter<'a, ArrowError>>>,
+{
+    let key_value_metadata = key_value_metadata
+        .map(|mut x| {
+            x.push(schema_to_metadata_key(schema));
+            x
+        })
+        .or_else(|| Some(vec![schema_to_metadata_key(schema)]));
+
+    let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
+    Ok(parquet_write_stream_stream(
+        writer,
+        row_groups,
+        parquet_schema,
+        options,
+        created_by,
+        key_value_metadata,
+    )
+    .await?)
+}
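
For context, here is a minimal sketch of how the new `write_stream_stream` entry point might be driven once this lands. It is not part of the diff: the wrapper name `write_row_groups` is hypothetical, and the import paths (`arrow2::io::parquet::write::stream`, `to_parquet_schema`, `WriteOptions`) are assumptions about the crate's re-exports and feature gates that may need adjusting.

```rust
use futures::stream::Stream;
use parquet2::write::RowGroupIter;

use arrow2::datatypes::Schema;
use arrow2::error::{ArrowError, Result};
use arrow2::io::parquet::write::{stream::write_stream_stream, to_parquet_schema, WriteOptions};

/// Hypothetical wrapper (not part of this diff): writes an async stream of
/// already-encoded row groups to any async sink and returns the bytes written.
async fn write_row_groups<'a, W, S>(
    writer: &mut W,
    row_groups: S,
    schema: &Schema,
    options: WriteOptions,
) -> Result<u64>
where
    W: futures::io::AsyncWrite + Unpin + Send,
    S: Stream<Item = Result<RowGroupIter<'a, ArrowError>>>,
{
    // Derive the parquet schema from the arrow schema, mirroring the sync path.
    let parquet_schema = to_parquet_schema(schema)?;

    // Passing `None` for key-value metadata: `write_stream_stream` still appends the
    // serialized arrow schema as a metadata key internally.
    write_stream_stream(writer, row_groups, schema, parquet_schema, options, None).await
}
```

A caller can obtain the `row_groups` stream by wrapping an iterator of encoded row groups (for example, one produced by the existing `RowGroupIterator` helper) with `futures::stream::iter`, and any `futures::io::AsyncWrite + Unpin + Send` sink, such as `futures::io::Cursor<Vec<u8>>`, works as the writer.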