Read only enough bytes to infer Arrow IPC file schema via stream #7962
Changes from 1 commit
@@ -20,7 +20,7 @@
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

 use std::any::Any;
-use std::io::{Read, Seek};
+use std::borrow::Cow;
 use std::sync::Arc;

 use crate::datasource::file_format::FileFormat;

@@ -29,13 +29,18 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;

+use arrow::ipc::convert::fb_to_schema;
 use arrow::ipc::reader::FileReader;
-use arrow_schema::{Schema, SchemaRef};
+use arrow::ipc::root_as_message;
+use arrow_schema::{ArrowError, Schema, SchemaRef};

+use bytes::Bytes;
 use datafusion_common::{FileType, Statistics};
 use datafusion_physical_expr::PhysicalExpr;

 use async_trait::async_trait;
+use futures::stream::BoxStream;
+use futures::StreamExt;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

 /// Arrow `FileFormat` implementation.

@@ -59,13 +64,11 @@ impl FileFormat for ArrowFormat {
             let r = store.as_ref().get(&object.location).await?;
             let schema = match r.payload {
                 GetResultPayload::File(mut file, _) => {
-                    read_arrow_schema_from_reader(&mut file)?
+                    let reader = FileReader::try_new(&mut file, None)?;
+                    reader.schema()
                 }
-                GetResultPayload::Stream(_) => {
-                    // TODO: Fetching entire file to get schema is potentially wasteful
-                    let data = r.bytes().await?;
-                    let mut cursor = std::io::Cursor::new(&data);
-                    read_arrow_schema_from_reader(&mut cursor)?
+                GetResultPayload::Stream(stream) => {
+                    infer_schema_from_file_stream(stream).await?
                 }
             };
             schemas.push(schema.as_ref().clone());

@@ -99,7 +102,140 @@
     }
 }

-fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
-    let reader = FileReader::try_new(reader, None)?;
-    Ok(reader.schema())
+const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+
+async fn infer_schema_from_file_stream(
+    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+    // Expected format:
+    // <magic number "ARROW1"> - 6 bytes
+    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
+    // <metadata_size: int32> - 4 bytes
+    // <metadata_flatbuffer: bytes>
+    // <rest of file bytes>
+
+    // So in the first read we need at least all the known-size sections,
+    // which is 6 + 2 + 4 + 4 = 16 bytes.
+    let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
+    if bytes.len() < 16 {

Shouldn't this error check happen within `collect_at_least_n_bytes`?

Good point, fixed to have the check within `collect_at_least_n_bytes`.

+        return Err(ArrowError::ParseError(
+            "Arrow IPC file stream shorter than expected".to_string(),
+        ))?;
+    }
+
+    // Files should start with these magic bytes
+    if bytes[0..6] != ARROW_MAGIC {
+        return Err(ArrowError::ParseError(
+            "Arrow file does not contain correct header".to_string(),
+        ))?;
+    }
+
+    // Since continuation marker bytes were added in later versions
+    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
+        (&bytes[12..16], 16)
+    } else {
+        (&bytes[8..12], 12)
+    };
+
+    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+    let meta_len = i32::from_le_bytes(meta_len);
+
+    // Read bytes for Schema message
+    let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
+        // Need to read more bytes to decode Message
+        let mut block_data = Vec::with_capacity(meta_len as usize);
+        // In case we had some spare bytes in our initial read chunk
+        block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
+        let size_to_read = meta_len as usize - block_data.len();
+        let block_data =
+            collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;

Related to my previous comment, there is currently no check here that we actually did read at least n bytes.

+        Cow::Owned(block_data)
+    } else {
+        // Already have the bytes we need
+        let end_index = meta_len as usize + rest_of_bytes_start_index;
+        let block_data = &bytes[rest_of_bytes_start_index..end_index];
+        Cow::Borrowed(block_data)
+    };
+
+    // Decode Schema message
+    let message = root_as_message(&block_data).map_err(|err| {
+        ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
+    })?;
+    let ipc_schema = message.header_as_schema().ok_or_else(|| {
+        ArrowError::IpcError("Unable to read IPC message as schema".to_string())
+    })?;
+    let schema = fb_to_schema(ipc_schema);
+
+    Ok(Arc::new(schema))
+}
+
+async fn collect_at_least_n_bytes(
+    stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
+    n: usize,
+    extend_from: Option<Vec<u8>>,
+) -> Result<Vec<u8>> {
+    let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
+    // If extending existing buffer then ensure we read n additional bytes
+    let n = n + buf.len();
+    while let Some(bytes) = stream.next().await.transpose()? {
+        buf.extend_from_slice(&bytes);
+        if buf.len() >= n {
+            break;
+        }
+    }
+    Ok(buf)
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::DateTime;
+    use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};
+
+    use crate::execution::context::SessionContext;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_infer_schema_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: usize::MAX,
+            e_tag: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
+
+        // Test a chunk size that is too small, so we keep having to read more bytes,
+        // and one large enough that the first read contains all we need
+        for chunk_size in [7, 3000] {
+            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
+            let inferred_schema = arrow_format
+                .infer_schema(
+                    &state,
+                    &(store.clone() as Arc<dyn ObjectStore>),
+                    &[object_meta.clone()],
+                )
+                .await?;
+            let actual_fields = inferred_schema
+                .fields()
+                .iter()
+                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+                .collect::<Vec<_>>();
+            assert_eq!(expected, actual_fields);
+        }
+
+        Ok(())
+    }
 }
What do you think about moving this logic upstream into the arrow-rs reader?
https://github.com/apache/arrow-rs/blob/78735002d99eb0212166924948f95554c4ac2866/arrow-ipc/src/reader.rs#L560
If you agree, I can file an upstream ticket to do so.
I think I would rather take the approach described in apache/arrow-rs#5021; reading the footer is more generally useful, providing information beyond just the schema.
Yes, this logic was essentially ripped from `StreamReader` and `FileReader` of `arrow-ipc`, but adjusted to be compatible with an async stream of bytes. We could move this logic to `arrow-ipc`, but we need to keep in mind that although we are getting a stream of bytes, it is a stream of bytes in the IPC file format, not the IPC streaming format. So an `AsyncStreamReader` might not exactly fit our use case, whereas an `AsyncFileReader` could, but it might be limited if we don't read its footer when attempting to decode the rest of the data.

It seems this ticket could be appropriate for that. Just to note, we can't exactly read the footer from a stream without reverting to the old method of reading the entire stream just to decode the schema.
The idea would be to do something similar to what we do to read the Parquet footer; I provided a few more details on the linked ticket. The trick is to perform ranged reads.
Ah apologies, I took a look at the Parquet code and I see what you mean now. That indeed would be a better approach, in line with existing behaviour for `FileReader` 👍
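
To illustrate the ranged-read idea discussed above, here is a rough sketch of what footer-based schema inference could look like against the `ObjectStore` API, in the spirit of how the Parquet footer is fetched. This is only a sketch and not part of this PR: the helper name `schema_from_ipc_footer` is made up, and it assumes `ObjectStore::get_range`, `arrow::ipc::root_as_footer`, and `fb_to_schema` are available as in the crate versions used here (where `ObjectMeta::size` and `get_range` use `usize` ranges).

```rust
// Hypothetical sketch (not part of this PR): infer the schema by reading the
// Arrow IPC footer with two small ranged reads, analogous to Parquet footer reads.
use std::sync::Arc;

use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::root_as_footer;
use arrow_schema::{ArrowError, SchemaRef};
use datafusion_common::Result;
use object_store::{ObjectMeta, ObjectStore};

async fn schema_from_ipc_footer(
    store: &dyn ObjectStore,
    object: &ObjectMeta,
) -> Result<SchemaRef> {
    // An IPC file ends with: <footer flatbuffer> <footer length: int32> <magic "ARROW1">
    if object.size < 10 {
        return Err(ArrowError::ParseError(
            "Arrow IPC file too small to contain a footer".to_string(),
        ))?;
    }
    // First ranged read: the 10-byte trailer (footer length + magic)
    let trailer_start = object.size - 10;
    let trailer = store
        .get_range(&object.location, trailer_start..object.size)
        .await?;
    if trailer[4..] != [b'A', b'R', b'R', b'O', b'W', b'1'] {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct footer magic".to_string(),
        ))?;
    }
    let footer_len = i32::from_le_bytes(trailer[..4].try_into().unwrap()) as usize;

    // Second ranged read: only the footer flatbuffer itself
    let footer_bytes = store
        .get_range(&object.location, trailer_start - footer_len..trailer_start)
        .await?;
    let footer = root_as_footer(&footer_bytes).map_err(|err| {
        ArrowError::ParseError(format!("Unable to decode footer: {err:?}"))
    })?;
    // Besides the schema, the footer also describes the record batch and
    // dictionary blocks, which is what makes it more generally useful.
    let schema = footer.schema().ok_or_else(|| {
        ArrowError::IpcError("Footer is missing a schema".to_string())
    })?;
    Ok(Arc::new(fb_to_schema(schema)))
}
```

As with the Parquet metadata reader, the second request could usually be avoided by over-fetching a suffix of the file in the first read and only issuing another ranged read if the footer turns out to be larger than the prefetched bytes.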