Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for async parquet write (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrandChaman authored Sep 3, 2021
1 parent d6879ba commit f4495b7
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ venv/bin/python parquet_integration/write_parquet.py
* Reading parquet is 10-20x faster (single core) and deserialization is parallelizable
* Writing parquet is 3-10x faster (single core) and serialization is parallelizable
* parquet IO has no `unsafe`
* parquet IO supports `async` read
* parquet IO supports `async`

### Others

Expand Down
33 changes: 33 additions & 0 deletions src/io/parquet/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use parquet2::write::RowGroupIter;
use parquet2::{
metadata::{KeyValue, SchemaDescriptor},
write::stream::write_stream as parquet_write_stream,
write::stream::write_stream_stream as parquet_write_stream_stream,
};

use crate::datatypes::*;
Expand Down Expand Up @@ -43,3 +44,35 @@ where
)
.await?)
}

/// Async writes
pub async fn write_stream_stream<'a, W, I>(
writer: &mut W,
row_groups: I,
schema: &Schema,
parquet_schema: SchemaDescriptor,
options: WriteOptions,
key_value_metadata: Option<Vec<KeyValue>>,
) -> Result<u64>
where
W: futures::io::AsyncWrite + Unpin + Send,
I: Stream<Item = Result<RowGroupIter<'static, ArrowError>>>,
{
let key_value_metadata = key_value_metadata
.map(|mut x| {
x.push(schema_to_metadata_key(schema));
x
})
.or_else(|| Some(vec![schema_to_metadata_key(schema)]));

let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
Ok(parquet_write_stream_stream(
writer,
row_groups,
parquet_schema,
options,
created_by,
key_value_metadata,
)
.await?)
}

0 comments on commit f4495b7

Please sign in to comment.