This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

IPC sink types and IPC file stream #878

Merged
36 commits merged on Mar 5, 2022
Commits
1933f85
Added lifetime parameter to async IPC reader.
Feb 17, 2022
6c4df8d
Merge branch 'main' of github.com:mindx/arrow2 into main
Feb 17, 2022
6b834c5
Merge branch 'jorgecarleitao:main' into main
Feb 17, 2022
b55447e
Merge branch 'jorgecarleitao:main' into main
Mar 2, 2022
119f18e
Added lifetime parameter to async IPC reader.
Feb 17, 2022
a3c98ad
Added sink implementation for writing IPC streams.
Feb 17, 2022
10461fb
Added IPC file stream and sink implementations.
Feb 25, 2022
59ed7b1
Fixed error in writing compressed arrow (#855)
jorgecarleitao Feb 18, 2022
efe6a91
Simplified API for FFI (#854)
jorgecarleitao Feb 19, 2022
62b34cc
Bumped crc (#856)
jorgecarleitao Feb 20, 2022
7c90d8c
add support for datatypes serde (#858)
houqp Feb 22, 2022
fc35e7e
Added support to the Arrow C stream interface (read and write) (#857)
jorgecarleitao Feb 22, 2022
511e5ff
Docs and lint (#860)
jorgecarleitao Feb 23, 2022
c0320b1
Removed warnings when crate is compiled without flags (#847)
jorgecarleitao Feb 23, 2022
750aa57
Fixed reading parquet with timezone (#862)
jorgecarleitao Feb 24, 2022
ff6d951
Moved files internally for better organization (#863)
jorgecarleitao Feb 25, 2022
e76775e
Simplified API for writing to JSON (#864)
jorgecarleitao Feb 25, 2022
e174a23
change csv-writer (#866)
ritchie46 Feb 26, 2022
d1c43b9
Fixed json writing of dates and datetimes (#867)
jorgecarleitao Feb 26, 2022
df75eb2
Refactored JSON IO (better support for JSON and NDJSON) (#870)
jorgecarleitao Feb 27, 2022
4fbcfa2
Improved performance of filter performance via Simd selection [3x] (#…
sundy-li Feb 28, 2022
0dcea40
Added `try_new` and `new` to all arrays (#873)
jorgecarleitao Mar 1, 2022
e7cc3aa
Moved IPC stream sink into stream_async file.
Mar 2, 2022
bad7341
Added test case to IPC stream sink.
Mar 2, 2022
6f2d60d
Added test for async IPC file read/write.
Mar 2, 2022
c9c90e5
Added documentation for async IPC file reader.
Mar 2, 2022
47a8a52
Updated IPC tests.
Mar 2, 2022
9bbb74f
Merge branch 'main' into ipc_async_types
Mar 2, 2022
2be0923
Moved IPC sink test to correct directory.
Mar 2, 2022
32dc0c4
Moved IPC sink test to correct location.
Mar 3, 2022
dbf799b
Updated IPC sink interface to support fields.
Mar 3, 2022
ae2e079
Removed IPC StreamWriter.
Mar 3, 2022
819e4df
Fixed error in IPC file sink example.
Mar 3, 2022
114cb29
Merge branch 'jorgecarleitao:main' into main
Mar 3, 2022
6ac0865
Updated IPC Record type to support reference types.
Mar 3, 2022
3afd996
Merge branch 'main' into ipc_async_types
Mar 3, 2022
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -134,7 +134,7 @@ io_csv_write = ["csv", "csv-core", "streaming-iterator", "lexical-core"]
 io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
 io_ipc = ["arrow-format"]
 io_ipc_write_async = ["io_ipc", "futures"]
-io_ipc_read_async = ["io_ipc", "futures"]
+io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
 io_ipc_compression = ["lz4", "zstd"]
 io_flight = ["io_ipc", "arrow-format/flight-data"]
 # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
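For downstream users, a minimal sketch of opting into the new reader from a dependent crate's Cargo.toml (the version number is illustrative, not part of this diff):

```toml
[dependencies]
# io_ipc_read_async enables src/io/ipc/read/file_async.rs and, per the feature
# table above, pulls in io_ipc, futures, and async-stream.
arrow2 = { version = "0.10", features = ["io_ipc_read_async", "io_ipc_write_async"] }
```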
275 changes: 275 additions & 0 deletions src/io/ipc/read/file_async.rs
@@ -0,0 +1,275 @@
//! Async reader for Arrow IPC files
use std::io::SeekFrom;
use std::sync::Arc;

use arrow_format::ipc::{
    planus::{ReadAsRoot, Vector},
    BlockRef, FooterRef, MessageHeaderRef, MessageRef,
};
use futures::{
    stream::BoxStream, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt,
};

use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};

use super::common::{read_dictionary, read_record_batch};
use super::reader::get_serialized_batch;
use super::schema::fb_to_schema;
use super::Dictionaries;
use super::FileMetadata;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
    stream: BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>,
    metadata: FileMetadata,
    schema: Schema,
}

impl<'a> FileStream<'a> {
    /// Create a new IPC file reader.
    ///
    /// # Examples
    /// See [`FileSink`](crate::io::ipc::write::file_async::FileSink).
    pub fn new<R>(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
    {
        let schema = if let Some(projection) = projection.as_ref() {
            projection.windows(2).for_each(|x| {
                assert!(
                    x[0] < x[1],
                    "IPC projection must be ordered and non-overlapping",
                )
            });
            let fields = projection
                .iter()
                .map(|&x| metadata.schema.fields[x].clone())
                .collect::<Vec<_>>();
            Schema {
                fields,
                metadata: metadata.schema.metadata.clone(),
            }
        } else {
            metadata.schema.clone()
        };

        let stream = Self::stream(reader, metadata.clone(), projection);
        Self {
            stream,
            metadata,
            schema,
        }
    }

    /// Get the metadata from the IPC file.
    pub fn metadata(&self) -> &FileMetadata {
        &self.metadata
    }

    /// Get the projected schema from the IPC file.
    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    fn stream<R>(
        mut reader: R,
        metadata: FileMetadata,
        projection: Option<Vec<usize>>,
    ) -> BoxStream<'a, Result<Chunk<Arc<dyn Array>>>>
    where
        R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
    {
        async_stream::try_stream! {
            let mut meta_buffer = vec![];
            let mut block_buffer = vec![];
            for block in 0..metadata.blocks.len() {
                let chunk = read_batch(
                    &mut reader,
                    &metadata,
                    projection.as_deref(),
                    block,
                    &mut meta_buffer,
                    &mut block_buffer,
                ).await?;
                yield chunk;
            }
        }
        .boxed()
    }
}

impl<'a> Stream for FileStream<'a> {
    type Item = Result<Chunk<Arc<dyn Array>>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.get_mut().stream.poll_next_unpin(cx)
    }
}

/// Read the metadata from an IPC file.
pub async fn read_file_metadata_async<R>(mut reader: R) -> Result<FileMetadata>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    // Check header
    let mut magic = [0; 6];
    reader.read_exact(&mut magic).await?;
    if magic != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "file does not contain correct Arrow header".to_string(),
        ));
    }
    // Check footer
    reader.seek(SeekFrom::End(-6)).await?;
    reader.read_exact(&mut magic).await?;
    if magic != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "file does not contain correct Arrow footer".to_string(),
        ));
    }
    // Get footer size
    let mut footer_size = [0; 4];
    reader.seek(SeekFrom::End(-10)).await?;
    reader.read_exact(&mut footer_size).await?;
    let footer_size = i32::from_le_bytes(footer_size);
    // Read footer
    let mut footer = vec![0; footer_size as usize];
    reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?;
    reader.read_exact(&mut footer).await?;
    let footer = FooterRef::read_as_root(&footer[..])
        .map_err(|err| ArrowError::OutOfSpec(format!("unable to get root as footer: {:?}", err)))?;

    let blocks = footer.record_batches()?.ok_or_else(|| {
        ArrowError::OutOfSpec("unable to get record batches from footer".to_string())
    })?;
    let schema = footer
        .schema()?
        .ok_or_else(|| ArrowError::OutOfSpec("unable to get schema from footer".to_string()))?;
    let (schema, ipc_schema) = fb_to_schema(schema)?;
    let dictionary_blocks = footer.dictionaries()?;
    let dictionaries = if let Some(blocks) = dictionary_blocks {
        read_dictionaries(reader, &schema.fields[..], &ipc_schema, blocks).await?
    } else {
        Default::default()
    };

    Ok(FileMetadata {
        schema,
        ipc_schema,
        blocks: blocks
            .iter()
            .map(|block| Ok(block.try_into()?))
            .collect::<Result<Vec<_>>>()?,
        dictionaries,
    })
}
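// Layout note (explanatory comment, not part of the diff): an Arrow IPC file is
//
//     <magic "ARROW1"> <padding>
//     <dictionary and record batch messages>
//     <footer flatbuffer>
//     <footer size: little-endian i32> <magic "ARROW1">
//
// which is why read_file_metadata_async seeks to End(-6) for the trailing magic,
// End(-10) for the footer size, and End(-10 - footer_size) for the footer itself.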

async fn read_dictionaries<R>(
    mut reader: R,
    fields: &[Field],
    ipc_schema: &IpcSchema,
    blocks: Vector<'_, BlockRef<'_>>,
) -> Result<Dictionaries>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let mut dictionaries = Default::default();
    let mut data = vec![];
    let mut buffer = vec![];

    for block in blocks {
        let offset = block.offset() as u64;
        read_dictionary_message(&mut reader, offset, &mut data).await?;

        let message = MessageRef::read_as_root(&data).map_err(|err| {
            ArrowError::OutOfSpec(format!("unable to get root as message: {:?}", err))
        })?;
        let header = message
            .header()?
            .ok_or_else(|| ArrowError::oos("message must have a header"))?;
        match header {
            MessageHeaderRef::DictionaryBatch(batch) => {
                buffer.clear();
                buffer.resize(block.body_length() as usize, 0);
                reader.read_exact(&mut buffer).await?;
                let mut cursor = std::io::Cursor::new(&mut buffer);
                read_dictionary(batch, fields, ipc_schema, &mut dictionaries, &mut cursor, 0)?;
            }
            other => {
                return Err(ArrowError::OutOfSpec(format!(
                    "expected DictionaryBatch in dictionary blocks, found {:?}",
                    other,
                )))
            }
        }
    }
    Ok(dictionaries)
}

async fn read_dictionary_message<R>(mut reader: R, offset: u64, data: &mut Vec<u8>) -> Result<()>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let mut message_size = [0; 4];
    reader.seek(SeekFrom::Start(offset)).await?;
    reader.read_exact(&mut message_size).await?;
    if message_size == CONTINUATION_MARKER {
        reader.read_exact(&mut message_size).await?;
    }
    let footer_size = i32::from_le_bytes(message_size);
    data.clear();
    data.resize(footer_size as usize, 0);
    reader.read_exact(data).await?;

    Ok(())
}

async fn read_batch<R>(
    mut reader: R,
    metadata: &FileMetadata,
    projection: Option<&[usize]>,
    block: usize,
    meta_buffer: &mut Vec<u8>,
    block_buffer: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>>
where
    R: AsyncRead + AsyncSeek + Unpin,
{
    let block = metadata.blocks[block];
    reader.seek(SeekFrom::Start(block.offset as u64)).await?;
    let mut meta_buf = [0; 4];
    reader.read_exact(&mut meta_buf).await?;
    if meta_buf == CONTINUATION_MARKER {
        reader.read_exact(&mut meta_buf).await?;
    }
    let meta_len = i32::from_le_bytes(meta_buf) as usize;
    meta_buffer.clear();
    meta_buffer.resize(meta_len, 0);
    reader.read_exact(meta_buffer).await?;

    let message = MessageRef::read_as_root(&meta_buffer[..])
        .map_err(|err| ArrowError::oos(format!("unable to parse message: {:?}", err)))?;
    let batch = get_serialized_batch(&message)?;
    block_buffer.clear();
    block_buffer.resize(message.body_length()? as usize, 0);
    reader.read_exact(block_buffer).await?;
    let mut cursor = std::io::Cursor::new(block_buffer);
    let chunk = read_record_batch(
        batch,
        &metadata.schema.fields,
        &metadata.ipc_schema,
        projection,
        &metadata.dictionaries,
        message.version()?,
        &mut cursor,
        0,
    )?;
    Ok(chunk)
}
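A minimal usage sketch of the new reader. This is illustrative, not part of the diff: it assumes a tokio runtime and tokio-util's compat adapter to bridge tokio's AsyncRead/AsyncSeek to the futures traits required here, and the file name is hypothetical.

```rust
use arrow2::io::ipc::read::file_async::{read_file_metadata_async, FileStream};
use futures::TryStreamExt;
use tokio::fs::File;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Adapt tokio's File to the futures AsyncRead + AsyncSeek traits.
    let mut file = File::open("data.arrow").await?.compat();
    // Read the footer metadata first; it carries the schema, blocks, and dictionaries.
    let metadata = read_file_metadata_async(&mut file).await?;
    // Stream record batches; pass Some(projection) to read a subset of columns.
    let mut stream = FileStream::new(file, metadata, None);
    while let Some(chunk) = stream.try_next().await? {
        println!("read a chunk with {} rows", chunk.len());
    }
    Ok(())
}
```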
4 changes: 4 additions & 0 deletions src/io/ipc/read/mod.rs
@@ -20,6 +20,10 @@ mod stream;
 #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
 pub mod stream_async;

+#[cfg(feature = "io_ipc_read_async")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
+pub mod file_async;
+
 pub use common::{read_dictionary, read_record_batch};
 pub use reader::{read_file_metadata, FileMetadata, FileReader};
 pub use schema::deserialize_schema;
6 changes: 3 additions & 3 deletions src/io/ipc/read/reader.rs
@@ -26,10 +26,10 @@ pub struct FileMetadata {
     /// The blocks in the file
     ///
     /// A block indicates the regions in the file to read to get data
-    blocks: Vec<arrow_format::ipc::Block>,
+    pub(super) blocks: Vec<arrow_format::ipc::Block>,

     /// Dictionaries associated to each dict_id
-    dictionaries: Dictionaries,
+    pub(super) dictionaries: Dictionaries,
 }

 /// Arrow File reader
@@ -166,7 +166,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
     })
 }

-fn get_serialized_batch<'a>(
+pub(super) fn get_serialized_batch<'a>(
     message: &'a arrow_format::ipc::MessageRef,
 ) -> Result<arrow_format::ipc::RecordBatchRef<'a>> {
     let header = message.header()?.ok_or_else(|| {
53 changes: 53 additions & 0 deletions src/io/ipc/write/common.rs
@@ -1,3 +1,4 @@
+use std::borrow::{Borrow, Cow};
 use std::sync::Arc;

 use arrow_format::ipc::planus::Builder;
@@ -379,3 +380,55 @@ pub struct EncodedData {
 pub(crate) fn pad_to_8(len: usize) -> usize {
     (((len + 7) & !7) - len) as usize
 }

/// An array [`Chunk`] with optional accompanying IPC fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Record<'a> {
    columns: Cow<'a, Chunk<Arc<dyn Array>>>,
    fields: Option<Cow<'a, [IpcField]>>,
}

[Review thread on the `columns` field]

Owner: Curious: what is the purpose of Cow?

Contributor (author): The idea is to allow passing the arguments either by value or by reference. Obviously, it is better to avoid unnecessary clones where possible, but I also wanted to avoid requiring a reference: if the sink only accepted references, passing owned values would force the somewhat non-ergonomic sink.feed((&chunk).into()) or sink.feed((&chunk, &fields[..]).into()). With the chosen implementation, you can write sink.feed(chunk.into()) or sink.feed((chunk, fields).into()) in all cases.

I originally used a generic implementation that parameterized Record over a type implementing Borrow<_>, but it turns out that having a single type implement Sink for multiple record types (e.g. FileSink implementing both Sink<Record<Chunk<_>>> and Sink<Record<&Chunk<_>>>) doesn't work, because the compiler can't infer the correct generic type when you call sink.flush() or sink.close().

Owner: Thank you for the explanation. Cool tip!

impl<'a> Record<'a> {
    /// Get the IPC fields for this record.
    pub fn fields(&self) -> Option<&[IpcField]> {
        self.fields.as_deref()
    }

    /// Get the Arrow columns in this record.
    pub fn columns(&self) -> &Chunk<Arc<dyn Array>> {
        self.columns.borrow()
    }
}

impl From<Chunk<Arc<dyn Array>>> for Record<'static> {
    fn from(columns: Chunk<Arc<dyn Array>>) -> Self {
        Self {
            columns: Cow::Owned(columns),
            fields: None,
        }
    }
}

impl<'a, F> From<(Chunk<Arc<dyn Array>>, Option<F>)> for Record<'a>
where
    F: Into<Cow<'a, [IpcField]>>,
{
    fn from((columns, fields): (Chunk<Arc<dyn Array>>, Option<F>)) -> Self {
        Self {
            columns: Cow::Owned(columns),
            fields: fields.map(|f| f.into()),
        }
    }
}

impl<'a, F> From<(&'a Chunk<Arc<dyn Array>>, Option<F>)> for Record<'a>
where
    F: Into<Cow<'a, [IpcField]>>,
{
    fn from((columns, fields): (&'a Chunk<Arc<dyn Array>>, Option<F>)) -> Self {
        Self {
            columns: Cow::Borrowed(columns),
            fields: fields.map(|f| f.into()),
        }
    }
}
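To make the review discussion above concrete, here is a minimal sketch of feeding Records to the file sink added by this PR, showing both the owned and the borrowed paths. It is illustrative only: the FileSink::new argument order (writer, schema, ipc_fields, options) is assumed, and the tokio/futures scaffolding is not part of this diff.

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::io::ipc::write::file_async::FileSink;
use arrow2::io::ipc::write::WriteOptions;
use arrow2::io::ipc::IpcField;
use futures::SinkExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema {
        fields: vec![Field::new("a", DataType::Int32, false)],
        metadata: Default::default(),
    };
    let mut buffer = futures::io::Cursor::new(vec![]);
    let options = WriteOptions { compression: None };
    let mut sink = FileSink::new(&mut buffer, schema, None, options);

    let array = Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>;
    let chunk = Chunk::new(vec![array]);

    // By value: From<Chunk<_>> stores the columns as Cow::Owned.
    sink.feed(chunk.clone().into()).await?;
    // By reference: From<(&Chunk<_>, Option<F>)> borrows via Cow::Borrowed, avoiding a clone.
    sink.feed((&chunk, None::<Vec<IpcField>>).into()).await?;

    sink.close().await?;
    Ok(())
}
```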