Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read Arrow streams asynchronously (#832)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Feb 12, 2022
1 parent 394a699 commit 9e6924b
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ full = [
"io_ipc",
"io_flight",
"io_ipc_write_async",
"io_ipc_read_async",
"io_ipc_compression",
"io_json_integration",
"io_print",
Expand All @@ -132,6 +133,7 @@ io_csv_write = ["csv", "streaming-iterator", "lexical-core"]
io_json = ["serde", "serde_json", "streaming-iterator", "fallible-streaming-iterator", "indexmap", "lexical-core"]
io_ipc = ["arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
Expand Down
3 changes: 3 additions & 0 deletions src/io/ipc/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ mod read_basic;
mod reader;
mod schema;
mod stream;
#[cfg(feature = "io_ipc_read_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))]
pub mod stream_async;

pub use common::{read_dictionary, read_record_batch};
pub use reader::{read_file_metadata, FileMetadata, FileReader};
Expand Down
30 changes: 29 additions & 1 deletion src/io/ipc/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use crate::{
error::{ArrowError, Result},
};

use super::super::{IpcField, IpcSchema};
use super::{
super::{IpcField, IpcSchema},
StreamMetadata,
};

fn try_unzip_vec<A, B, I: Iterator<Item = Result<(A, B)>>>(iter: I) -> Result<(Vec<A>, Vec<B>)> {
let mut a = vec![];
Expand Down Expand Up @@ -370,3 +373,28 @@ pub(super) fn fb_to_schema(schema: arrow_format::ipc::SchemaRef) -> Result<(Sche
},
))
}

/// Deserializes the first IPC message of a stream into its [`StreamMetadata`].
///
/// `meta` holds the bytes of the encoded message (without the length prefix).
///
/// # Errors
/// Returns an out-of-spec error when the bytes are not a valid IPC message,
/// when the message has no header, or when the header is not a schema.
pub(super) fn deserialize_stream_metadata(meta: &[u8]) -> Result<StreamMetadata> {
    let root = arrow_format::ipc::MessageRef::read_as_root(meta).map_err(|err| {
        ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
    })?;
    let version = root.version()?;

    // the first message of a stream must carry the schema
    let (schema, ipc_schema) = match root.header()? {
        Some(arrow_format::ipc::MessageHeaderRef::Schema(fb_schema)) => fb_to_schema(fb_schema)?,
        Some(_) => {
            return Err(ArrowError::oos(
                "The first IPC message of the stream must be a schema",
            ))
        }
        None => return Err(ArrowError::oos("Unable to read the first IPC message")),
    };

    Ok(StreamMetadata {
        schema,
        version,
        ipc_schema,
    })
}
26 changes: 2 additions & 24 deletions src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::io::ipc::IpcSchema;

use super::super::CONTINUATION_MARKER;
use super::common::*;
use super::schema::fb_to_schema;
use super::schema::deserialize_stream_metadata;
use super::Dictionaries;

/// Metadata of an Arrow IPC stream, written at the start of the stream
Expand Down Expand Up @@ -45,29 +45,7 @@ pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;

let message =
arrow_format::ipc::MessageRef::read_as_root(meta_buffer.as_slice()).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let version = message.version()?;
// message header is a Schema, so read it
let header = message
.header()?
.ok_or_else(|| ArrowError::oos("Unable to read the first IPC message"))?;
let schema = if let arrow_format::ipc::MessageHeaderRef::Schema(schema) = header {
schema
} else {
return Err(ArrowError::oos(
"The first IPC message of the stream must be a schema",
));
};
let (schema, ipc_schema) = fb_to_schema(schema)?;

Ok(StreamMetadata {
schema,
version,
ipc_schema,
})
deserialize_stream_metadata(&meta_buffer)
}

/// Encodes the stream's status after each read.
Expand Down
212 changes: 212 additions & 0 deletions src/io/ipc/read/stream_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
//! APIs to read Arrow streams asynchronously
use std::sync::Arc;

use arrow_format::ipc::planus::ReadAsRoot;
use futures::future::BoxFuture;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::Stream;

use crate::array::*;
use crate::chunk::Chunk;
use crate::error::{ArrowError, Result};

use super::super::CONTINUATION_MARKER;
use super::common::{read_dictionary, read_record_batch};
use super::schema::deserialize_stream_metadata;
use super::Dictionaries;
use super::StreamMetadata;

/// A (private) state of stream messages.
///
/// It is moved into each read of the next message and handed back together with
/// the outcome, so that the reader and its buffers can be reused by the following read.
struct ReadState<R> {
/// The underlying reader from which the messages are read
pub reader: R,
/// The metadata of the stream (schema, IPC schema and version) used to decode batches
pub metadata: StreamMetadata,
/// The dictionaries read so far; updated by dictionary batches and used to decode record batches
pub dictionaries: Dictionaries,
/// The internal buffer to read data inside the messages (records and dictionaries) to
pub data_buffer: Vec<u8>,
/// The internal buffer to read messages to
pub message_buffer: Vec<u8>,
}

/// The state of an Arrow stream after a message was read from it.
/// Both variants hand the [`ReadState`] back so that reading can continue.
enum StreamState<R> {
/// The stream does not contain a new chunk (and it has not been closed)
Waiting(ReadState<R>),
/// The stream contains a new chunk
Some((ReadState<R>, Chunk<Arc<dyn Array>>)),
}

/// Reads the [`StreamMetadata`] of the Arrow stream asynchronously
pub async fn read_stream_metadata_async<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<StreamMetadata> {
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size).await?;
let meta_len = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_size == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size).await?;
}
i32::from_le_bytes(meta_size)
};

let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer).await?;

deserialize_stream_metadata(&meta_buffer)
}

/// Reads the next item, yielding `None` if the stream has been closed,
/// or a [`StreamState`] otherwise.
async fn maybe_next<R: AsyncRead + Unpin + Send>(
mut state: ReadState<R>,
) -> Result<Option<StreamState<R>>> {
// determine metadata length
let mut meta_length: [u8; 4] = [0; 4];

match state.reader.read_exact(&mut meta_length).await {
Ok(()) => (),
Err(e) => {
return if e.kind() == std::io::ErrorKind::UnexpectedEof {
// Handle EOF without the "0xFFFFFFFF 0x00000000"
// valid according to:
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
Ok(Some(StreamState::Waiting(state)))
} else {
Err(ArrowError::from(e))
};
}
}

let meta_length = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_length == CONTINUATION_MARKER {
state.reader.read_exact(&mut meta_length).await?;
}
i32::from_le_bytes(meta_length) as usize
};

if meta_length == 0 {
// the stream has ended, mark the reader as finished
return Ok(None);
}

state.message_buffer.clear();
state.message_buffer.resize(meta_length, 0);
state.reader.read_exact(&mut state.message_buffer).await?;

let message =
arrow_format::ipc::MessageRef::read_as_root(&state.message_buffer).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let header = message.header()?.ok_or_else(|| {
ArrowError::oos("IPC: unable to fetch the message header. The file or stream is corrupted.")
})?;

match header {
arrow_format::ipc::MessageHeaderRef::Schema(_) => Err(ArrowError::oos("A stream ")),
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
// read the block that makes up the record batch into a buffer
state.data_buffer.clear();
state.data_buffer.resize(message.body_length()? as usize, 0);
state.reader.read_exact(&mut state.data_buffer).await?;

read_record_batch(
batch,
&state.metadata.schema.fields,
&state.metadata.ipc_schema,
None,
&state.dictionaries,
state.metadata.version,
&mut std::io::Cursor::new(&state.data_buffer),
0,
)
.map(|chunk| Some(StreamState::Some((state, chunk))))
}
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; message.body_length()? as usize];
state.reader.read_exact(&mut buf).await?;

let mut dict_reader = std::io::Cursor::new(buf);

read_dictionary(
batch,
&state.metadata.schema.fields,
&state.metadata.ipc_schema,
&mut state.dictionaries,
&mut dict_reader,
0,
)?;

// read the next message until we encounter a Chunk<Arc<dyn Array>> message
Ok(Some(StreamState::Waiting(state)))
}
t => Err(ArrowError::OutOfSpec(format!(
"Reading types other than record batches not yet supported, unable to read {:?} ",
t
))),
}
}

/// A [`Stream`] over an Arrow IPC stream that asynchronously yields [`Chunk`]s.
pub struct AsyncStreamReader<R: AsyncRead + Unpin + Send + 'static> {
/// The metadata of the stream, exposed through [`AsyncStreamReader::metadata`]
metadata: StreamMetadata,
/// The in-flight read of the next message; set to `None` once the stream
/// has ended or a read has failed
future: Option<BoxFuture<'static, Result<Option<StreamState<R>>>>>,
}

impl<R: AsyncRead + Unpin + Send + 'static> AsyncStreamReader<R> {
/// Creates a new [`AsyncStreamReader`]
pub fn new(reader: R, metadata: StreamMetadata) -> Self {
let state = ReadState {
reader,
metadata: metadata.clone(),
dictionaries: Default::default(),
data_buffer: Default::default(),
message_buffer: Default::default(),
};
let future = Some(Box::pin(maybe_next(state)) as _);
Self { metadata, future }
}

/// Return the schema of the stream
pub fn metadata(&self) -> &StreamMetadata {
&self.metadata
}
}

impl<R: AsyncRead + Unpin + Send> Stream for AsyncStreamReader<R> {
    type Item = Result<Chunk<Arc<dyn Array>>>;

    /// Drives the in-flight read of the next message.
    ///
    /// Yields a chunk when a record batch was read, `None` when the stream has ended,
    /// and an error (followed by `None` on later polls) when a read failed.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::pin::Pin;
        use std::task::Poll;
        let me = Pin::into_inner(self);

        match &mut me.future {
            Some(fut) => match fut.as_mut().poll(cx) {
                Poll::Ready(Ok(None)) => {
                    me.future = None;
                    Poll::Ready(None)
                }
                Poll::Ready(Ok(Some(StreamState::Some((state, batch))))) => {
                    me.future = Some(Box::pin(maybe_next(state)));
                    Poll::Ready(Some(Ok(batch)))
                }
                Poll::Ready(Ok(Some(StreamState::Waiting(state)))) => {
                    // The read completed without producing a chunk. The finished future
                    // must not be polled again, so start a new read with the returned state.
                    me.future = Some(Box::pin(maybe_next(state)));
                    // Nothing has registered the waker for the new read yet: request an
                    // immediate re-poll, otherwise returning `Pending` would stall the task.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
                Poll::Ready(Err(err)) => {
                    me.future = None;
                    Poll::Ready(Some(Err(err)))
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Ready(None),
        }
    }
}
3 changes: 3 additions & 0 deletions tests/it/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ pub use common::read_gzip_json;

#[cfg(feature = "io_ipc_write_async")]
mod write_async;

#[cfg(feature = "io_ipc_read_async")]
mod read_stream_async;
45 changes: 45 additions & 0 deletions tests/it/io/ipc/read_stream_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::ipc::read::stream_async::*;

use crate::io::ipc::common::read_gzip_json;

/// Reads the integration stream file `file_name` of the given `version` asynchronously
/// and checks its schema, IPC fields and chunks against the expected JSON output.
async fn test_file(version: &str, file_name: &str) -> Result<()> {
    let testdata = crate::test_util::arrow_test_data();
    let mut file = File::open(format!(
        "{}/arrow-ipc-stream/integration/{}/{}.stream",
        testdata, version, file_name
    ))
    .await?
    .compat();

    let metadata = read_stream_metadata_async(&mut file).await?;
    let mut reader = AsyncStreamReader::new(file, metadata);

    // read expected JSON output
    let (schema, ipc_fields, batches) = read_gzip_json(version, file_name)?;

    assert_eq!(&schema, &reader.metadata().schema);
    assert_eq!(&ipc_fields, &reader.metadata().ipc_schema.fields);

    let mut items = vec![];
    while let Some(item) = reader.next().await {
        items.push(item?)
    }

    // `zip` stops at the shorter side: without this check, a reader that yields
    // too few (or no) chunks would pass the comparison below vacuously
    assert_eq!(batches.len(), items.len());

    batches.iter().zip(items).for_each(|(lhs, rhs)| {
        assert_eq!(lhs, &rhs);
    });
    Ok(())
}

// Reads the `generated_primitive` integration stream through the asynchronous reader.
// NOTE(review): despite its name, this test exercises the async *read* path
// (it only calls `test_file`); consider renaming it, e.g. to `read_async`.
#[tokio::test]
async fn write_async() -> Result<()> {
test_file("1.0.0-littleendian", "generated_primitive").await
}

0 comments on commit 9e6924b

Please sign in to comment.