-
Notifications
You must be signed in to change notification settings - Fork 819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Blockwise IO in IPC FileReader (#5153) #5179
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; | |
use std::sync::Arc; | ||
|
||
use arrow_array::*; | ||
use arrow_buffer::{Buffer, MutableBuffer}; | ||
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; | ||
use arrow_data::ArrayData; | ||
use arrow_schema::*; | ||
|
||
use crate::compression::CompressionCodec; | ||
use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER}; | ||
use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER}; | ||
use DataType::*; | ||
|
||
/// Read a buffer based on offset and length | ||
|
@@ -498,10 +498,34 @@ pub fn read_dictionary( | |
Ok(()) | ||
} | ||
|
||
/// Read the data for a given block | ||
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> { | ||
reader.seek(SeekFrom::Start(block.offset() as u64))?; | ||
let body_len = block.bodyLength().to_usize().unwrap(); | ||
let metadata_len = block.metaDataLength().to_usize().unwrap(); | ||
let total_len = body_len.checked_add(metadata_len).unwrap(); | ||
|
||
let mut buf = MutableBuffer::from_len_zeroed(total_len); | ||
reader.read_exact(&mut buf)?; | ||
Ok(buf.into()) | ||
} | ||
|
||
/// Parse an encapsulated message | ||
/// | ||
/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format> | ||
fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> { | ||
let buf = match buf[..4] == CONTINUATION_MARKER { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does highlight a somewhat peculiar quirk of the IPC file format, it doesn't actually care about the size prefixes. I guess this is just a historical artifact of the fact the IPC streams came first. |
||
true => &buf[8..], | ||
false => &buf[4..], | ||
}; | ||
crate::root_as_message(buf) | ||
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))) | ||
} | ||
|
||
/// Arrow File reader | ||
pub struct FileReader<R: Read + Seek> { | ||
/// Buffered file reader that supports reading and seeking | ||
reader: BufReader<R>, | ||
reader: R, | ||
|
||
/// The schema that is read from the file header | ||
schema: SchemaRef, | ||
|
@@ -535,45 +559,35 @@ pub struct FileReader<R: Read + Seek> { | |
impl<R: Read + Seek> fmt::Debug for FileReader<R> { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { | ||
f.debug_struct("FileReader<R>") | ||
.field("reader", &"BufReader<..>") | ||
.field("schema", &self.schema) | ||
.field("blocks", &self.blocks) | ||
.field("current_block", &self.current_block) | ||
.field("total_blocks", &self.total_blocks) | ||
.field("dictionaries_by_id", &self.dictionaries_by_id) | ||
.field("metadata_version", &self.metadata_version) | ||
.field("projection", &self.projection) | ||
.finish() | ||
.finish_non_exhaustive() | ||
} | ||
} | ||
|
||
impl<R: Read + Seek> FileReader<R> { | ||
/// Try to create a new file reader | ||
/// | ||
/// Returns errors if the file does not meet the Arrow Format header and footer | ||
/// requirements | ||
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> { | ||
let mut reader = BufReader::new(reader); | ||
// check if header and footer contain correct magic bytes | ||
let mut magic_buffer: [u8; 6] = [0; 6]; | ||
reader.read_exact(&mut magic_buffer)?; | ||
if magic_buffer != super::ARROW_MAGIC { | ||
return Err(ArrowError::ParseError( | ||
"Arrow file does not contain correct header".to_string(), | ||
)); | ||
} | ||
reader.seek(SeekFrom::End(-6))?; | ||
reader.read_exact(&mut magic_buffer)?; | ||
if magic_buffer != super::ARROW_MAGIC { | ||
/// Returns errors if the file does not meet the Arrow Format footer requirements | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be sufficient to just check the footer, and this provides a more predictable IO pattern |
||
pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> { | ||
// Space for ARROW_MAGIC (6 bytes) and length (4 bytes) | ||
let mut buffer = [0; 10]; | ||
tustvold marked this conversation as resolved.
Show resolved
Hide resolved
|
||
reader.seek(SeekFrom::End(-10))?; | ||
reader.read_exact(&mut buffer)?; | ||
|
||
if buffer[4..] != super::ARROW_MAGIC { | ||
return Err(ArrowError::ParseError( | ||
"Arrow file does not contain correct footer".to_string(), | ||
)); | ||
} | ||
|
||
// read footer length | ||
let mut footer_size: [u8; 4] = [0; 4]; | ||
reader.seek(SeekFrom::End(-10))?; | ||
reader.read_exact(&mut footer_size)?; | ||
let footer_len = i32::from_le_bytes(footer_size); | ||
let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap()); | ||
|
||
// read footer | ||
let mut footer_data = vec![0; footer_len as usize]; | ||
|
@@ -607,35 +621,14 @@ impl<R: Read + Seek> FileReader<R> { | |
let mut dictionaries_by_id = HashMap::new(); | ||
if let Some(dictionaries) = footer.dictionaries() { | ||
for block in dictionaries { | ||
// read length from end of offset | ||
let mut message_size: [u8; 4] = [0; 4]; | ||
reader.seek(SeekFrom::Start(block.offset() as u64))?; | ||
reader.read_exact(&mut message_size)?; | ||
if message_size == CONTINUATION_MARKER { | ||
reader.read_exact(&mut message_size)?; | ||
} | ||
let footer_len = i32::from_le_bytes(message_size); | ||
let mut block_data = vec![0; footer_len as usize]; | ||
|
||
reader.read_exact(&mut block_data)?; | ||
|
||
let message = crate::root_as_message(&block_data[..]).map_err(|err| { | ||
ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) | ||
})?; | ||
let buf = read_block(&mut reader, block)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reducing the number of copies of the code that does the It seems like previously the code did 2 IOs per block (at least) to read the header and then the data. With this PR it will only do 1 IO a block (given there is no continuation) |
||
let message = parse_message(&buf)?; | ||
|
||
match message.header_type() { | ||
crate::MessageHeader::DictionaryBatch => { | ||
let batch = message.header_as_dictionary_batch().unwrap(); | ||
|
||
// read the block that makes up the dictionary batch into a buffer | ||
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); | ||
reader.seek(SeekFrom::Start( | ||
block.offset() as u64 + block.metaDataLength() as u64, | ||
))?; | ||
reader.read_exact(&mut buf)?; | ||
|
||
read_dictionary( | ||
&buf.into(), | ||
&buf.slice(block.metaDataLength() as _), | ||
batch, | ||
&schema, | ||
&mut dictionaries_by_id, | ||
|
@@ -702,27 +695,15 @@ impl<R: Read + Seek> FileReader<R> { | |
} | ||
|
||
fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> { | ||
let block = self.blocks[self.current_block]; | ||
let block = &self.blocks[self.current_block]; | ||
self.current_block += 1; | ||
|
||
// read length | ||
self.reader.seek(SeekFrom::Start(block.offset() as u64))?; | ||
let mut meta_buf = [0; 4]; | ||
self.reader.read_exact(&mut meta_buf)?; | ||
if meta_buf == CONTINUATION_MARKER { | ||
// continuation marker encountered, read message next | ||
self.reader.read_exact(&mut meta_buf)?; | ||
} | ||
let meta_len = i32::from_le_bytes(meta_buf); | ||
|
||
let mut block_data = vec![0; meta_len as usize]; | ||
self.reader.read_exact(&mut block_data)?; | ||
let message = crate::root_as_message(&block_data[..]).map_err(|err| { | ||
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")) | ||
})?; | ||
let buffer = read_block(&mut self.reader, block)?; | ||
let message = parse_message(&buffer)?; | ||
|
||
// some old test data's footer metadata is not set, so we account for that | ||
if self.metadata_version != crate::MetadataVersion::V1 | ||
if self.metadata_version != MetadataVersion::V1 | ||
&& message.version() != self.metadata_version | ||
{ | ||
return Err(ArrowError::IpcError( | ||
|
@@ -739,14 +720,8 @@ impl<R: Read + Seek> FileReader<R> { | |
ArrowError::IpcError("Unable to read IPC message as record batch".to_string()) | ||
})?; | ||
// read the block that makes up the record batch into a buffer | ||
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); | ||
self.reader.seek(SeekFrom::Start( | ||
block.offset() as u64 + block.metaDataLength() as u64, | ||
))?; | ||
self.reader.read_exact(&mut buf)?; | ||
|
||
read_record_batch( | ||
&buf.into(), | ||
&buffer.slice(block.metaDataLength() as _), | ||
batch, | ||
self.schema(), | ||
&self.dictionaries_by_id, | ||
|
@@ -766,14 +741,14 @@ impl<R: Read + Seek> FileReader<R> { | |
/// | ||
/// It is inadvisable to directly read from the underlying reader. | ||
pub fn get_ref(&self) -> &R { | ||
self.reader.get_ref() | ||
&self.reader | ||
} | ||
|
||
/// Gets a mutable reference to the underlying reader. | ||
/// | ||
/// It is inadvisable to directly read from the underlying reader. | ||
pub fn get_mut(&mut self) -> &mut R { | ||
self.reader.get_mut() | ||
&mut self.reader | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am making an assumption here that blocks should be large enough to be an appropriate IO size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps @pitrou might know if this is a bad idea?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that a block typically points to an entire record batch, yes, a block is certainly large enough.