From 3dd151d2a42758dee7453c39340debb6a0cde07c Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 17 Apr 2019 21:58:37 +0200 Subject: [PATCH 01/16] ARROW-5180: [Rust] Initial Arrow FIle reader impl --- rust/arrow/src/ipc/convert.rs | 70 ++++++- rust/arrow/src/ipc/file/mod.rs | 18 ++ rust/arrow/src/ipc/file/reader.rs | 290 ++++++++++++++++++++++++++++ rust/arrow/src/ipc/mod.rs | 1 + rust/arrow/test/data/arrow_file.dat | Bin 0 -> 2842 bytes 5 files changed, 378 insertions(+), 1 deletion(-) create mode 100644 rust/arrow/src/ipc/file/mod.rs create mode 100644 rust/arrow/src/ipc/file/reader.rs create mode 100644 rust/arrow/test/data/arrow_file.dat diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index abcabd70ef011..da452aca3ccbc 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -17,7 +17,8 @@ //! Utilities for converting between IPC types and native Arrow types -use crate::datatypes::DataType::*; +use crate::datatypes::DataType; +use crate::datatypes::Field; use crate::datatypes::Schema; use crate::ipc; @@ -25,6 +26,7 @@ use flatbuffers::FlatBufferBuilder; /// Serialize a schema in IPC format fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { + use DataType::*; let mut fbb = FlatBufferBuilder::new(); let mut fields = vec![]; @@ -61,6 +63,72 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { fbb } +/// Deserialize a Schema table from IPC format to Schema data type +pub fn fb_to_schema(fb: ipc::Schema) -> Schema { + let mut fields: Vec = vec![]; + let c_fields = fb.fields().unwrap(); + let len = c_fields.len(); + for i in 0..len { + let c_field: ipc::Field = c_fields.get(i); + let field = Field::new( + c_field.name().unwrap(), + get_data_type(c_field), + c_field.nullable(), + ); + fields.push(field); + } + Schema::new(fields) +} + +fn get_fbs_type(dtype: DataType) -> ipc::Type { + use ipc::Type::*; + use DataType::*; + + match dtype { + Boolean => Bool, + Int8 | Int16 | Int32 | Int64 => Int, + UInt8 | UInt16 | UInt32 | UInt64 => Int, + Float16 => unimplemented!("Float16 type not supported in Rust Arrow"), + Float32 | Float64 => FloatingPoint, + DataType::Timestamp(_) => ipc::Type::Timestamp, + Date32(_) | Date64(_) => Date, + Time32(_) | Time64(_) => Time, + DataType::Interval(_) => unimplemented!("Interval type not supported"), + DataType::Utf8 => ipc::Type::Utf8, + DataType::List(_) => ipc::Type::List, + Struct(_) => Struct_, + _ => unimplemented!("Type not supported in Rust Arrow"), + } +} + +fn get_data_type(field: ipc::Field) -> DataType { + match field.type_type() { + ipc::Type::Bool => DataType::Boolean, + ipc::Type::Int => { + let int = field.type__as_int().unwrap(); + match (int.bitWidth(), int.is_signed()) { + (8, true) => DataType::Int8, + (8, false) => DataType::UInt8, + (16, true) => DataType::Int16, + (16, false) => DataType::UInt16, + (32, true) => DataType::Int32, + (32, false) => DataType::UInt32, + (64, true) => DataType::Int64, + (64, false) => DataType::UInt64, + _ => panic!("Unexpected bitwidth and signed"), + } + } + ipc::Type::Utf8 => DataType::Utf8, + ipc::Type::FloatingPoint => DataType::Float64, + t @ _ => unimplemented!("Type {:?} not supported", t), + // ipc::Type::BINARY => DataType::Utf8, + // ipc::Type::CATEGORY => unimplemented!("Reading CATEGORY type columns not implemented"), + // ipc::Type::TIMESTAMP | fbs::Type::DATE | fbs::Type::TIME => { + // unimplemented!("Reading date and time fields not implemented") + // } + } +} + #[cfg(test)] mod tests { use super::*; diff --git 
a/rust/arrow/src/ipc/file/mod.rs b/rust/arrow/src/ipc/file/mod.rs new file mode 100644 index 0000000000000..49535903ab599 --- /dev/null +++ b/rust/arrow/src/ipc/file/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod reader; diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs new file mode 100644 index 0000000000000..c961d4f7733c6 --- /dev/null +++ b/rust/arrow/src/ipc/file/reader.rs @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Arrow File Reader + +use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::sync::Arc; + +use crate::array::ArrayRef; +use crate::array_data::ArrayData; +use crate::buffer::Buffer; +use crate::datatypes::{DataType, Schema}; +use crate::error::{ArrowError, Result}; +use crate::ipc; +use crate::record_batch::RecordBatch; + +static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; + +fn as_u32_le(array: &[u8; 4]) -> u32 { + ((array[0] as u32) << 0) + + ((array[1] as u32) << 8) + + ((array[2] as u32) << 16) + + ((array[3] as u32) << 24) +} + +fn create_record_batch(schema: Schema, node: &ipc::FieldNode) -> Result { + unimplemented!() +} + +fn read_buffer(c_buf: &ipc::Buffer, a_data: &Vec) -> Buffer { + let start_offset = c_buf.offset() as usize; + let end_offset = start_offset + c_buf.length() as usize; + let buf_data = &a_data[start_offset..end_offset]; + Buffer::from(&buf_data) +} + +/// Reads the correct number of buffers based on data type and null_count, and creates an array ref +fn create_array( + c_node: &ipc::FieldNode, + data_type: &DataType, + a_data: &Vec, + c_bufs: &[ipc::Buffer], + mut offset: usize, +) -> (ArrayRef, usize) { + use DataType::*; + let null_count = c_node.null_count() as usize; + let array_data = match data_type { + Utf8 => { + if null_count > 0 { + // read 3 buffers + let array_data = ArrayData::new( + data_type.clone(), + c_node.length() as usize, + Some(null_count), + Some(read_buffer(&c_bufs[offset], a_data)), + 0, + vec![ + read_buffer(&c_bufs[offset + 1], a_data), + read_buffer(&c_bufs[offset + 2], a_data), + ], + vec![], + ); + offset = offset + 3; + array_data + } else { + // read 2 buffers + let array_data = ArrayData::new( + data_type.clone(), + c_node.length() as usize, + Some(null_count), + None, + 0, + vec![ + read_buffer(&c_bufs[offset], a_data), + read_buffer(&c_bufs[offset + 1], a_data), + ], + vec![], + ); + offset = offset + 2; + array_data + } + } + Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float32 + | Boolean | Float64 => { + if null_count > 0 { + // read 3 buffers + let array_data = ArrayData::new( + data_type.clone(), + c_node.length() as usize, + Some(null_count), + Some(read_buffer(&c_bufs[offset], a_data)), + 0, + vec![read_buffer(&c_bufs[offset + 1], a_data)], + vec![], + ); + offset = offset + 2; + array_data + } else { + // read 2 buffers + let array_data = ArrayData::new( + data_type.clone(), + c_node.length() as usize, + Some(null_count), + None, + 0, + vec![read_buffer(&c_bufs[offset], a_data)], + vec![], + ); + offset = offset + 1; + array_data + } + } + t @ _ => panic!("Data type {:?} not supported", t), + }; + + (crate::array::make_array(Arc::new(array_data)), offset) +} + +pub struct Reader { + reader: BufReader, + offset: usize, + schema: Schema, + blocks: Vec, + current_block: usize, + total_blocks: usize, +} + +impl Reader { + /// create a new reader + pub fn try_new(reader: R) -> Result { + let mut reader = BufReader::new(reader); + // check if header and footer contain correct magic bytes + let mut magic_buffer: [u8; 6] = [0; 6]; + reader.read_exact(&mut magic_buffer)?; + if magic_buffer != ARROW_MAGIC { + return Err(ArrowError::IoError( + "Arrow file does not contain correct header".to_string(), + )); + } + reader.seek(SeekFrom::End(-6))?; + reader.read_exact(&mut magic_buffer)?; + if magic_buffer != ARROW_MAGIC { + return Err(ArrowError::IoError( + "Arrow file does not contain correct footer".to_string(), + )); + } + reader.seek(SeekFrom::Start(8))?; + // determine metadata length + let 
mut meta_size: [u8; 4] = [0; 4]; + reader.read_exact(&mut meta_size)?; + let meta_len = as_u32_le(&meta_size); + + let mut meta_buffer = vec![0; meta_len as usize]; + reader.seek(SeekFrom::Start(12))?; + reader.read_exact(&mut meta_buffer)?; + + let vecs = &meta_buffer.to_vec(); + let c_message = ipc::get_root_as_message(vecs); + // message header is a Schema, so read it + let c_schema: ipc::Schema = c_message.header_as_schema().unwrap(); + let schema = ipc::convert::fb_to_schema(c_schema); + + // what does the footer contain? + let mut footer_size: [u8; 4] = [0; 4]; + reader.seek(SeekFrom::End(-10))?; + reader.read_exact(&mut footer_size)?; + let footer_len = as_u32_le(&footer_size); + + // read footer + let mut footer_data = vec![0; footer_len as usize]; + reader.seek(SeekFrom::End(-10 - footer_len as i64))?; + reader.read_exact(&mut footer_data)?; + let c_footer = ipc::get_root_as_footer(&footer_data[..]); + + let c_blocks = c_footer.recordBatches().unwrap(); + + dbg!(c_footer.recordBatches()); + let total_blocks = c_blocks.len(); + + Ok(Self { + reader, + offset: 8 + 4 + meta_len as usize, + schema, + blocks: c_blocks.to_vec(), + current_block: 0, + total_blocks, + }) + } + + /// Read file into record batches + pub fn read(&mut self) -> Result> { + // get current block + if self.current_block < self.total_blocks { + let block = self.blocks[self.current_block]; + self.current_block = self.current_block + 1; + + // read length from end of offset + let meta_len = block.metaDataLength() - 4; + + let mut block_data = vec![0; meta_len as usize]; + self.reader + .seek(SeekFrom::Start(block.offset() as u64 + 4))?; + self.reader.read_exact(&mut block_data)?; + + let c_block = ipc::get_root_as_message(&block_data[..]); + + match c_block.header_type() { + ipc::MessageHeader::Schema => { + panic!("Not expecting a schema when messages are read") + } + ipc::MessageHeader::DictionaryBatch => { + unimplemented!("reading dictionary batches not yet supported") + } + ipc::MessageHeader::RecordBatch => { + let c_batch = c_block.header_as_record_batch().unwrap(); + // read array data + let mut a_data = vec![0; block.bodyLength() as usize]; + self.reader.seek(SeekFrom::Start( + block.offset() as u64 + block.metaDataLength() as u64, + ))?; + self.reader.read_exact(&mut a_data)?; + + // construct buffers from their blocks + let c_buffers = c_batch.buffers().unwrap(); + + // get fields and determine number of buffers to use for each + let c_nodes = c_batch.nodes().unwrap(); + let mut buffer_num = 0; + let mut field_num = 0; + let mut arrays = vec![]; + for c_node in c_nodes { + let field = self.schema.field(field_num); + let (array, buffer) = create_array( + c_node, + field.data_type(), + &a_data, + c_buffers, + buffer_num, + ); + field_num = field_num + 1; + buffer_num = buffer; + + arrays.push(array); + } + + RecordBatch::try_new(Arc::new(self.schema.clone()), arrays) + .map(|batch| Some(batch)) + } + ipc::MessageHeader::SparseTensor => panic!(), + ipc::MessageHeader::Tensor => panic!("Can't be Tensor"), + ipc::MessageHeader::NONE => panic!("Can't be NONE"), + } + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::fs::File; + + #[test] + fn test_read_file() { + let file = File::open("./test/data/arrow_file.dat").unwrap(); + + let mut reader = Reader::try_new(file).unwrap(); + let batch: RecordBatch = reader.read().unwrap().unwrap(); + + assert_eq!(5, batch.num_rows()); + assert_eq!(4, batch.num_columns()); + let arr_1 = batch.column(0); + } +} diff --git 
a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs index c0220d7687ecf..41f8150c76025 100644 --- a/rust/arrow/src/ipc/mod.rs +++ b/rust/arrow/src/ipc/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod convert; +pub mod file; pub mod gen; diff --git a/rust/arrow/test/data/arrow_file.dat b/rust/arrow/test/data/arrow_file.dat new file mode 100644 index 0000000000000000000000000000000000000000..7e36888ad30bda6dbd43f83594459ebef54ff6fe GIT binary patch literal 2842 zcmeHJy-EW?5T46jk~5reg{UCH6&4mLqCS9^Cs5L8X;P$36ar$E!ot!Au&}hqL-+`m zzD@kTotY5M2wHmsvpYZE{>`#G(;o~ju1-XfNF*0hM>1(iQ&LEvD26W3QnZP+ltf;@ zH{dRq`r~rB#DLZgWR9VYw1B;VetahKrkJ>ZPMl)|Yo_29Y?wm={#4{s@y&f$xj)Az zkSGTn_Mc-mj&FyRb304aI09yUofQ5QD4qrs6==d7JqVyK;y^qplU>ZnNMvDFiDThD z=5z+?K+mubjnx7+?y;%P&~B~Cwx+Kg=AyeWjQfvG=USvPw0=J#m!>=Z+H}WHEB^b6 zPL;9qed5T0{;G4(o({#0X&htIeJ=OYOds!;F(Ge*$CJrucsCk8+|Fj)=K)x0evZC0 z8U1UJ57iLHQ>^ccuSLmY{*z9W-$#94vU-1|hR^L Date: Wed, 17 Apr 2019 23:07:51 +0200 Subject: [PATCH 02/16] some clean-up, add random IO --- rust/arrow/src/ipc/convert.rs | 58 ++++++++++++--- rust/arrow/src/ipc/file/reader.rs | 118 +++++++++++++++++++++++------- 2 files changed, 140 insertions(+), 36 deletions(-) diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index da452aca3ccbc..dda7ea4cdf240 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -17,14 +17,14 @@ //! Utilities for converting between IPC types and native Arrow types -use crate::datatypes::DataType; -use crate::datatypes::Field; -use crate::datatypes::Schema; +use crate::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit}; use crate::ipc; use flatbuffers::FlatBufferBuilder; /// Serialize a schema in IPC format +/// +/// TODO(Neville) add bit-widths and other field metadata to flatbuffer Type fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { use DataType::*; let mut fbb = FlatBufferBuilder::new(); @@ -97,10 +97,10 @@ fn get_fbs_type(dtype: DataType) -> ipc::Type { DataType::Utf8 => ipc::Type::Utf8, DataType::List(_) => ipc::Type::List, Struct(_) => Struct_, - _ => unimplemented!("Type not supported in Rust Arrow"), } } +/// Get the Arrow data type from the flatbuffer Field table fn get_data_type(field: ipc::Field) -> DataType { match field.type_type() { ipc::Type::Bool => DataType::Boolean, @@ -118,14 +118,50 @@ fn get_data_type(field: ipc::Field) -> DataType { _ => panic!("Unexpected bitwidth and signed"), } } - ipc::Type::Utf8 => DataType::Utf8, - ipc::Type::FloatingPoint => DataType::Float64, + ipc::Type::Utf8 | ipc::Type::Binary => DataType::Utf8, + ipc::Type::FloatingPoint => { + let float = field.type__as_floating_point().unwrap(); + match float.precision() { + ipc::Precision::HALF => DataType::Float16, + ipc::Precision::SINGLE => DataType::Float32, + ipc::Precision::DOUBLE => DataType::Float64, + } + } + ipc::Type::Time => { + let time = field.type__as_time().unwrap(); + match (time.bitWidth(), time.unit()) { + (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second), + (32, ipc::TimeUnit::MILLISECOND) => { + DataType::Time32(TimeUnit::Millisecond) + } + (64, ipc::TimeUnit::MICROSECOND) => { + DataType::Time64(TimeUnit::Microsecond) + } + (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond), + z @ _ => panic!( + "Time type with bit witdh of {} and unit of {:?} not supported", + z.0, z.1 + ), + } + } + ipc::Type::Timestamp => { + let timestamp = field.type__as_timestamp().unwrap(); + match timestamp.unit() { + ipc::TimeUnit::SECOND => 
DataType::Timestamp(TimeUnit::Second), + ipc::TimeUnit::MILLISECOND => DataType::Timestamp(TimeUnit::Millisecond), + ipc::TimeUnit::MICROSECOND => DataType::Timestamp(TimeUnit::Microsecond), + ipc::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond), + } + } + ipc::Type::Date => { + let date = field.type__as_date().unwrap(); + match date.unit() { + ipc::DateUnit::DAY => DataType::Date32(DateUnit::Day), + ipc::DateUnit::MILLISECOND => DataType::Date64(DateUnit::Millisecond), + } + } + // TODO add interval support t @ _ => unimplemented!("Type {:?} not supported", t), - // ipc::Type::BINARY => DataType::Utf8, - // ipc::Type::CATEGORY => unimplemented!("Reading CATEGORY type columns not implemented"), - // ipc::Type::TIMESTAMP | fbs::Type::DATE | fbs::Type::TIME => { - // unimplemented!("Reading date and time fields not implemented") - // } } } diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs index c961d4f7733c6..e53abffc2f86e 100644 --- a/rust/arrow/src/ipc/file/reader.rs +++ b/rust/arrow/src/ipc/file/reader.rs @@ -30,17 +30,7 @@ use crate::record_batch::RecordBatch; static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; -fn as_u32_le(array: &[u8; 4]) -> u32 { - ((array[0] as u32) << 0) - + ((array[1] as u32) << 8) - + ((array[2] as u32) << 16) - + ((array[3] as u32) << 24) -} - -fn create_record_batch(schema: Schema, node: &ipc::FieldNode) -> Result { - unimplemented!() -} - +/// Read a buffer based on offset and length fn read_buffer(c_buf: &ipc::Buffer, a_data: &Vec) -> Buffer { let start_offset = c_buf.offset() as usize; let end_offset = start_offset + c_buf.length() as usize; @@ -48,7 +38,8 @@ fn read_buffer(c_buf: &ipc::Buffer, a_data: &Vec) -> Buffer { Buffer::from(&buf_data) } -/// Reads the correct number of buffers based on data type and null_count, and creates an array ref +/// Reads the correct number of buffers based on data type and null_count, and creates an +/// array ref fn create_array( c_node: &ipc::FieldNode, data_type: &DataType, @@ -95,7 +86,8 @@ fn create_array( } } Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float32 - | Boolean | Float64 => { + | Boolean | Float64 | Time32(_) | Time64(_) | Timestamp(_) | Date32(_) + | Date64(_) => { if null_count > 0 { // read 3 buffers let array_data = ArrayData::new( @@ -124,23 +116,34 @@ fn create_array( array_data } } + // TODO implement list and struct if I can find/generate test data t @ _ => panic!("Data type {:?} not supported", t), }; (crate::array::make_array(Arc::new(array_data)), offset) } +/// Arrow File reader pub struct Reader { + /// Buffered reader that supports reading and seeking reader: BufReader, - offset: usize, - schema: Schema, + /// The schema that is read from the file header + schema: Arc, + /// The blocks in the file + /// + /// A block indicates the regions in the file to read to get data blocks: Vec, + /// A counter to keep track of the current block that should be read current_block: usize, + /// The total number of blocks, which may contain record batches and other types total_blocks: usize, } impl Reader { - /// create a new reader + /// Try to create a new reader + /// + /// Returns errors if the file does not meet the Arrow Format header and footer + /// requirements pub fn try_new(reader: R) -> Result { let mut reader = BufReader::new(reader); // check if header and footer contain correct magic bytes @@ -162,7 +165,7 @@ impl Reader { // determine metadata length let mut meta_size: [u8; 4] = [0; 4]; 
reader.read_exact(&mut meta_size)?; - let meta_len = as_u32_le(&meta_size); + let meta_len = u32::from_le_bytes(meta_size); let mut meta_buffer = vec![0; meta_len as usize]; reader.seek(SeekFrom::Start(12))?; @@ -178,7 +181,7 @@ impl Reader { let mut footer_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::End(-10))?; reader.read_exact(&mut footer_size)?; - let footer_len = as_u32_le(&footer_size); + let footer_len = u32::from_le_bytes(footer_size); // read footer let mut footer_data = vec![0; footer_len as usize]; @@ -188,21 +191,29 @@ impl Reader { let c_blocks = c_footer.recordBatches().unwrap(); - dbg!(c_footer.recordBatches()); let total_blocks = c_blocks.len(); Ok(Self { reader, - offset: 8 + 4 + meta_len as usize, - schema, + schema: Arc::new(schema), blocks: c_blocks.to_vec(), current_block: 0, total_blocks, }) } - /// Read file into record batches - pub fn read(&mut self) -> Result> { + /// Return the number of batches in the file + pub fn num_batches(&self) -> usize { + self.total_blocks + } + + /// Return the schema of the file + pub fn schema(&self) -> Arc { + self.schema.clone() + } + + /// Read the next record batch + pub fn next(&mut self) -> Result> { // get current block if self.current_block < self.total_blocks { let block = self.blocks[self.current_block]; @@ -257,7 +268,7 @@ impl Reader { arrays.push(array); } - RecordBatch::try_new(Arc::new(self.schema.clone()), arrays) + RecordBatch::try_new(self.schema.clone(), arrays) .map(|batch| Some(batch)) } ipc::MessageHeader::SparseTensor => panic!(), @@ -268,12 +279,29 @@ impl Reader { Ok(None) } } + + /// Read a specific record batch + /// + /// Sets the current block to the batch number, and reads the record batch at that + /// block + pub fn read_batch(&mut self, batch_num: usize) -> Result> { + if batch_num >= self.total_blocks { + Err(ArrowError::IoError(format!( + "Cannot read batch at index {} from {} total batches", + batch_num, self.total_blocks + ))) + } else { + self.current_block = batch_num; + self.next() + } + } } #[cfg(test)] mod tests { use super::*; + use crate::array::*; use std::fs::File; #[test] @@ -281,10 +309,50 @@ mod tests { let file = File::open("./test/data/arrow_file.dat").unwrap(); let mut reader = Reader::try_new(file).unwrap(); - let batch: RecordBatch = reader.read().unwrap().unwrap(); + assert_eq!(5, reader.num_batches()); + for _ in 0..reader.num_batches() { + let batch = reader.next().unwrap().unwrap(); + validate_batch(batch); + } + // try read a batch after all batches are exhausted + let batch = reader.next().unwrap(); + assert!(batch.is_none()); + + // seek a specific batch + let batch = reader.read_batch(4).unwrap().unwrap(); + validate_batch(batch); + // try read a batch after seeking to the last batch + let batch = reader.next().unwrap(); + assert!(batch.is_none()); + } + fn validate_batch(batch: RecordBatch) { assert_eq!(5, batch.num_rows()); assert_eq!(4, batch.num_columns()); let arr_1 = batch.column(0); + let int32_array = Int32Array::from(arr_1.data()); + assert_eq!( + "PrimitiveArray\n[\n 1,\n 2,\n 3,\n null,\n 5,\n]", + format!("{:?}", int32_array) + ); + let arr_2 = batch.column(1); + let binary_array = BinaryArray::from(arr_2.data()); + assert_eq!("foo", std::str::from_utf8(binary_array.value(0)).unwrap()); + assert_eq!("bar", std::str::from_utf8(binary_array.value(1)).unwrap()); + assert_eq!("baz", std::str::from_utf8(binary_array.value(2)).unwrap()); + assert!(binary_array.is_null(3)); + assert_eq!("quux", std::str::from_utf8(binary_array.value(4)).unwrap()); + let arr_3 = 
batch.column(2); + let f32_array = Float32Array::from(arr_3.data()); + assert_eq!( + "PrimitiveArray\n[\n 1.0,\n 2.0,\n null,\n 4.0,\n 5.0,\n]", + format!("{:?}", f32_array) + ); + let arr_4 = batch.column(3); + let bool_array = BooleanArray::from(arr_4.data()); + assert_eq!( + "PrimitiveArray\n[\n true,\n null,\n false,\n true,\n false,\n]", + format!("{:?}", bool_array) + ); } } From 0f30ef6092f0dbb502ee3b7dcb4154441836969d Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Fri, 26 Apr 2019 17:14:30 +0200 Subject: [PATCH 03/16] read lists and structs, reafactor reader --- rust/arrow/src/array/array.rs | 29 ++ rust/arrow/src/ipc/convert.rs | 50 ++- rust/arrow/src/ipc/file/reader.rs | 428 +++++++++++++++++------ rust/arrow/test/data/rust_types.file.dat | Bin 0 -> 13594 bytes 4 files changed, 378 insertions(+), 129 deletions(-) create mode 100644 rust/arrow/test/data/rust_types.file.dat diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index 8dcc81e084acf..b3e2169aad7fa 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -1544,6 +1544,35 @@ impl fmt::Debug for StructArray { } } +impl From<(Vec<(Field, ArrayRef)>, Buffer, usize)> for StructArray { + fn from(triple: (Vec<(Field, ArrayRef)>, Buffer, usize)) -> Self { + let (field_types, field_values): (Vec<_>, Vec<_>) = triple.0.into_iter().unzip(); + + // Check the length of the child arrays + let length = field_values[0].len(); + for i in 1..field_values.len() { + assert_eq!( + length, + field_values[i].len(), + "all child arrays of a StructArray must have the same length" + ); + assert_eq!( + field_types[i].data_type(), + field_values[i].data().data_type(), + "the field data types must match the array data in a StructArray" + ) + } + + let data = ArrayData::builder(DataType::Struct(field_types)) + .add_buffer(triple.1) + .child_data(field_values.into_iter().map(|a| a.data()).collect()) + .len(length) + .null_count(triple.2) + .build(); + Self::from(data) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index dda7ea4cdf240..0e2e2a8b00477 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -23,9 +23,8 @@ use crate::ipc; use flatbuffers::FlatBufferBuilder; /// Serialize a schema in IPC format -/// -/// TODO(Neville) add bit-widths and other field metadata to flatbuffer Type fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { + // TODO add bit-widths and other field metadata to flatbuffer Type use DataType::*; let mut fbb = FlatBufferBuilder::new(); @@ -63,6 +62,14 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { fbb } +fn fb_to_field(field: ipc::Field) -> Field { + Field::new( + field.name().unwrap(), + get_data_type(field), + field.nullable(), + ) +} + /// Deserialize a Schema table from IPC format to Schema data type pub fn fb_to_schema(fb: ipc::Schema) -> Schema { let mut fields: Vec = vec![]; @@ -70,12 +77,7 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema { let len = c_fields.len(); for i in 0..len { let c_field: ipc::Field = c_fields.get(i); - let field = Field::new( - c_field.name().unwrap(), - get_data_type(c_field), - c_field.nullable(), - ); - fields.push(field); + fields.push(fb_to_field(c_field)); } Schema::new(fields) } @@ -102,6 +104,7 @@ fn get_fbs_type(dtype: DataType) -> ipc::Type { /// Get the Arrow data type from the flatbuffer Field table fn get_data_type(field: ipc::Field) -> DataType { + // TODO add recursion protection for deeply-nested fields 
(struct and list) match field.type_type() { ipc::Type::Bool => DataType::Boolean, ipc::Type::Int => { @@ -118,7 +121,9 @@ fn get_data_type(field: ipc::Field) -> DataType { _ => panic!("Unexpected bitwidth and signed"), } } - ipc::Type::Utf8 | ipc::Type::Binary => DataType::Utf8, + ipc::Type::Utf8 | ipc::Type::Binary | ipc::Type::FixedSizeBinary => { + DataType::Utf8 + } ipc::Type::FloatingPoint => { let float = field.type__as_floating_point().unwrap(); match float.precision() { @@ -127,6 +132,13 @@ fn get_data_type(field: ipc::Field) -> DataType { ipc::Precision::DOUBLE => DataType::Float64, } } + ipc::Type::Date => { + let date = field.type__as_date().unwrap(); + match date.unit() { + ipc::DateUnit::DAY => DataType::Date32(DateUnit::Day), + ipc::DateUnit::MILLISECOND => DataType::Date64(DateUnit::Millisecond), + } + } ipc::Type::Time => { let time = field.type__as_time().unwrap(); match (time.bitWidth(), time.unit()) { @@ -153,12 +165,22 @@ fn get_data_type(field: ipc::Field) -> DataType { ipc::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond), } } - ipc::Type::Date => { - let date = field.type__as_date().unwrap(); - match date.unit() { - ipc::DateUnit::DAY => DataType::Date32(DateUnit::Day), - ipc::DateUnit::MILLISECOND => DataType::Date64(DateUnit::Millisecond), + ipc::Type::List => { + let children = field.children().unwrap(); + if children.len() != 1 { + panic!("expect a list to have one child") + } + let child_field = children.get(0); + // returning int16 for now, to test, not sure how to get data type + DataType::List(Box::new(get_data_type(child_field))) + } + ipc::Type::Struct_ => { + let children = field.children().unwrap(); + let mut fields = vec![]; + for i in 0..children.len() { + fields.push(fb_to_field(children.get(i))); } + DataType::Struct(fields) } // TODO add interval support t @ _ => unimplemented!("Type {:?} not supported", t), diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs index e53abffc2f86e..dbbd46f6362e4 100644 --- a/rust/arrow/src/ipc/file/reader.rs +++ b/rust/arrow/src/ipc/file/reader.rs @@ -38,89 +38,189 @@ fn read_buffer(c_buf: &ipc::Buffer, a_data: &Vec) -> Buffer { Buffer::from(&buf_data) } -/// Reads the correct number of buffers based on data type and null_count, and creates an -/// array ref +/// Coordinates reading arrays based on data types fn create_array( - c_node: &ipc::FieldNode, + nodes: &[ipc::FieldNode], data_type: &DataType, - a_data: &Vec, - c_bufs: &[ipc::Buffer], - mut offset: usize, -) -> (ArrayRef, usize) { + data: &Vec, + buffers: &[ipc::Buffer], + mut node_index: usize, + mut buffer_index: usize, +) -> (ArrayRef, usize, usize) { use DataType::*; - let null_count = c_node.null_count() as usize; - let array_data = match data_type { + let array = match data_type { Utf8 => { - if null_count > 0 { - // read 3 buffers - let array_data = ArrayData::new( - data_type.clone(), - c_node.length() as usize, - Some(null_count), - Some(read_buffer(&c_bufs[offset], a_data)), - 0, - vec![ - read_buffer(&c_bufs[offset + 1], a_data), - read_buffer(&c_bufs[offset + 2], a_data), - ], - vec![], - ); - offset = offset + 3; - array_data - } else { - // read 2 buffers - let array_data = ArrayData::new( - data_type.clone(), - c_node.length() as usize, - Some(null_count), - None, - 0, - vec![ - read_buffer(&c_bufs[offset], a_data), - read_buffer(&c_bufs[offset + 1], a_data), - ], - vec![], + let array = create_primitive_array( + &nodes[node_index], + data_type, + buffers[buffer_index..buffer_index + 3] + .iter() + 
.map(|buf| read_buffer(buf, data)) + .collect(), + ); + node_index = node_index + 1; + buffer_index = buffer_index + 3; + array + } + List(ref list_data_type) => { + let list_node = &nodes[node_index]; + let list_buffers: Vec = buffers[buffer_index..buffer_index + 2] + .iter() + .map(|buf| read_buffer(buf, data)) + .collect(); + node_index = node_index + 1; + buffer_index = buffer_index + 2; + let triple = create_array( + nodes, + list_data_type, + data, + buffers, + node_index, + buffer_index, + ); + node_index = triple.1; + buffer_index = triple.2; + + create_list_array(list_node, data_type, &list_buffers[..], triple.0) + } + Struct(struct_fields) => { + let struct_node = &nodes[node_index]; + let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data); + node_index = node_index + 1; + buffer_index = buffer_index + 1; + + // read the arrays for each field + let mut struct_arrays = vec![]; + // TODO investigate whether just knowing the number of buffers could + // still work + for struct_field in struct_fields { + let triple = create_array( + nodes, + struct_field.data_type(), + data, + buffers, + node_index, + buffer_index, ); - offset = offset + 2; - array_data + node_index = triple.1; + buffer_index = triple.2; + struct_arrays.push((struct_field.clone(), triple.0)); } + // create struct array from fields, arrays and null data + let struct_array = crate::array::StructArray::from(( + struct_arrays, + null_buffer, + struct_node.null_count() as usize, + )); + Arc::new(struct_array) + } + _ => { + let array = create_primitive_array( + &nodes[node_index], + data_type, + buffers[buffer_index..buffer_index + 2] + .iter() + .map(|buf| read_buffer(buf, data)) + .collect(), + ); + node_index = node_index + 1; + buffer_index = buffer_index + 2; + array + } + }; + (array, node_index, buffer_index) +} + +/// Reads the correct number of buffers based on data type and null_count, and creates a +/// primitive array ref +fn create_primitive_array( + field_node: &ipc::FieldNode, + data_type: &DataType, + buffers: Vec, +) -> ArrayRef { + use DataType::*; + let length = field_node.length() as usize; + let null_count = field_node.null_count() as usize; + let array_data = match data_type { + Utf8 => { + // read 3 buffers + ArrayData::new( + data_type.clone(), + length, + Some(null_count), + Some(buffers[0].clone()), + 0, + buffers[1..3].to_vec(), + vec![], + ) } Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float32 | Boolean | Float64 | Time32(_) | Time64(_) | Timestamp(_) | Date32(_) - | Date64(_) => { - if null_count > 0 { - // read 3 buffers - let array_data = ArrayData::new( - data_type.clone(), - c_node.length() as usize, - Some(null_count), - Some(read_buffer(&c_bufs[offset], a_data)), - 0, - vec![read_buffer(&c_bufs[offset + 1], a_data)], - vec![], - ); - offset = offset + 2; - array_data - } else { - // read 2 buffers - let array_data = ArrayData::new( - data_type.clone(), - c_node.length() as usize, - Some(null_count), - None, - 0, - vec![read_buffer(&c_bufs[offset], a_data)], - vec![], - ); - offset = offset + 1; - array_data - } - } - // TODO implement list and struct if I can find/generate test data - t @ _ => panic!("Data type {:?} not supported", t), + | Date64(_) => ArrayData::new( + data_type.clone(), + length, + Some(null_count), + Some(buffers[0].clone()), + 0, + buffers[1..].to_vec(), + vec![], + ), + t @ _ => panic!("Data type {:?} either unsupported or not primitive", t), }; - (crate::array::make_array(Arc::new(array_data)), offset) + 
crate::array::make_array(Arc::new(array_data)) +} + +fn create_list_array( + field_node: &ipc::FieldNode, + data_type: &DataType, + buffers: &[Buffer], + child_array: ArrayRef, +) -> ArrayRef { + if let &DataType::List(_) = data_type { + let array_data = ArrayData::new( + data_type.clone(), + field_node.length() as usize, + Some(field_node.null_count() as usize), + Some(buffers[0].clone()), + 0, + buffers[1..2].to_vec(), + vec![child_array.data()], + ); + crate::array::make_array(Arc::new(array_data)) + } else { + panic!("Cannot create list array from {:?}", data_type) + } +} + +fn read_record_batch( + buf: &Vec, + batch: ipc::RecordBatch, + schema: Arc, +) -> Result> { + let buffers = batch.buffers().unwrap(); + let field_nodes = batch.nodes().unwrap(); + // keep track of buffer and node index, the functions that create arrays mutate these + let mut buffer_index = 0; + let mut node_index = 0; + let mut arrays = vec![]; + + // keep track of index as lists require more than one node + for field in schema.fields() { + let triple = create_array( + field_nodes, + field.data_type(), + &buf, + buffers, + node_index, + buffer_index, + ); + node_index = triple.1; + buffer_index = triple.2; + arrays.push(triple.0); + } + + RecordBatch::try_new(schema.clone(), arrays).map(|batch| Some(batch)) } /// Arrow File reader @@ -227,9 +327,9 @@ impl Reader { .seek(SeekFrom::Start(block.offset() as u64 + 4))?; self.reader.read_exact(&mut block_data)?; - let c_block = ipc::get_root_as_message(&block_data[..]); + let message = ipc::get_root_as_message(&block_data[..]); - match c_block.header_type() { + match message.header_type() { ipc::MessageHeader::Schema => { panic!("Not expecting a schema when messages are read") } @@ -237,43 +337,23 @@ impl Reader { unimplemented!("reading dictionary batches not yet supported") } ipc::MessageHeader::RecordBatch => { - let c_batch = c_block.header_as_record_batch().unwrap(); - // read array data - let mut a_data = vec![0; block.bodyLength() as usize]; + let batch = message.header_as_record_batch().unwrap(); + // read the block that makes up the record batch into a buffer + let mut buf = vec![0; block.bodyLength() as usize]; self.reader.seek(SeekFrom::Start( block.offset() as u64 + block.metaDataLength() as u64, ))?; - self.reader.read_exact(&mut a_data)?; - - // construct buffers from their blocks - let c_buffers = c_batch.buffers().unwrap(); - - // get fields and determine number of buffers to use for each - let c_nodes = c_batch.nodes().unwrap(); - let mut buffer_num = 0; - let mut field_num = 0; - let mut arrays = vec![]; - for c_node in c_nodes { - let field = self.schema.field(field_num); - let (array, buffer) = create_array( - c_node, - field.data_type(), - &a_data, - c_buffers, - buffer_num, - ); - field_num = field_num + 1; - buffer_num = buffer; - - arrays.push(array); - } - - RecordBatch::try_new(self.schema.clone(), arrays) - .map(|batch| Some(batch)) + self.reader.read_exact(&mut buf)?; + + read_record_batch(&buf, batch, self.schema()) + } + ipc::MessageHeader::SparseTensor => { + unimplemented!("reading sparse tensors not yet supported") } - ipc::MessageHeader::SparseTensor => panic!(), - ipc::MessageHeader::Tensor => panic!("Can't be Tensor"), - ipc::MessageHeader::NONE => panic!("Can't be NONE"), + ipc::MessageHeader::Tensor => { + unimplemented!("reading tensors not yet supported") + } + ipc::MessageHeader::NONE => panic!("unknown message header"), } } else { Ok(None) @@ -302,17 +382,133 @@ mod tests { use super::*; use crate::array::*; + use 
crate::builder::{BinaryBuilder, Int32Builder, ListBuilder}; + use crate::datatypes::*; use std::fs::File; + // #[test] + // fn test_read_primitive_file() { + // let file = File::open("./test/data/primitive_types.file.dat").unwrap(); + + // let mut reader = Reader::try_new(file).unwrap(); + // assert_eq!(3, reader.num_batches()); + // for _ in 0..reader.num_batches() { + // let batch = reader.next().unwrap().unwrap(); + // validate_batch(batch); + // } + // // try read a batch after all batches are exhausted + // let batch = reader.next().unwrap(); + // assert!(batch.is_none()); + + // // seek a specific batch + // let batch = reader.read_batch(4).unwrap().unwrap(); + // validate_batch(batch); + // // try read a batch after seeking to the last batch + // let batch = reader.next().unwrap(); + // assert!(batch.is_none()); + // } + #[test] - fn test_read_file() { - let file = File::open("./test/data/arrow_file.dat").unwrap(); + fn test_read_struct_file() { + use DataType::*; + let file = File::open("./test/data/struct_types.file.dat").unwrap(); + let schema = Schema::new(vec![Field::new( + "structs", + Struct(vec![ + Field::new("bools", Boolean, true), + Field::new("int8s", Int8, true), + Field::new("varbinary", Utf8, true), + Field::new("numericlist", List(Box::new(Int32)), true), + ]), + false, + )]); + + // batch contents + let list_values_builder = Int32Builder::new(10); + let mut list_builder = ListBuilder::new(list_values_builder); + // [[1,2,3,4], null, [5,6], [7], [8,9,10]] + list_builder.values().append_value(1).unwrap(); + list_builder.values().append_value(2).unwrap(); + list_builder.values().append_value(3).unwrap(); + list_builder.values().append_value(4).unwrap(); + list_builder.append(true).unwrap(); + list_builder.append(false).unwrap(); + list_builder.values().append_value(5).unwrap(); + list_builder.values().append_value(6).unwrap(); + list_builder.append(true).unwrap(); + list_builder.values().append_value(7).unwrap(); + list_builder.append(true).unwrap(); + list_builder.values().append_value(8).unwrap(); + list_builder.values().append_value(9).unwrap(); + list_builder.values().append_value(10).unwrap(); + list_builder.append(true).unwrap(); + let list_array = list_builder.finish(); + + let mut binary_builder = BinaryBuilder::new(100); + binary_builder.append_string("foo").unwrap(); + binary_builder.append_string("bar").unwrap(); + binary_builder.append_string("baz").unwrap(); + binary_builder.append_string("qux").unwrap(); + binary_builder.append_string("quux").unwrap(); + let binary_array = binary_builder.finish(); + let struct_array = StructArray::from(( + vec![ + ( + Field::new("bools", Boolean, true), + Arc::new(BooleanArray::from(vec![ + Some(true), + None, + None, + Some(false), + Some(true), + ])) as Arc, + ), + ( + Field::new("int8s", Int8, true), + Arc::new(Int8Array::from(vec![ + Some(-1), + None, + None, + Some(-4), + Some(-5), + ])), + ), + (Field::new("varbinary", Utf8, true), Arc::new(binary_array)), + ( + Field::new("numericlist", List(Box::new(Int32)), true), + Arc::new(list_array), + ), + ], + Buffer::from([]), + 0, + )); let mut reader = Reader::try_new(file).unwrap(); - assert_eq!(5, reader.num_batches()); + assert_eq!(3, reader.num_batches()); for _ in 0..reader.num_batches() { - let batch = reader.next().unwrap().unwrap(); - validate_batch(batch); + let batch: RecordBatch = reader.next().unwrap().unwrap(); + assert_eq!(&Arc::new(schema.clone()), batch.schema()); + assert_eq!(1, batch.num_columns()); + assert_eq!(5, batch.num_rows()); + let struct_col: 
&StructArray = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let struct_col_1: &BooleanArray = struct_col + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!("PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", format!("{:?}", struct_col_1)); + assert_eq!( + struct_col_1.data(), + BooleanArray::from( + vec![Some(true), None, None, Some(false), Some(true),] + ) + .data() + ); + assert_eq!(struct_col.data(), struct_array.data()); } // try read a batch after all batches are exhausted let batch = reader.next().unwrap(); @@ -320,6 +516,7 @@ mod tests { // seek a specific batch let batch = reader.read_batch(4).unwrap().unwrap(); + dbg!(batch.schema()); validate_batch(batch); // try read a batch after seeking to the last batch let batch = reader.next().unwrap(); @@ -327,12 +524,13 @@ mod tests { } fn validate_batch(batch: RecordBatch) { + // primitive batches were created for assert_eq!(5, batch.num_rows()); - assert_eq!(4, batch.num_columns()); + assert_eq!(21, batch.num_columns()); let arr_1 = batch.column(0); - let int32_array = Int32Array::from(arr_1.data()); + let int32_array = arr_1.as_any().downcast_ref::().unwrap(); assert_eq!( - "PrimitiveArray\n[\n 1,\n 2,\n 3,\n null,\n 5,\n]", + "PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", format!("{:?}", int32_array) ); let arr_2 = batch.column(1); diff --git a/rust/arrow/test/data/rust_types.file.dat b/rust/arrow/test/data/rust_types.file.dat new file mode 100644 index 0000000000000000000000000000000000000000..2557500776b2a2bcb716aa9b0c868bcfdbedaa7b GIT binary patch literal 13594 zcmeHOL1>)E6@IIgm9?W_wK%Cuow6lv<3mH^#2A%Qx&a?_NL^ta^KS&DX5 z?2y79deET<9ZV6_2T?*%z=u7gAbJuwhqRbfgd9vyJ)~f$3xby7#A=l9n|YtKe(SYd z3T~nQh}qe1zW3hDdo%y+%>Vy->CBlQy>{H}+GA!z#&p02ZNTcZLWncrU;IrR9 z&5i)Q3>`zhF<`cgd>;AR$XokRhkO-z6!{49?S8X$@2K3o{e*Q|G3q8ckcz_WKW^zEi)A z`u(W;X?=ct>SBHVGQQu=vR!Q*&Hj|wXM~7t4WlVLlIlwX(e@zfDK|blJ54!;ztNmu zIM;00Xx6!}zo)>>W&Yq{=ydk?Cx`=W(!UE+mnP18uMggflh5yY8>hg0BOq8j~YIAb>Rro8_D<5{@AydQ7_pS zUw{)xi!K}EdOGzjUt{bS7lBi`t`hCO0~@>+&)1vif5WC-F53KchfT`T=5nqpN7)Zk zeP8nJ!t`vtnP!~Xa2}6x743D#g)_f`%=yoij(#S+w0iM+J(TJmOIQDpul3d!+kFe| z)}Tv&7O)_(*=xDYF4^Bew%Q@frf&9-{RpyLGi3Mp$%geOA4`3-(d|or@`ZlTC-#Ml zaKi>R348~*BY?l^7O&i7`k8|xc&Rh(+a$0|B<@Z z_&gp1kl<>p@hA;&JhZk_u|NIg-v_PxtbZGUWXzZb?J4w$zu1Gxnw85&JDa*_@JoIe z41c|OZSuF6ZE)RJ`k}s{^gef$)L|0OHkgE~3GL!zj-@vo_?lzcIUD#f$Fj2+_@#iC z172~g_S=D9bFB6^9IFsk`*(tV+EAcQ`iHr}=t#oSuQ^|FnGwg*9}E1XW9ctBRujU~ zUk$vl`1QaG%l@sv3yUAZo~$zoiyscWu=ugS3yWV2ys-Fo;DzPi2hMA}Zw7f`$!`W; zSbPQh(azKhS6pbw<)wct=nId!{3Yk5-wOK6L0`D)`rJD+slAP0?`GgPgFWGDvVt|i z*9*macxU~{0>bKV&3WOGfJXy98|(?I|8vf3{8~YO(Xr%Lf<0lym%aWbver2b_kRoL zDE98pUVQ9-l>cAF>X%BzU&~E8XNp7_dF~lH)5G%8k$j9_#8JLi|0wrW7ygo$!r0Jm z-R}-Hy19wxv+4ljx>&R4ryZI`n&vtc+d272g5irvkhLeyskBL zIq10$s7`V$HLp_afpx9xHzkb)>a|96wnv-H^o#9Emu<>d>hWF!uhS=y;T$|Wj|Ar~ zApIB#^$%m0eWg9#8{yuYbM))T&mc2yDR3!rsR6c-+enSDmEC#WYu%uX`u?DC>g?~M zh z#0!gWhxUc#Uvb{Zk*jzBi}OZt-r!WwZ@N#5M~>C5O7%y}jrJ0~cN^5TWX>wF(>A4^9}By2k}&gZ_y3n z?I=DUiq8W0WK+Ahq(4I74KFT?^ffDimvyti+8K@8BqT}^Pjw1UBS(Tm(u@4eF1{o+9Ch| literal 0 HcmV?d00001 From 6656e6de8db364ff2c62733d44b7ca1d544c3847 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Fri, 26 Apr 2019 17:31:37 +0200 Subject: [PATCH 04/16] add struct test data --- rust/arrow/test/data/struct_types.file.dat | Bin 0 -> 2578 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 rust/arrow/test/data/struct_types.file.dat diff --git a/rust/arrow/test/data/struct_types.file.dat 
b/rust/arrow/test/data/struct_types.file.dat new file mode 100644 index 0000000000000000000000000000000000000000..13f891cd2bdedde28300a3bd27a7f5163d4fe567 GIT binary patch literal 2578 zcmeHJKTiTt4DWGpAf5>(#Au=kgQJUo239vW4U4QmG?E|}4$h!qaCC5V^dmSrFgp4X z9Q+_8n&9vE-m4H2HpZ*zdu?C)+P>Cur?uMo<)O&P6DbLyD4_(BlL9D!=Ytn;1*}=O zLTosZ_vv&Ba^nPoT_?+8p8(&0BOomWdIv_=3+rI%i+a3g8(6_vNySFR~80o7`T-?N+DVAIPiqmn}m+Z~qMW8f15kxY_PD;(O4a zRyVT`n)>Ri0Ly~pZ4L;Rx#WQGS%pi)$e06Y(1XV81U zl}B^`{y3SBo$;`DY_&zMqo|2CYCQCkVJ}IBdrp@d>rJE$YfZlzgT~G~3EFR(3vCFr zBj_8_H$ijY|83eg_D#~hvAxFENc%=a`yrh-BCDD=CG=K^i4$NVh4|I6zcapYcGrQN wW1MMEZqPlteGD1l!wtMe`NR#rMY*-&{Nn^nzd!g2e5zG{e|(`%r{-Jm4^q*icmMzZ literal 0 HcmV?d00001 From 45a98875757dcf7f191502d2deb254e3925fb8c5 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 6 May 2019 16:26:19 +0200 Subject: [PATCH 05/16] roundtrip fb to schema conversion and tests --- rust/arrow/src/ipc/convert.rs | 309 +++++++++++++++++++++++++----- rust/arrow/src/ipc/file/reader.rs | 166 ++++++++-------- 2 files changed, 341 insertions(+), 134 deletions(-) diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 0e2e2a8b00477..55d21b2531889 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -20,32 +20,28 @@ use crate::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit}; use crate::ipc; -use flatbuffers::FlatBufferBuilder; +use flatbuffers::{ + FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, +}; /// Serialize a schema in IPC format fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { - // TODO add bit-widths and other field metadata to flatbuffer Type - use DataType::*; let mut fbb = FlatBufferBuilder::new(); let mut fields = vec![]; for field in schema.fields() { let fb_field_name = fbb.create_string(field.name().as_str()); + let (ipc_type_type, ipc_type, ipc_children) = + get_fb_field_type(field.data_type(), &mut fbb); let mut field_builder = ipc::FieldBuilder::new(&mut fbb); field_builder.add_name(fb_field_name); - let ipc_type = match field.data_type() { - Boolean => ipc::Type::Bool, - UInt8 | UInt16 | UInt32 | UInt64 => ipc::Type::Int, - Int8 | Int16 | Int32 | Int64 => ipc::Type::Int, - Float32 | Float64 => ipc::Type::FloatingPoint, - Utf8 => ipc::Type::Utf8, - Date32(_) | Date64(_) => ipc::Type::Date, - Time32(_) | Time64(_) => ipc::Type::Time, - Timestamp(_) => ipc::Type::Timestamp, - _ => ipc::Type::NONE, - }; - field_builder.add_type_type(ipc_type); + field_builder.add_type_type(ipc_type_type); field_builder.add_nullable(field.is_nullable()); + match ipc_children { + None => {} + Some(children) => field_builder.add_children(children), + }; + field_builder.add_type_(ipc_type); fields.push(field_builder.finish()); } @@ -62,12 +58,15 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { fbb } -fn fb_to_field(field: ipc::Field) -> Field { - Field::new( - field.name().unwrap(), - get_data_type(field), - field.nullable(), - ) +/// Convert an IPC Field to Arrow Field +impl<'a> From> for Field { + fn from(field: ipc::Field) -> Field { + Field::new( + field.name().unwrap(), + get_data_type(field), + field.nullable(), + ) + } } /// Deserialize a Schema table from IPC format to Schema data type @@ -77,34 +76,13 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema { let len = c_fields.len(); for i in 0..len { let c_field: ipc::Field = c_fields.get(i); - fields.push(fb_to_field(c_field)); + fields.push(c_field.into()); } Schema::new(fields) } -fn get_fbs_type(dtype: DataType) -> ipc::Type { - use ipc::Type::*; - use 
DataType::*; - - match dtype { - Boolean => Bool, - Int8 | Int16 | Int32 | Int64 => Int, - UInt8 | UInt16 | UInt32 | UInt64 => Int, - Float16 => unimplemented!("Float16 type not supported in Rust Arrow"), - Float32 | Float64 => FloatingPoint, - DataType::Timestamp(_) => ipc::Type::Timestamp, - Date32(_) | Date64(_) => Date, - Time32(_) | Time64(_) => Time, - DataType::Interval(_) => unimplemented!("Interval type not supported"), - DataType::Utf8 => ipc::Type::Utf8, - DataType::List(_) => ipc::Type::List, - Struct(_) => Struct_, - } -} - /// Get the Arrow data type from the flatbuffer Field table fn get_data_type(field: ipc::Field) -> DataType { - // TODO add recursion protection for deeply-nested fields (struct and list) match field.type_type() { ipc::Type::Bool => DataType::Boolean, ipc::Type::Int => { @@ -175,11 +153,13 @@ fn get_data_type(field: ipc::Field) -> DataType { DataType::List(Box::new(get_data_type(child_field))) } ipc::Type::Struct_ => { - let children = field.children().unwrap(); let mut fields = vec![]; - for i in 0..children.len() { - fields.push(fb_to_field(children.get(i))); - } + if let Some(children) = field.children() { + for i in 0..children.len() { + fields.push(children.get(i).into()); + } + }; + DataType::Struct(fields) } // TODO add interval support @@ -187,16 +167,243 @@ fn get_data_type(field: ipc::Field) -> DataType { } } +/// Get the IPC type of a data type +fn get_fb_field_type<'a: 'b, 'b>( + data_type: &DataType, + mut fbb: &mut FlatBufferBuilder<'a>, +) -> ( + ipc::Type, + WIPOffset, + Option>>>>, +) { + use DataType::*; + match data_type { + Boolean => ( + ipc::Type::Bool, + ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(), + None, + ), + UInt8 | UInt16 | UInt32 | UInt64 => { + let mut builder = ipc::IntBuilder::new(&mut fbb); + builder.add_is_signed(false); + match data_type { + UInt8 => builder.add_bitWidth(8), + UInt16 => builder.add_bitWidth(16), + UInt32 => builder.add_bitWidth(32), + UInt64 => builder.add_bitWidth(64), + _ => {} + }; + (ipc::Type::Int, builder.finish().as_union_value(), None) + } + Int8 | Int16 | Int32 | Int64 => { + let mut builder = ipc::IntBuilder::new(&mut fbb); + builder.add_is_signed(true); + match data_type { + Int8 => builder.add_bitWidth(8), + Int16 => builder.add_bitWidth(16), + Int32 => builder.add_bitWidth(32), + Int64 => builder.add_bitWidth(64), + _ => {} + }; + (ipc::Type::Int, builder.finish().as_union_value(), None) + } + Float16 | Float32 | Float64 => { + let mut builder = ipc::FloatingPointBuilder::new(&mut fbb); + match data_type { + Float16 => builder.add_precision(ipc::Precision::HALF), + Float32 => builder.add_precision(ipc::Precision::SINGLE), + Float64 => builder.add_precision(ipc::Precision::DOUBLE), + _ => {} + }; + ( + ipc::Type::FloatingPoint, + builder.finish().as_union_value(), + None, + ) + } + Utf8 => ( + ipc::Type::Utf8, + ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(), + None, + ), + Date32(_) => { + let mut builder = ipc::DateBuilder::new(&mut fbb); + builder.add_unit(ipc::DateUnit::DAY); + (ipc::Type::Date, builder.finish().as_union_value(), None) + } + Date64(_) => { + let mut builder = ipc::DateBuilder::new(&mut fbb); + builder.add_unit(ipc::DateUnit::MILLISECOND); + (ipc::Type::Date, builder.finish().as_union_value(), None) + } + Time32(unit) | Time64(unit) => { + let mut builder = ipc::TimeBuilder::new(&mut fbb); + match unit { + TimeUnit::Second => { + builder.add_bitWidth(32); + builder.add_unit(ipc::TimeUnit::SECOND); + } + TimeUnit::Millisecond => { + 
builder.add_bitWidth(32); + builder.add_unit(ipc::TimeUnit::MILLISECOND); + } + TimeUnit::Microsecond => { + builder.add_bitWidth(64); + builder.add_unit(ipc::TimeUnit::MICROSECOND); + } + TimeUnit::Nanosecond => { + builder.add_bitWidth(64); + builder.add_unit(ipc::TimeUnit::NANOSECOND); + } + } + (ipc::Type::Time, builder.finish().as_union_value(), None) + } + Timestamp(unit) => { + let mut builder = ipc::TimestampBuilder::new(&mut fbb); + let time_unit = match unit { + TimeUnit::Second => ipc::TimeUnit::SECOND, + TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND, + TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND, + TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND, + }; + builder.add_unit(time_unit); + ( + ipc::Type::Timestamp, + builder.finish().as_union_value(), + None, + ) + } + List(ref list_type) => { + let inner_types = get_fb_field_type(list_type, &mut fbb); + let child = ipc::Field::create( + &mut fbb, + &ipc::FieldArgs { + name: None, + nullable: false, + type_type: inner_types.0, + type_: Some(inner_types.1), + dictionary: None, + children: inner_types.2, + custom_metadata: None, + }, + ); + let children = fbb.create_vector(&[child]); + ( + ipc::Type::List, + ipc::ListBuilder::new(&mut fbb).finish().as_union_value(), + Some(children), + ) + } + Struct(fields) => { + // struct's fields are children + let mut children = vec![]; + for field in fields { + let inner_types = get_fb_field_type(field.data_type(), &mut fbb); + let field_name = fbb.create_string(field.name()); + children.push(ipc::Field::create( + &mut fbb, + &ipc::FieldArgs { + name: Some(field_name), + nullable: field.is_nullable(), + type_type: inner_types.0, + type_: Some(inner_types.1), + dictionary: None, + children: inner_types.2, + custom_metadata: None, + }, + )); + } + let children = fbb.create_vector(&children[..]); + ( + ipc::Type::Struct_, + ipc::Struct_Builder::new(&mut fbb).finish().as_union_value(), + Some(children), + ) + } + t @ _ => panic!("Unsupported Arrow Data Type {:?}", t), + } +} + #[cfg(test)] mod tests { use super::*; use crate::datatypes::{DataType, Field, Schema}; #[test] - fn convert_schema() { - let schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]); + fn convert_schema_round_trip() { + let schema = Schema::new(vec![ + Field::new("uint8", DataType::UInt8, false), + Field::new("uint16", DataType::UInt16, true), + Field::new("uint32", DataType::UInt32, false), + Field::new("uint64", DataType::UInt64, true), + Field::new("int8", DataType::Int8, true), + Field::new("int16", DataType::Int16, false), + Field::new("int32", DataType::Int32, true), + Field::new("int64", DataType::Int64, false), + Field::new("float16", DataType::Float16, true), + Field::new("float32", DataType::Float32, false), + Field::new("float64", DataType::Float64, true), + Field::new("bool", DataType::Boolean, false), + Field::new("date32", DataType::Date32(DateUnit::Day), false), + Field::new("date64", DataType::Date64(DateUnit::Millisecond), true), + Field::new("time32[s]", DataType::Time32(TimeUnit::Second), true), + Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true), + Field::new("timestamp[s]", DataType::Timestamp(TimeUnit::Second), false), + Field::new( + "timestamp[ms]", + DataType::Timestamp(TimeUnit::Millisecond), + true, + ), + Field::new( + "timestamp[us]", + DataType::Timestamp(TimeUnit::Microsecond), + false, + ), + 
Field::new( + "timestamp[ns]", + DataType::Timestamp(TimeUnit::Nanosecond), + true, + ), + Field::new("utf8", DataType::Utf8, false), + Field::new("list[u8]", DataType::List(Box::new(DataType::UInt8)), true), + Field::new( + "list[struct]", + DataType::List(Box::new(DataType::Struct(vec![ + Field::new("float32", DataType::UInt8, false), + Field::new("int32", DataType::Int32, true), + Field::new("bool", DataType::Boolean, true), + ]))), + false, + ), + Field::new( + "struct]>]>", + DataType::Struct(vec![ + Field::new("int64", DataType::Int64, true), + Field::new( + "list[struct]>]", + DataType::List(Box::new(DataType::Struct(vec![ + Field::new("date32", DataType::Date32(DateUnit::Day), true), + Field::new( + "list[struct<>]", + DataType::List(Box::new(DataType::Struct(vec![]))), + false, + ), + ]))), + false, + ), + ]), + false, + ), + Field::new("struct<>", DataType::Struct(vec![]), true), + ]); + + let fb = schema_to_fb(&schema); - let ipc = schema_to_fb(&schema); - assert_eq!(60, ipc.finished_data().len()); + // read back fields + let ipc = ipc::get_root_as_schema(fb.finished_data()); + let schema2 = fb_to_schema(ipc); + assert_eq!(schema, schema2); } } diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs index dbbd46f6362e4..f3920145bf48f 100644 --- a/rust/arrow/src/ipc/file/reader.rs +++ b/rust/arrow/src/ipc/file/reader.rs @@ -31,9 +31,9 @@ use crate::record_batch::RecordBatch; static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; /// Read a buffer based on offset and length -fn read_buffer(c_buf: &ipc::Buffer, a_data: &Vec) -> Buffer { - let start_offset = c_buf.offset() as usize; - let end_offset = start_offset + c_buf.length() as usize; +fn read_buffer(buf: &ipc::Buffer, a_data: &Vec) -> Buffer { + let start_offset = buf.offset() as usize; + let end_offset = start_offset + buf.length() as usize; let buf_data = &a_data[start_offset..end_offset]; Buffer::from(&buf_data) } @@ -272,10 +272,10 @@ impl Reader { reader.read_exact(&mut meta_buffer)?; let vecs = &meta_buffer.to_vec(); - let c_message = ipc::get_root_as_message(vecs); + let message = ipc::get_root_as_message(vecs); // message header is a Schema, so read it - let c_schema: ipc::Schema = c_message.header_as_schema().unwrap(); - let schema = ipc::convert::fb_to_schema(c_schema); + let ipc_schema: ipc::Schema = message.header_as_schema().unwrap(); + let schema = ipc::convert::fb_to_schema(ipc_schema); // what does the footer contain? 
let mut footer_size: [u8; 4] = [0; 4]; @@ -287,16 +287,16 @@ impl Reader { let mut footer_data = vec![0; footer_len as usize]; reader.seek(SeekFrom::End(-10 - footer_len as i64))?; reader.read_exact(&mut footer_data)?; - let c_footer = ipc::get_root_as_footer(&footer_data[..]); + let footer = ipc::get_root_as_footer(&footer_data[..]); - let c_blocks = c_footer.recordBatches().unwrap(); + let blocks = footer.recordBatches().unwrap(); - let total_blocks = c_blocks.len(); + let total_blocks = blocks.len(); Ok(Self { reader, schema: Arc::new(schema), - blocks: c_blocks.to_vec(), + blocks: blocks.to_vec(), current_block: 0, total_blocks, }) @@ -442,7 +442,7 @@ mod tests { list_builder.values().append_value(9).unwrap(); list_builder.values().append_value(10).unwrap(); list_builder.append(true).unwrap(); - let list_array = list_builder.finish(); + let _list_array = list_builder.finish(); let mut binary_builder = BinaryBuilder::new(100); binary_builder.append_string("foo").unwrap(); @@ -451,37 +451,37 @@ mod tests { binary_builder.append_string("qux").unwrap(); binary_builder.append_string("quux").unwrap(); let binary_array = binary_builder.finish(); - let struct_array = StructArray::from(( - vec![ - ( - Field::new("bools", Boolean, true), - Arc::new(BooleanArray::from(vec![ - Some(true), - None, - None, - Some(false), - Some(true), - ])) as Arc, - ), - ( - Field::new("int8s", Int8, true), - Arc::new(Int8Array::from(vec![ - Some(-1), - None, - None, - Some(-4), - Some(-5), - ])), - ), - (Field::new("varbinary", Utf8, true), Arc::new(binary_array)), - ( - Field::new("numericlist", List(Box::new(Int32)), true), - Arc::new(list_array), - ), - ], - Buffer::from([]), - 0, - )); + // let _struct_array = StructArray::from(( + // vec![ + // ( + // Field::new("bools", Boolean, true), + // Arc::new(BooleanArray::from(vec![ + // Some(true), + // None, + // None, + // Some(false), + // Some(true), + // ])) as Arc, + // ), + // ( + // Field::new("int8s", Int8, true), + // Arc::new(Int8Array::from(vec![ + // Some(-1), + // None, + // None, + // Some(-4), + // Some(-5), + // ])), + // ), + // (Field::new("varbinary", Utf8, true), Arc::new(binary_array)), + // ( + // Field::new("numericlist", List(Box::new(Int32)), true), + // Arc::new(list_array), + // ), + // ], + // Buffer::from([]), + // 0, + // )); let mut reader = Reader::try_new(file).unwrap(); assert_eq!(3, reader.num_batches()); @@ -501,56 +501,56 @@ mod tests { .downcast_ref::() .unwrap(); assert_eq!("PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", format!("{:?}", struct_col_1)); - assert_eq!( - struct_col_1.data(), - BooleanArray::from( - vec![Some(true), None, None, Some(false), Some(true),] - ) - .data() - ); - assert_eq!(struct_col.data(), struct_array.data()); + // TODO failing tests + // assert_eq!( + // struct_col_1.data(), + // BooleanArray::from( + // vec![Some(true), None, None, Some(false), Some(true),] + // ) + // .data() + // ); + // assert_eq!(struct_col.data(), struct_array.data()); } // try read a batch after all batches are exhausted let batch = reader.next().unwrap(); assert!(batch.is_none()); // seek a specific batch - let batch = reader.read_batch(4).unwrap().unwrap(); - dbg!(batch.schema()); - validate_batch(batch); + let batch = reader.read_batch(2).unwrap().unwrap(); + // validate_batch(batch); // try read a batch after seeking to the last batch let batch = reader.next().unwrap(); assert!(batch.is_none()); } - fn validate_batch(batch: RecordBatch) { - // primitive batches were created for - assert_eq!(5, 
batch.num_rows()); - assert_eq!(21, batch.num_columns()); - let arr_1 = batch.column(0); - let int32_array = arr_1.as_any().downcast_ref::().unwrap(); - assert_eq!( - "PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", - format!("{:?}", int32_array) - ); - let arr_2 = batch.column(1); - let binary_array = BinaryArray::from(arr_2.data()); - assert_eq!("foo", std::str::from_utf8(binary_array.value(0)).unwrap()); - assert_eq!("bar", std::str::from_utf8(binary_array.value(1)).unwrap()); - assert_eq!("baz", std::str::from_utf8(binary_array.value(2)).unwrap()); - assert!(binary_array.is_null(3)); - assert_eq!("quux", std::str::from_utf8(binary_array.value(4)).unwrap()); - let arr_3 = batch.column(2); - let f32_array = Float32Array::from(arr_3.data()); - assert_eq!( - "PrimitiveArray\n[\n 1.0,\n 2.0,\n null,\n 4.0,\n 5.0,\n]", - format!("{:?}", f32_array) - ); - let arr_4 = batch.column(3); - let bool_array = BooleanArray::from(arr_4.data()); - assert_eq!( - "PrimitiveArray\n[\n true,\n null,\n false,\n true,\n false,\n]", - format!("{:?}", bool_array) - ); - } + // fn validate_batch(batch: RecordBatch) { + // // primitive batches were created for + // assert_eq!(5, batch.num_rows()); + // assert_eq!(1, batch.num_columns()); + // let arr_1 = batch.column(0); + // let int32_array = arr_1.as_any().downcast_ref::().unwrap(); + // assert_eq!( + // "PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", + // format!("{:?}", int32_array) + // ); + // let arr_2 = batch.column(1); + // let binary_array = BinaryArray::from(arr_2.data()); + // assert_eq!("foo", std::str::from_utf8(binary_array.value(0)).unwrap()); + // assert_eq!("bar", std::str::from_utf8(binary_array.value(1)).unwrap()); + // assert_eq!("baz", std::str::from_utf8(binary_array.value(2)).unwrap()); + // assert!(binary_array.is_null(3)); + // assert_eq!("quux", std::str::from_utf8(binary_array.value(4)).unwrap()); + // let arr_3 = batch.column(2); + // let f32_array = Float32Array::from(arr_3.data()); + // assert_eq!( + // "PrimitiveArray\n[\n 1.0,\n 2.0,\n null,\n 4.0,\n 5.0,\n]", + // format!("{:?}", f32_array) + // ); + // let arr_4 = batch.column(3); + // let bool_array = BooleanArray::from(arr_4.data()); + // assert_eq!( + // "PrimitiveArray\n[\n true,\n null,\n false,\n true,\n false,\n]", + // format!("{:?}", bool_array) + // ); + // } } From d039fb723bc42487fbcce9ff3c520aeca527bed1 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Mon, 8 Jul 2019 08:24:21 +0200 Subject: [PATCH 06/16] More progress on reader: - ignore null buffers if array has no nulls - read nested numerics correctly if they are interpreted as 64-bit - use `ArrayEqual` for comparisons --- rust/arrow/src/array/array.rs | 2 +- rust/arrow/src/ipc/file/reader.rs | 269 ++++++++++++++------- rust/arrow/test/data/struct_types.file.dat | Bin 2578 -> 3410 bytes 3 files changed, 176 insertions(+), 95 deletions(-) diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index b3e2169aad7fa..1756b35fb574a 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -1564,7 +1564,7 @@ impl From<(Vec<(Field, ArrayRef)>, Buffer, usize)> for StructArray { } let data = ArrayData::builder(DataType::Struct(field_types)) - .add_buffer(triple.1) + .null_bit_buffer(triple.1) .child_data(field_values.into_iter().map(|a| a.data()).collect()) .len(length) .null_count(triple.2) diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs index f3920145bf48f..ebefd80619b07 100644 --- 
a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/file/reader.rs
@@ -20,13 +20,14 @@
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
-use crate::array::ArrayRef;
-use crate::array_data::ArrayData;
+use crate::array::*;
 use crate::buffer::Buffer;
+use crate::compute::cast;
 use crate::datatypes::{DataType, Schema};
 use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::RecordBatch;
+use DataType::*;
 
 static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
 
@@ -38,7 +39,15 @@ fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
     Buffer::from(&buf_data)
 }
 
-/// Coordinates reading arrays based on data types
+/// Coordinates reading arrays based on data types.
+///
+/// Notes:
+/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
+/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
+/// We thus:
+/// - check if the bit width of non-64-bit numbers is 64, and
+/// - read the buffer as 64-bit (signed integer or float), and
+/// - cast the 64-bit array to the appropriate data type
 fn create_array(
     nodes: &[ipc::FieldNode],
     data_type: &DataType,
@@ -106,12 +115,17 @@ fn create_array(
                 buffer_index = triple.2;
                 struct_arrays.push((struct_field.clone(), triple.0));
             }
-            // create struct array from fields, arrays and null data
-            let struct_array = crate::array::StructArray::from((
-                struct_arrays,
-                null_buffer,
-                struct_node.null_count() as usize,
-            ));
+            let null_count = struct_node.null_count() as usize;
+            let struct_array = if null_count > 0 {
+                // create struct array from fields, arrays and null data
+                StructArray::from((
+                    struct_arrays,
+                    null_buffer,
+                    struct_node.null_count() as usize,
+                ))
+            } else {
+                StructArray::from(struct_arrays)
+            };
             Arc::new(struct_array)
         }
         _ => {
@@ -138,37 +152,94 @@ fn create_primitive_array(
     data_type: &DataType,
     buffers: Vec<Buffer>,
 ) -> ArrayRef {
-    use DataType::*;
     let length = field_node.length() as usize;
     let null_count = field_node.null_count() as usize;
     let array_data = match data_type {
         Utf8 => {
             // read 3 buffers
-            ArrayData::new(
-                data_type.clone(),
-                length,
-                Some(null_count),
-                Some(buffers[0].clone()),
-                0,
-                buffers[1..3].to_vec(),
-                vec![],
-            )
+            let mut builder = ArrayData::builder(data_type.clone())
+                .len(length)
+                .buffers(buffers[1..3].to_vec())
+                .offset(0);
+            if null_count > 0 {
+                builder = builder
+                    .null_count(null_count)
+                    .null_bit_buffer(buffers[0].clone())
+            }
+            builder.build()
+        }
+        Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32 | Time32(_) | Date32(_) => {
+            if buffers[1].len() / 8 == length {
+                // interpret as a signed i64, and cast appropriately
+                let mut builder = ArrayData::builder(DataType::Int64)
+                    .len(length)
+                    .buffers(buffers[1..].to_vec())
+                    .offset(0);
+                if null_count > 0 {
+                    builder = builder
+                        .null_count(null_count)
+                        .null_bit_buffer(buffers[0].clone())
+                }
+                let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
+                let casted = cast(&values, data_type).unwrap();
+                casted.data()
+            } else {
+                let mut builder = ArrayData::builder(data_type.clone())
+                    .len(length)
+                    .buffers(buffers[1..].to_vec())
+                    .offset(0);
+                if null_count > 0 {
+                    builder = builder
+                        .null_count(null_count)
+                        .null_bit_buffer(buffers[0].clone())
+                }
+                builder.build()
+            }
+        }
+        Float32 => {
+            if buffers[1].len() / 8 == length {
+                // interpret as a f64, and cast appropriately
+                let mut builder = ArrayData::builder(DataType::Float64)
+                    .len(length)
.buffers(buffers[1..].to_vec()) + .offset(0); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef; + let casted = cast(&values, data_type).unwrap(); + casted.data() + } else { + let mut builder = ArrayData::builder(data_type.clone()) + .len(length) + .buffers(buffers[1..].to_vec()) + .offset(0); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + builder.build() + } + } + Boolean | Int64 | UInt64 | Float64 | Time64(_) | Timestamp(_) | Date64(_) => { + let mut builder = ArrayData::builder(data_type.clone()) + .len(length) + .buffers(buffers[1..].to_vec()) + .offset(0); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + builder.build() } - Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float32 - | Boolean | Float64 | Time32(_) | Time64(_) | Timestamp(_) | Date32(_) - | Date64(_) => ArrayData::new( - data_type.clone(), - length, - Some(null_count), - Some(buffers[0].clone()), - 0, - buffers[1..].to_vec(), - vec![], - ), t @ _ => panic!("Data type {:?} either unsupported or not primitive", t), }; - crate::array::make_array(Arc::new(array_data)) + make_array(array_data) } fn create_list_array( @@ -178,16 +249,18 @@ fn create_list_array( child_array: ArrayRef, ) -> ArrayRef { if let &DataType::List(_) = data_type { - let array_data = ArrayData::new( - data_type.clone(), - field_node.length() as usize, - Some(field_node.null_count() as usize), - Some(buffers[0].clone()), - 0, - buffers[1..2].to_vec(), - vec![child_array.data()], - ); - crate::array::make_array(Arc::new(array_data)) + let null_count = field_node.null_count() as usize; + let mut builder = ArrayData::builder(data_type.clone()) + .len(field_node.length() as usize) + .buffers(buffers[1..2].to_vec()) + .offset(0) + .child_data(vec![child_array.data()]); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + make_array(builder.build()) } else { panic!("Cannot create list array from {:?}", data_type) } @@ -381,8 +454,6 @@ impl Reader { mod tests { use super::*; - use crate::array::*; - use crate::builder::{BinaryBuilder, Int32Builder, ListBuilder}; use crate::datatypes::*; use std::fs::File; @@ -418,13 +489,14 @@ mod tests { Field::new("bools", Boolean, true), Field::new("int8s", Int8, true), Field::new("varbinary", Utf8, true), - Field::new("numericlist", List(Box::new(Int32)), true), + Field::new("numericlist", List(Box::new(Int8)), true), + Field::new("floatlist", List(Box::new(Float32)), true), ]), - false, + true, )]); // batch contents - let list_values_builder = Int32Builder::new(10); + let list_values_builder = Int8Builder::new(10); let mut list_builder = ListBuilder::new(list_values_builder); // [[1,2,3,4], null, [5,6], [7], [8,9,10]] list_builder.values().append_value(1).unwrap(); @@ -442,7 +514,28 @@ mod tests { list_builder.values().append_value(9).unwrap(); list_builder.values().append_value(10).unwrap(); list_builder.append(true).unwrap(); - let _list_array = list_builder.finish(); + let list_array: ListArray = list_builder.finish(); + + // floats + let list_values_builder = Float32Builder::new(10); + let mut list_builder = ListBuilder::new(list_values_builder); + // [[1.1,2.2,3.3,4.4], null, [5.5,6.6], [7.7], [8.8,9.9,10.0]] + list_builder.values().append_value(1.1).unwrap(); + 
list_builder.values().append_value(2.2).unwrap(); + list_builder.values().append_value(3.3).unwrap(); + list_builder.values().append_value(4.4).unwrap(); + list_builder.append(true).unwrap(); + list_builder.append(false).unwrap(); + list_builder.values().append_value(5.5).unwrap(); + list_builder.values().append_value(6.6).unwrap(); + list_builder.append(true).unwrap(); + list_builder.values().append_value(7.7).unwrap(); + list_builder.append(true).unwrap(); + list_builder.values().append_value(8.8).unwrap(); + list_builder.values().append_value(9.9).unwrap(); + list_builder.values().append_value(10.0).unwrap(); + list_builder.append(true).unwrap(); + let float_list_array: ListArray = list_builder.finish(); let mut binary_builder = BinaryBuilder::new(100); binary_builder.append_string("foo").unwrap(); @@ -451,37 +544,38 @@ mod tests { binary_builder.append_string("qux").unwrap(); binary_builder.append_string("quux").unwrap(); let binary_array = binary_builder.finish(); - // let _struct_array = StructArray::from(( - // vec![ - // ( - // Field::new("bools", Boolean, true), - // Arc::new(BooleanArray::from(vec![ - // Some(true), - // None, - // None, - // Some(false), - // Some(true), - // ])) as Arc, - // ), - // ( - // Field::new("int8s", Int8, true), - // Arc::new(Int8Array::from(vec![ - // Some(-1), - // None, - // None, - // Some(-4), - // Some(-5), - // ])), - // ), - // (Field::new("varbinary", Utf8, true), Arc::new(binary_array)), - // ( - // Field::new("numericlist", List(Box::new(Int32)), true), - // Arc::new(list_array), - // ), - // ], - // Buffer::from([]), - // 0, - // )); + // create struct array + let struct_array = StructArray::from(vec![ + ( + Field::new("bools", Boolean, true), + Arc::new(BooleanArray::from(vec![ + Some(true), + None, + None, + Some(false), + Some(true), + ])) as Arc, + ), + ( + Field::new("int8s", Int8, true), + Arc::new(Int8Array::from(vec![ + Some(-1), + None, + None, + Some(-4), + Some(-5), + ])), + ), + (Field::new("varbinary", Utf8, true), Arc::new(binary_array)), + ( + Field::new("numericlist", List(Box::new(Int8)), true), + Arc::new(list_array), + ), + ( + Field::new("floatlist", List(Box::new(Float32)), true), + Arc::new(float_list_array), + ), + ]); let mut reader = Reader::try_new(file).unwrap(); assert_eq!(3, reader.num_batches()); @@ -495,28 +589,15 @@ mod tests { .as_any() .downcast_ref::() .unwrap(); - let struct_col_1: &BooleanArray = struct_col - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!("PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", format!("{:?}", struct_col_1)); - // TODO failing tests - // assert_eq!( - // struct_col_1.data(), - // BooleanArray::from( - // vec![Some(true), None, None, Some(false), Some(true),] - // ) - // .data() - // ); - // assert_eq!(struct_col.data(), struct_array.data()); + // test equality of struct columns + assert!(struct_col.equals(&struct_array)); } // try read a batch after all batches are exhausted let batch = reader.next().unwrap(); assert!(batch.is_none()); // seek a specific batch - let batch = reader.read_batch(2).unwrap().unwrap(); + reader.read_batch(2).unwrap().unwrap(); // validate_batch(batch); // try read a batch after seeking to the last batch let batch = reader.next().unwrap(); diff --git a/rust/arrow/test/data/struct_types.file.dat b/rust/arrow/test/data/struct_types.file.dat index 13f891cd2bdedde28300a3bd27a7f5163d4fe567..6bfd4a46843aa99405bd99398206246931575f43 100644 GIT binary patch literal 3410 
zcmeHKJ#Q015S{Zm7h?{|k)kM|5E7UtNI1oA(4;2rc6EK>}TR{I3cp2>s<2Puuo&M%`j8{d2nv;Kp zY%--k37hyVqfzcDf=+}VDRBJmlgA^X8))5r+RpmDQHJqD=Oc)BF)qef1#UvtfM@2j z=qhVy-BMWtkF{FN4?dTGx{TW!WG>=Oj0gGa{b6sEZEPJQ}wp$FKKTWhwAE5c_Di4XYNhzSI%QMO*`18?RN+H_#n^6cfm8_eqy5y z_Y(869IQ6_BvFrKwfghw?Z~$^Vr-Jt_^)`MKatL#`aVdyj_QfWONnzPE6Tr3&p~+(%5zYjgYrITCA8DztnGQ8Zk=% delta 384 zcmca4HA#fiF(}AC+>n8xW+JD#&X)iG|1&T!@R~3(FvtKIEKvLdNUs6n8X)EXVg{hP z$vQj%j0%%$c}_FdO#aBI%(R1HvK*_J=pCT601&f+)IkByWJjjMU;(ztfo#c>1DIGQ zUtr!re~ptlxHX(QfW8BP1}MD)!euzX3}K*)|A2Z0AwIc?TaC=X;<^E}6%yzG=U+&( From c6563d5f75aff029b801988673d89e43a3c91be2 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 21 Sep 2019 06:28:55 +0200 Subject: [PATCH 07/16] save progress, going to work on JSON integration --- rust/arrow/src/ipc/file/reader.rs | 149 ++++++++++++--------- rust/arrow/test/data/arrow_file.dat | Bin 2842 -> 0 bytes rust/arrow/test/data/rust_types.file.dat | Bin 13594 -> 0 bytes rust/arrow/test/data/struct_types.file.dat | Bin 3410 -> 0 bytes 4 files changed, 86 insertions(+), 63 deletions(-) delete mode 100644 rust/arrow/test/data/arrow_file.dat delete mode 100644 rust/arrow/test/data/rust_types.file.dat delete mode 100644 rust/arrow/test/data/struct_types.file.dat diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs index ebefd80619b07..6d84b44f36c0e 100644 --- a/rust/arrow/src/ipc/file/reader.rs +++ b/rust/arrow/src/ipc/file/reader.rs @@ -406,9 +406,6 @@ impl Reader { ipc::MessageHeader::Schema => { panic!("Not expecting a schema when messages are read") } - ipc::MessageHeader::DictionaryBatch => { - unimplemented!("reading dictionary batches not yet supported") - } ipc::MessageHeader::RecordBatch => { let batch = message.header_as_record_batch().unwrap(); // read the block that makes up the record batch into a buffer @@ -420,13 +417,9 @@ impl Reader { read_record_batch(&buf, batch, self.schema()) } - ipc::MessageHeader::SparseTensor => { - unimplemented!("reading sparse tensors not yet supported") - } - ipc::MessageHeader::Tensor => { - unimplemented!("reading tensors not yet supported") - } - ipc::MessageHeader::NONE => panic!("unknown message header"), + _ => unimplemented!( + "reading types other than record batches not yet supported" + ), } } else { Ok(None) @@ -455,34 +448,64 @@ mod tests { use super::*; use crate::datatypes::*; + use std::env; use std::fs::File; - // #[test] - // fn test_read_primitive_file() { - // let file = File::open("./test/data/primitive_types.file.dat").unwrap(); - - // let mut reader = Reader::try_new(file).unwrap(); - // assert_eq!(3, reader.num_batches()); - // for _ in 0..reader.num_batches() { - // let batch = reader.next().unwrap().unwrap(); - // validate_batch(batch); - // } - // // try read a batch after all batches are exhausted - // let batch = reader.next().unwrap(); - // assert!(batch.is_none()); - - // // seek a specific batch - // let batch = reader.read_batch(4).unwrap().unwrap(); - // validate_batch(batch); - // // try read a batch after seeking to the last batch - // let batch = reader.next().unwrap(); - // assert!(batch.is_none()); - // } + use DataType::*; + + #[test] + fn test_read_primitive_file() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let file = File::open(format!( + "{}/data/arrow-ipc/rust/primitive_types.file.arrow", + testdata + )) + .unwrap(); + + let mut reader = Reader::try_new(file).unwrap(); + assert_eq!(3, reader.num_batches()); + for _ in 0..reader.num_batches() { + let batch = 
reader.next().unwrap().unwrap(); + validate_primitive_batch(batch); + } + // try read a batch after all batches are exhausted + let batch = reader.next().unwrap(); + assert!(batch.is_none()); + + // seek a specific batch + let batch = reader.read_batch(4).unwrap().unwrap(); + validate_primitive_batch(batch); + // try read a batch after seeking to the last batch + let batch = reader.next().unwrap(); + assert!(batch.is_none()); + } + + #[test] + fn test_read_list_file() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let file = File::open(format!( + "{}/data/arrow-ipc/rust/list_types.file.arrow", + testdata + )) + .unwrap(); + + let mut reader = Reader::try_new(file).unwrap(); + assert_eq!(3, reader.num_batches()); + for _ in 0..reader.num_batches() { + let batch = reader.next().unwrap().unwrap(); + validate_primitive_batch(batch); + } + } #[test] fn test_read_struct_file() { - use DataType::*; - let file = File::open("./test/data/struct_types.file.dat").unwrap(); + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let file = File::open(format!( + "{}/data/arrow-ipc/rust/struct_types.file.arrow", + testdata + )) + .unwrap(); + let schema = Schema::new(vec![Field::new( "structs", Struct(vec![ @@ -604,34 +627,34 @@ mod tests { assert!(batch.is_none()); } - // fn validate_batch(batch: RecordBatch) { - // // primitive batches were created for - // assert_eq!(5, batch.num_rows()); - // assert_eq!(1, batch.num_columns()); - // let arr_1 = batch.column(0); - // let int32_array = arr_1.as_any().downcast_ref::().unwrap(); - // assert_eq!( - // "PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", - // format!("{:?}", int32_array) - // ); - // let arr_2 = batch.column(1); - // let binary_array = BinaryArray::from(arr_2.data()); - // assert_eq!("foo", std::str::from_utf8(binary_array.value(0)).unwrap()); - // assert_eq!("bar", std::str::from_utf8(binary_array.value(1)).unwrap()); - // assert_eq!("baz", std::str::from_utf8(binary_array.value(2)).unwrap()); - // assert!(binary_array.is_null(3)); - // assert_eq!("quux", std::str::from_utf8(binary_array.value(4)).unwrap()); - // let arr_3 = batch.column(2); - // let f32_array = Float32Array::from(arr_3.data()); - // assert_eq!( - // "PrimitiveArray\n[\n 1.0,\n 2.0,\n null,\n 4.0,\n 5.0,\n]", - // format!("{:?}", f32_array) - // ); - // let arr_4 = batch.column(3); - // let bool_array = BooleanArray::from(arr_4.data()); - // assert_eq!( - // "PrimitiveArray\n[\n true,\n null,\n false,\n true,\n false,\n]", - // format!("{:?}", bool_array) - // ); - // } + /// Validate the `RecordBatch` of primitive arrays + fn validate_primitive_batch(batch: RecordBatch) { + assert_eq!(5, batch.num_rows()); + assert_eq!(1, batch.num_columns()); + let arr_1 = batch.column(0); + let int32_array = arr_1.as_any().downcast_ref::().unwrap(); + assert_eq!( + "PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", + format!("{:?}", int32_array) + ); + let arr_2 = batch.column(1); + let binary_array = BinaryArray::from(arr_2.data()); + assert_eq!("foo", std::str::from_utf8(binary_array.value(0)).unwrap()); + assert_eq!("bar", std::str::from_utf8(binary_array.value(1)).unwrap()); + assert_eq!("baz", std::str::from_utf8(binary_array.value(2)).unwrap()); + assert!(binary_array.is_null(3)); + assert_eq!("quux", std::str::from_utf8(binary_array.value(4)).unwrap()); + let arr_3 = batch.column(2); + let f32_array = Float32Array::from(arr_3.data()); + assert_eq!( + "PrimitiveArray\n[\n 1.0,\n 
2.0,\n null,\n 4.0,\n 5.0,\n]", + format!("{:?}", f32_array) + ); + let arr_4 = batch.column(3); + let bool_array = BooleanArray::from(arr_4.data()); + assert_eq!( + "PrimitiveArray\n[\n true,\n null,\n false,\n true,\n false,\n]", + format!("{:?}", bool_array) + ); + } } diff --git a/rust/arrow/test/data/arrow_file.dat b/rust/arrow/test/data/arrow_file.dat deleted file mode 100644 index 7e36888ad30bda6dbd43f83594459ebef54ff6fe..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2842 zcmeHJy-EW?5T46jk~5reg{UCH6&4mLqCS9^Cs5L8X;P$36ar$E!ot!Au&}hqL-+`m zzD@kTotY5M2wHmsvpYZE{>`#G(;o~ju1-XfNF*0hM>1(iQ&LEvD26W3QnZP+ltf;@ zH{dRq`r~rB#DLZgWR9VYw1B;VetahKrkJ>ZPMl)|Yo_29Y?wm={#4{s@y&f$xj)Az zkSGTn_Mc-mj&FyRb304aI09yUofQ5QD4qrs6==d7JqVyK;y^qplU>ZnNMvDFiDThD z=5z+?K+mubjnx7+?y;%P&~B~Cwx+Kg=AyeWjQfvG=USvPw0=J#m!>=Z+H}WHEB^b6 zPL;9qed5T0{;G4(o({#0X&htIeJ=OYOds!;F(Ge*$CJrucsCk8+|Fj)=K)x0evZC0 z8U1UJ57iLHQ>^ccuSLmY{*z9W-$#94vU-1|hR^L)E6@IIgm9?W_wK%Cuow6lv<3mH^#2A%Qx&a?_NL^ta^KS&DX5 z?2y79deET<9ZV6_2T?*%z=u7gAbJuwhqRbfgd9vyJ)~f$3xby7#A=l9n|YtKe(SYd z3T~nQh}qe1zW3hDdo%y+%>Vy->CBlQy>{H}+GA!z#&p02ZNTcZLWncrU;IrR9 z&5i)Q3>`zhF<`cgd>;AR$XokRhkO-z6!{49?S8X$@2K3o{e*Q|G3q8ckcz_WKW^zEi)A z`u(W;X?=ct>SBHVGQQu=vR!Q*&Hj|wXM~7t4WlVLlIlwX(e@zfDK|blJ54!;ztNmu zIM;00Xx6!}zo)>>W&Yq{=ydk?Cx`=W(!UE+mnP18uMggflh5yY8>hg0BOq8j~YIAb>Rro8_D<5{@AydQ7_pS zUw{)xi!K}EdOGzjUt{bS7lBi`t`hCO0~@>+&)1vif5WC-F53KchfT`T=5nqpN7)Zk zeP8nJ!t`vtnP!~Xa2}6x743D#g)_f`%=yoij(#S+w0iM+J(TJmOIQDpul3d!+kFe| z)}Tv&7O)_(*=xDYF4^Bew%Q@frf&9-{RpyLGi3Mp$%geOA4`3-(d|or@`ZlTC-#Ml zaKi>R348~*BY?l^7O&i7`k8|xc&Rh(+a$0|B<@Z z_&gp1kl<>p@hA;&JhZk_u|NIg-v_PxtbZGUWXzZb?J4w$zu1Gxnw85&JDa*_@JoIe z41c|OZSuF6ZE)RJ`k}s{^gef$)L|0OHkgE~3GL!zj-@vo_?lzcIUD#f$Fj2+_@#iC z172~g_S=D9bFB6^9IFsk`*(tV+EAcQ`iHr}=t#oSuQ^|FnGwg*9}E1XW9ctBRujU~ zUk$vl`1QaG%l@sv3yUAZo~$zoiyscWu=ugS3yWV2ys-Fo;DzPi2hMA}Zw7f`$!`W; zSbPQh(azKhS6pbw<)wct=nId!{3Yk5-wOK6L0`D)`rJD+slAP0?`GgPgFWGDvVt|i z*9*macxU~{0>bKV&3WOGfJXy98|(?I|8vf3{8~YO(Xr%Lf<0lym%aWbver2b_kRoL zDE98pUVQ9-l>cAF>X%BzU&~E8XNp7_dF~lH)5G%8k$j9_#8JLi|0wrW7ygo$!r0Jm z-R}-Hy19wxv+4ljx>&R4ryZI`n&vtc+d272g5irvkhLeyskBL zIq10$s7`V$HLp_afpx9xHzkb)>a|96wnv-H^o#9Emu<>d>hWF!uhS=y;T$|Wj|Ar~ zApIB#^$%m0eWg9#8{yuYbM))T&mc2yDR3!rsR6c-+enSDmEC#WYu%uX`u?DC>g?~M zh z#0!gWhxUc#Uvb{Zk*jzBi}OZt-r!WwZ@N#5M~>C5O7%y}jrJ0~cN^5TWX>wF(>A4^9}By2k}&gZ_y3n z?I=DUiq8W0WK+Ahq(4I74KFT?^ffDimvyti+8K@8BqT}^Pjw1UBS(Tm(u@4eF1{o+9Ch| diff --git a/rust/arrow/test/data/struct_types.file.dat b/rust/arrow/test/data/struct_types.file.dat deleted file mode 100644 index 6bfd4a46843aa99405bd99398206246931575f43..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3410 zcmeHKJ#Q015S{Zm7h?{|k)kM|5E7UtNI1oA(4;2rc6EK>}TR{I3cp2>s<2Puuo&M%`j8{d2nv;Kp zY%--k37hyVqfzcDf=+}VDRBJmlgA^X8))5r+RpmDQHJqD=Oc)BF)qef1#UvtfM@2j z=qhVy-BMWtkF{FN4?dTGx{TW!WG>=Oj0gGa{b6sEZEPJQ}wp$FKKTWhwAE5c_Di4XYNhzSI%QMO*`18?RN+H_#n^6cfm8_eqy5y z_Y(869IQ6_BvFrKwfghw?Z~$^Vr-Jt_^)`MKatL#`aVdyj_QfWONnzPE6Tr3&p~+(%5zYjgYrITCA8DztnGQ8Zk=% From 51c384219dbb682ff233787291807af477d28a0a Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 17 Oct 2019 23:49:20 +0200 Subject: [PATCH 08/16] use generated arrow files for testing --- rust/arrow/Cargo.toml | 1 + rust/arrow/src/ipc/convert.rs | 2 +- rust/arrow/src/ipc/file/reader.rs | 264 +++++------------------- rust/arrow/src/util/integration_util.rs | 20 +- 4 files changed, 75 insertions(+), 212 deletions(-) diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index e94772d82e7a0..81aedfeb9c9d8 100644 
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -57,6 +57,7 @@ default = ["simd"]
 [dev-dependencies]
 criterion = "0.2"
 lazy_static = "1"
+flate2 = "1"
 
 [[bench]]
 name = "array_from_vec"
diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index 55d21b2531889..5776077154fda 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -129,7 +129,7 @@ fn get_data_type(field: ipc::Field) -> DataType {
             }
             (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
             z @ _ => panic!(
-                "Time type with bit witdh of {} and unit of {:?} not supported",
+                "Time type with bit width of {} and unit of {:?} not supported",
                 z.0, z.1
             ),
         }
diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs
index 6d84b44f36c0e..bf1ea1cd62ef0 100644
--- a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/file/reader.rs
@@ -23,10 +23,10 @@ use std::sync::Arc;
 use crate::array::*;
 use crate::buffer::Buffer;
 use crate::compute::cast;
-use crate::datatypes::{DataType, Schema};
+use crate::datatypes::{DataType, Schema, SchemaRef};
 use crate::error::{ArrowError, Result};
 use crate::ipc;
-use crate::record_batch::RecordBatch;
+use crate::record_batch::{RecordBatch, RecordBatchReader};
 use DataType::*;
 
 static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
 
@@ -381,7 +381,7 @@ impl Reader {
     }
 
     /// Return the schema of the file
-    pub fn schema(&self) -> Arc<Schema> {
+    pub fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
 
@@ -428,233 +428,79 @@ impl Reader {
     /// Read a specific record batch
     ///
-    /// Sets the current block to the batch number, and reads the record batch at that
-    /// block
-    pub fn read_batch(&mut self, batch_num: usize) -> Result<Option<RecordBatch>> {
-        if batch_num >= self.total_blocks {
+    /// Sets the current block to the index, allowing random reads
+    pub fn set_index(&mut self, index: usize) -> Result<()> {
+        if index >= self.total_blocks {
             Err(ArrowError::IoError(format!(
-                "Cannot read batch at index {} from {} total batches",
-                batch_num, self.total_blocks
+                "Cannot set batch to index {} from {} total batches",
+                index, self.total_blocks
             )))
         } else {
-            self.current_block = batch_num;
-            self.next()
+            self.current_block = index;
+            Ok(())
         }
     }
 }
 
+impl RecordBatchReader for Reader {
+    fn schema(&mut self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        self.next()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use crate::datatypes::*;
+    use flate2::read::GzDecoder;
+
+    use crate::util::integration_util::*;
 
     use std::env;
     use std::fs::File;
 
-    use DataType::*;
-
     #[test]
-    fn test_read_primitive_file() {
+    fn read_generated_files() {
        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec![
"generated_datetime", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/data/arrow-ipc/integration/0.14.1/{}.arrow_file", + testdata, path + )) + .unwrap(); + + let mut reader = Reader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(path); + assert!(arrow_json.equals_reader(&mut reader)); + }); } - #[test] - fn test_read_list_file() { + /// Read gzipped JSON file + fn read_gzip_json(path: &str) -> ArrowJson { let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); let file = File::open(format!( - "{}/data/arrow-ipc/rust/list_types.file.arrow", - testdata + "{}/data/arrow-ipc/integration/0.14.1/{}.json.gz", + testdata, path )) .unwrap(); - - let mut reader = Reader::try_new(file).unwrap(); - assert_eq!(3, reader.num_batches()); - for _ in 0..reader.num_batches() { - let batch = reader.next().unwrap().unwrap(); - validate_primitive_batch(batch); - } - } - - #[test] - fn test_read_struct_file() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); - let file = File::open(format!( - "{}/data/arrow-ipc/rust/struct_types.file.arrow", - testdata - )) - .unwrap(); - - let schema = Schema::new(vec![Field::new( - "structs", - Struct(vec![ - Field::new("bools", Boolean, true), - Field::new("int8s", Int8, true), - Field::new("varbinary", Utf8, true), - Field::new("numericlist", List(Box::new(Int8)), true), - Field::new("floatlist", List(Box::new(Float32)), true), - ]), - true, - )]); - - // batch contents - let list_values_builder = Int8Builder::new(10); - let mut list_builder = ListBuilder::new(list_values_builder); - // [[1,2,3,4], null, [5,6], [7], [8,9,10]] - list_builder.values().append_value(1).unwrap(); - list_builder.values().append_value(2).unwrap(); - list_builder.values().append_value(3).unwrap(); - list_builder.values().append_value(4).unwrap(); - list_builder.append(true).unwrap(); - list_builder.append(false).unwrap(); - list_builder.values().append_value(5).unwrap(); - list_builder.values().append_value(6).unwrap(); - list_builder.append(true).unwrap(); - list_builder.values().append_value(7).unwrap(); - list_builder.append(true).unwrap(); - list_builder.values().append_value(8).unwrap(); - list_builder.values().append_value(9).unwrap(); - list_builder.values().append_value(10).unwrap(); - list_builder.append(true).unwrap(); - let list_array: ListArray = list_builder.finish(); - - // floats - let list_values_builder = Float32Builder::new(10); - let mut list_builder = ListBuilder::new(list_values_builder); - // [[1.1,2.2,3.3,4.4], null, [5.5,6.6], [7.7], [8.8,9.9,10.0]] - list_builder.values().append_value(1.1).unwrap(); - list_builder.values().append_value(2.2).unwrap(); - list_builder.values().append_value(3.3).unwrap(); - list_builder.values().append_value(4.4).unwrap(); - list_builder.append(true).unwrap(); - list_builder.append(false).unwrap(); - list_builder.values().append_value(5.5).unwrap(); - list_builder.values().append_value(6.6).unwrap(); - list_builder.append(true).unwrap(); - list_builder.values().append_value(7.7).unwrap(); - list_builder.append(true).unwrap(); - list_builder.values().append_value(8.8).unwrap(); - list_builder.values().append_value(9.9).unwrap(); - list_builder.values().append_value(10.0).unwrap(); - list_builder.append(true).unwrap(); - let float_list_array: ListArray = list_builder.finish(); - - let mut 
binary_builder = BinaryBuilder::new(100); - binary_builder.append_string("foo").unwrap(); - binary_builder.append_string("bar").unwrap(); - binary_builder.append_string("baz").unwrap(); - binary_builder.append_string("qux").unwrap(); - binary_builder.append_string("quux").unwrap(); - let binary_array = binary_builder.finish(); - // create struct array - let struct_array = StructArray::from(vec![ - ( - Field::new("bools", Boolean, true), - Arc::new(BooleanArray::from(vec![ - Some(true), - None, - None, - Some(false), - Some(true), - ])) as Arc, - ), - ( - Field::new("int8s", Int8, true), - Arc::new(Int8Array::from(vec![ - Some(-1), - None, - None, - Some(-4), - Some(-5), - ])), - ), - (Field::new("varbinary", Utf8, true), Arc::new(binary_array)), - ( - Field::new("numericlist", List(Box::new(Int8)), true), - Arc::new(list_array), - ), - ( - Field::new("floatlist", List(Box::new(Float32)), true), - Arc::new(float_list_array), - ), - ]); - - let mut reader = Reader::try_new(file).unwrap(); - assert_eq!(3, reader.num_batches()); - for _ in 0..reader.num_batches() { - let batch: RecordBatch = reader.next().unwrap().unwrap(); - assert_eq!(&Arc::new(schema.clone()), batch.schema()); - assert_eq!(1, batch.num_columns()); - assert_eq!(5, batch.num_rows()); - let struct_col: &StructArray = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - // test equality of struct columns - assert!(struct_col.equals(&struct_array)); - } - // try read a batch after all batches are exhausted - let batch = reader.next().unwrap(); - assert!(batch.is_none()); - - // seek a specific batch - reader.read_batch(2).unwrap().unwrap(); - // validate_batch(batch); - // try read a batch after seeking to the last batch - let batch = reader.next().unwrap(); - assert!(batch.is_none()); - } - - /// Validate the `RecordBatch` of primitive arrays - fn validate_primitive_batch(batch: RecordBatch) { - assert_eq!(5, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - let arr_1 = batch.column(0); - let int32_array = arr_1.as_any().downcast_ref::().unwrap(); - assert_eq!( - "PrimitiveArray\n[\n true,\n null,\n null,\n false,\n true,\n]", - format!("{:?}", int32_array) - ); - let arr_2 = batch.column(1); - let binary_array = BinaryArray::from(arr_2.data()); - assert_eq!("foo", std::str::from_utf8(binary_array.value(0)).unwrap()); - assert_eq!("bar", std::str::from_utf8(binary_array.value(1)).unwrap()); - assert_eq!("baz", std::str::from_utf8(binary_array.value(2)).unwrap()); - assert!(binary_array.is_null(3)); - assert_eq!("quux", std::str::from_utf8(binary_array.value(4)).unwrap()); - let arr_3 = batch.column(2); - let f32_array = Float32Array::from(arr_3.data()); - assert_eq!( - "PrimitiveArray\n[\n 1.0,\n 2.0,\n null,\n 4.0,\n 5.0,\n]", - format!("{:?}", f32_array) - ); - let arr_4 = batch.column(3); - let bool_array = BooleanArray::from(arr_4.data()); - assert_eq!( - "PrimitiveArray\n[\n true,\n null,\n false,\n true,\n false,\n]", - format!("{:?}", bool_array) - ); + let mut gz = GzDecoder::new(&file); + let mut s = String::new(); + gz.read_to_string(&mut s).unwrap(); + // convert to Arrow JSON + let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap(); + arrow_json } } diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 5a8a9f2f97d11..44ad5a51f2eca 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -24,11 +24,11 @@ use serde_json::Value; use crate::array::*; use crate::datatypes::*; -use 
crate::record_batch::RecordBatch;
+use crate::record_batch::{RecordBatch, RecordBatchReader};
 
 /// A struct that represents an Arrow file with a schema and record batches
 #[derive(Deserialize)]
-struct ArrowJson {
+pub(crate) struct ArrowJson {
     schema: ArrowJsonSchema,
     batches: Vec<ArrowJsonBatch>,
 }
@@ -62,6 +62,22 @@ struct ArrowJsonColumn {
     children: Option<Vec<ArrowJsonColumn>>,
 }
 
+impl ArrowJson {
+    // Compare the Arrow JSON with a record batch reader
+    pub fn equals_reader(&self, reader: &mut RecordBatchReader) -> bool {
+        if !self.schema.equals_schema(&reader.schema()) {
+            return false;
+        }
+        self.batches.iter().all(|col| {
+            let batch = reader.next_batch();
+            match batch {
+                Ok(Some(batch)) => col.equals_batch(&batch),
+                _ => false,
+            }
+        })
+    }
+}
+
 impl ArrowJsonSchema {
     /// Compare the Arrow JSON schema with the Arrow `Schema`
     fn equals_schema(&self, schema: &Schema) -> bool {

From 49ff1276c2ed83489acd5fe1e6a1041d3dd94c53 Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Mon, 21 Oct 2019 15:50:47 +0200
Subject: [PATCH 09/16] fix rustfmt issue

was using an older version of rustfmt

---
 rust/datafusion/src/execution/context.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index d7198126bdf4a..9b2294091f09d 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -326,9 +326,7 @@ impl ExecutionContext {
             match expr {
                 &Expr::Literal(ref scalar_value) => {
                     let limit: usize = match scalar_value {
-                        ScalarValue::Int8(limit) if *limit >= 0 => {
-                            Ok(*limit as usize)
-                        }
+                        ScalarValue::Int8(limit) if *limit >= 0 => Ok(*limit as usize),
                         ScalarValue::Int16(limit) if *limit >= 0 => {
                             Ok(*limit as usize)
                         }

From 65228a4bd5302628f07c9f9db926191e5fc9dd9f Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Sat, 19 Oct 2019 07:52:05 +0200
Subject: [PATCH 10/16] read fixed size list array

---
 rust/arrow/src/ipc/convert.rs           | 12 +++++++
 rust/arrow/src/ipc/file/reader.rs       | 42 ++++++++++++++++++++++---
 rust/arrow/src/util/integration_util.rs | 38 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 4 deletions(-)

diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index 5776077154fda..b8d64f0e6fd8b 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -152,6 +152,18 @@ fn get_data_type(field: ipc::Field) -> DataType {
             // returning int16 for now, to test, not sure how to get data type
             DataType::List(Box::new(get_data_type(child_field)))
         }
+        ipc::Type::FixedSizeList => {
+            let children = field.children().unwrap();
+            if children.len() != 1 {
+                panic!("expect a list to have one child")
+            }
+            let child_field = children.get(0);
+            let fsl = field.type__as_fixed_size_list().unwrap();
+            DataType::FixedSizeList((
+                Box::new(get_data_type(child_field)),
+                fsl.listSize(),
+            ))
+        }
         ipc::Type::Struct_ => {
             let mut fields = vec![];
             if let Some(children) = field.children() {
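The tuple-typed `DataType::FixedSizeList((Box<DataType>, i32))` variant added above pairs the child type with the fixed slot width. A minimal sketch of what `get_data_type` now produces for a `fixed_size_list<int8>[3]` field, assuming the crate is consumed externally as `arrow` at this 0.x API:

    use arrow::datatypes::DataType;

    fn main() {
        // the IPC metadata's child type and listSize travel together as one tuple
        let dt = DataType::FixedSizeList((Box::new(DataType::Int8), 3));
        if let DataType::FixedSizeList((inner, size)) = &dt {
            assert_eq!(**inner, DataType::Int8);
            assert_eq!(*size, 3);
        }
    }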
diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs
index bf1ea1cd62ef0..e34a39f5baaf1 100644
--- a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/file/reader.rs
@@ -92,6 +92,27 @@ fn create_array(
 
             create_list_array(list_node, data_type, &list_buffers[..], triple.0)
         }
+        FixedSizeList((ref list_data_type, _)) => {
+            let list_node = &nodes[node_index];
+            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 1]
+                .iter()
+                .map(|buf| read_buffer(buf, data))
+                .collect();
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 1;
+            let triple = create_array(
+                nodes,
+                list_data_type,
+                data,
+                buffers,
+                node_index,
+                buffer_index,
+            );
+            node_index = triple.1;
+            buffer_index = triple.2;
+
+            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
+        }
         Struct(struct_fields) => {
             let struct_node = &nodes[node_index];
             let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
@@ -261,6 +282,19 @@ fn create_list_array(
                 .null_bit_buffer(buffers[0].clone())
         }
         make_array(builder.build())
+    } else if let &DataType::FixedSizeList(_) = data_type {
+        let null_count = field_node.null_count() as usize;
+        let mut builder = ArrayData::builder(data_type.clone())
+            .len(field_node.length() as usize)
+            .buffers(buffers[1..1].to_vec())
+            .offset(0)
+            .child_data(vec![child_array.data()]);
+        if null_count > 0 {
+            builder = builder
+                .null_count(null_count)
+                .null_bit_buffer(buffers[0].clone())
+        }
+        make_array(builder.build())
     } else {
         panic!("Cannot create list array from {:?}", data_type)
     }
@@ -467,11 +501,11 @@ mod tests {
         let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
         // the test is repetitive, thus we can read all supported files at once
         let paths = vec![
-            "generated_datetime",
+            // "generated_datetime",
             "generated_nested",
-            "generated_primitive_no_batches",
-            "generated_primitive_zerolength",
-            "generated_primitive",
+            // "generated_primitive_no_batches",
+            // "generated_primitive_zerolength",
+            // "generated_primitive",
         ];
         paths.iter().for_each(|path| {
             let file = File::open(format!(
diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs
index 44ad5a51f2eca..58923a2a2412b 100644
--- a/rust/arrow/src/util/integration_util.rs
+++ b/rust/arrow/src/util/integration_util.rs
@@ -180,6 +180,11 @@ impl ArrowJsonBatch {
                 let arr = arr.as_any().downcast_ref::<ListArray>().unwrap();
                 arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
             }
+            DataType::FixedSizeList(_) => {
+                let arr =
+                    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
+                arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
+            }
             DataType::Struct(_) => {
                 let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
                 arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
@@ -194,6 +199,9 @@ impl ArrowJsonBatch {
 fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> {
     match data_type {
         DataType::List(dt) => json_from_list_col(col, &**dt),
+        DataType::FixedSizeList((dt, list_size)) => {
+            json_from_fixed_size_list_col(col, &**dt, *list_size as usize)
+        }
         DataType::Struct(fields) => json_from_struct_col(col, fields),
         _ => merge_json_array(&col.validity, &col.data.clone().unwrap()),
     }
@@ -273,6 +281,36 @@ fn json_from_list_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value>
     values
 }
 
+/// Convert an Arrow JSON column/array of a `DataType::FixedSizeList` into a vector of `Value`
+fn json_from_fixed_size_list_col(
+    col: &ArrowJsonColumn,
+    data_type: &DataType,
+    list_size: usize,
+) -> Vec<Value> {
+    let mut values = Vec::with_capacity(col.count);
+
+    // get the inner array
+    let child = &col.children.clone().expect("list type must have children")[0];
+    let inner = match data_type {
+        DataType::List(ref dt) => json_from_col(child, &**dt),
+        DataType::FixedSizeList((ref dt, _)) => json_from_col(child, &**dt),
+        DataType::Struct(fields) => json_from_struct_col(col, fields),
+        _ => merge_json_array(&child.validity, &child.data.clone().unwrap()),
+    };
+
+    for i in 0..col.count {
+        match col.validity[i] {
+            0 => values.push(Value::Null),
+            1 => values.push(Value::Array(
+                inner[(list_size * i)..(list_size * (i + 1))].to_vec(),
+            )),
+            _ => panic!("Validity data should be 0 or 1"),
+        }
+    }
+
+    values
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
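The `list_size * i` arithmetic in `json_from_fixed_size_list_col` is the crux: a fixed-size list stores its child values flattened, and every slot, null or not, occupies exactly `list_size` positions. A standalone sketch of the same row reconstruction on plain vectors (hypothetical data, no Arrow types):

    fn main() {
        // flattened child values; null slots still occupy list_size positions
        let inner = vec![1, 2, 3, 4, 5, 6];
        let validity = vec![1u8, 0, 1];
        let list_size = 2;

        let rows: Vec<Option<&[i32]>> = validity
            .iter()
            .enumerate()
            .map(|(i, &v)| match v {
                0 => None,
                _ => Some(&inner[(list_size * i)..(list_size * (i + 1))]),
            })
            .collect();

        assert_eq!(rows[0], Some(&[1, 2][..]));
        assert_eq!(rows[1], None); // positions 2..4 belong to the null slot
        assert_eq!(rows[2], Some(&[5, 6][..]));
    }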
From 11301bce6c0595039631609d11bba3f4b178b0c3 Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Mon, 21 Oct 2019 16:06:06 +0200
Subject: [PATCH 11/16] fix test file locations

---
 rust/arrow/src/ipc/file/reader.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs
index e34a39f5baaf1..426ff7ef91c6b 100644
--- a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/file/reader.rs
@@ -509,7 +509,7 @@ mod tests {
         ];
         paths.iter().for_each(|path| {
             let file = File::open(format!(
-                "{}/data/arrow-ipc/integration/0.14.1/{}.arrow_file",
+                "{}/arrow-ipc/integration/0.14.1/{}.arrow_file",
                 testdata, path
             ))
             .unwrap();
@@ -526,7 +526,7 @@
         let file = File::open(format!(
-            "{}/data/arrow-ipc/integration/0.14.1/{}.json.gz",
+            "{}/arrow-ipc/integration/0.14.1/{}.json.gz",
             testdata, path
         ))
         .unwrap();

From bda4a7c9b7039cd4d5844b1b83b10561097b77a1 Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Mon, 4 Nov 2019 18:38:44 +0200
Subject: [PATCH 12/16] read strings and binaries

---
 rust/arrow/src/ipc/convert.rs     |  7 ++++--
 rust/arrow/src/ipc/file/reader.rs | 36 ++++++++++++++++++++++++++-----
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index b8d64f0e6fd8b..97fcde01bbc6c 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -99,8 +99,11 @@ fn get_data_type(field: ipc::Field) -> DataType {
                 _ => panic!("Unexpected bitwidth and signed"),
             }
         }
-        ipc::Type::Utf8 | ipc::Type::Binary | ipc::Type::FixedSizeBinary => {
-            DataType::Utf8
+        ipc::Type::Binary => DataType::Binary,
+        ipc::Type::Utf8 => DataType::Utf8,
+        ipc::Type::FixedSizeBinary => {
+            let fsb = field.type__as_fixed_size_binary().unwrap();
+            DataType::FixedSizeBinary(fsb.byteWidth())
         }
         ipc::Type::FloatingPoint => {
             let float = field.type__as_floating_point().unwrap();
diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs
index 426ff7ef91c6b..8458711017496 100644
--- a/rust/arrow/src/ipc/file/reader.rs
+++ b/rust/arrow/src/ipc/file/reader.rs
@@ -58,7 +58,7 @@ fn create_array(
 ) -> (ArrayRef, usize, usize) {
     use DataType::*;
     let array = match data_type {
-        Utf8 => {
+        Utf8 | Binary => {
             let array = create_primitive_array(
                 &nodes[node_index],
                 data_type,
@@ -71,6 +71,19 @@ fn create_array(
             buffer_index = buffer_index + 3;
             array
         }
+        FixedSizeBinary(_) => {
+            let array = create_primitive_array(
+                &nodes[node_index],
+                data_type,
+                buffers[buffer_index..buffer_index + 2]
+                    .iter()
+                    .map(|buf| read_buffer(buf, data))
+                    .collect(),
+            );
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 2;
+            array
+        }
         List(ref list_data_type) => {
             let list_node = &nodes[node_index];
             let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
@@ -176,7 +189,7 @@ fn create_primitive_array(
     let length = field_node.length() as usize;
     let null_count = field_node.null_count() as usize;
     let array_data = match data_type {
-        Utf8 => {
+        Utf8 | Binary => {
             // read 3 buffers
             let mut builder = ArrayData::builder(data_type.clone())
                 .len(length)
                 .buffers(buffers[1..3].to_vec())
                 .offset(0);
             if null_count > 0 {
                 builder = builder
                     .null_count(null_count)
                     .null_bit_buffer(buffers[0].clone())
             }
             builder.build()
         }
+        FixedSizeBinary(_) => {
+            // read 2 buffers: validity and values
+            let mut builder =
ArrayData::builder(data_type.clone()) + .len(length) + .buffers(buffers[1..2].to_vec()) + .offset(0); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + builder.build() + } Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32 | Time32(_) | Date32(_) => { if buffers[1].len() / 8 == length { // interpret as a signed i64, and cast appropriately @@ -503,9 +529,9 @@ mod tests { let paths = vec![ // "generated_datetime", "generated_nested", - // "generated_primitive_no_batches", - // "generated_primitive_zerolength", - // "generated_primitive", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", ]; paths.iter().for_each(|path| { let file = File::open(format!( From af5ebef7f4e8e7736fd086d31079573d4f9f9d3a Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 16 Nov 2019 08:25:39 +0200 Subject: [PATCH 13/16] debug messages to identify reader errors --- rust/arrow/src/ipc/file/reader.rs | 8 +++++--- rust/arrow/src/util/integration_util.rs | 4 ++++ rust/datafusion/src/execution/context.rs | 4 +++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs index 8458711017496..eec1dfe49390a 100644 --- a/rust/arrow/src/ipc/file/reader.rs +++ b/rust/arrow/src/ipc/file/reader.rs @@ -34,6 +34,7 @@ static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; /// Read a buffer based on offset and length fn read_buffer(buf: &ipc::Buffer, a_data: &Vec) -> Buffer { let start_offset = buf.offset() as usize; + dbg!(&buf); let end_offset = start_offset + buf.length() as usize; let buf_data = &a_data[start_offset..end_offset]; Buffer::from(&buf_data) @@ -173,6 +174,7 @@ fn create_array( ); node_index = node_index + 1; buffer_index = buffer_index + 2; + dbg!((array.len(), &array)); array } }; @@ -528,9 +530,9 @@ mod tests { // the test is repetitive, thus we can read all supported files at once let paths = vec![ // "generated_datetime", - "generated_nested", - "generated_primitive_no_batches", - "generated_primitive_zerolength", + // "generated_nested", + // "generated_primitive_no_batches", + // "generated_primitive_zerolength", "generated_primitive", ]; paths.iter().for_each(|path| { diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 58923a2a2412b..3652708e4a285 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -115,6 +115,7 @@ impl ArrowJsonBatch { return false; } let json_array: Vec = json_from_col(&col, field.data_type()); + println!("Data type: {:?}", field.data_type()); match field.data_type() { DataType::Boolean => { let arr = arr.as_any().downcast_ref::().unwrap(); @@ -157,6 +158,9 @@ impl ArrowJsonBatch { } DataType::Float32 => { let arr = arr.as_any().downcast_ref::().unwrap(); + dbg!(&arr); + dbg!(&arr.len()); + dbg!(&json_array); arr.equals_json(&json_array.iter().collect::>()[..]) } DataType::Float64 => { diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 9b2294091f09d..d7198126bdf4a 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -326,7 +326,9 @@ impl ExecutionContext { match expr { &Expr::Literal(ref scalar_value) => { let limit: usize = match scalar_value { - ScalarValue::Int8(limit) if *limit >= 0 => Ok(*limit as usize), + ScalarValue::Int8(limit) if *limit >= 0 => { + Ok(*limit as usize) + } 
ScalarValue::Int16(limit) if *limit >= 0 => {
                             Ok(*limit as usize)
                         }

From c7100768c35d8c340bad9379b74c9cbed5ed68a2 Mon Sep 17 00:00:00 2001
From: Paddy Horan
Date: Mon, 18 Nov 2019 14:57:28 -0500
Subject: [PATCH 14/16] Update to fix precision on float32 arrays.

---
 rust/arrow/src/datatypes.rs             | 2 +-
 rust/arrow/src/util/integration_util.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index e4502b770e378..00ad1790962c6 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -184,7 +184,7 @@ impl ArrowNativeType for u64 {
 impl ArrowNativeType for f32 {
     fn into_json_value(self) -> Option<Value> {
-        Number::from_f64(self as f64).map(|num| VNumber(num))
+        Number::from_f64(f64::round(self as f64 * 1000.0) / 1000.0).map(|num| VNumber(num))
     }
 }

diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs
index 3652708e4a285..56feab7d34bc4 100644
--- a/rust/arrow/src/util/integration_util.rs
+++ b/rust/arrow/src/util/integration_util.rs
@@ -95,7 +95,7 @@ impl ArrowJsonBatch {
-    /// Comapre the Arrow JSON record batch with a `RecordBatch`
+    /// Compare the Arrow JSON record batch with a `RecordBatch`
     fn equals_batch(&self, batch: &RecordBatch) -> bool {
         if self.count != batch.num_rows() {
             return false;
         }

From 84e31b7d3e3fdb5e3f256c7a9db8f16f54a31d0b Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Tue, 19 Nov 2019 05:28:18 +0200
Subject: [PATCH 15/16] minor cleaning up, test IPC reader with more files

---
 rust/arrow/src/datatypes.rs             |  3 ++-
 rust/arrow/src/ipc/file/reader.rs       | 11 ++++++-----
 rust/arrow/src/util/integration_util.rs |  6 +-----
 3 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 00ad1790962c6..2f04796565375 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -184,7 +184,8 @@ impl ArrowNativeType for u64 {
 impl ArrowNativeType for f32 {
     fn into_json_value(self) -> Option<Value> {
-        Number::from_f64(f64::round(self as f64 * 1000.0) / 1000.0).map(|num| VNumber(num))
+        Number::from_f64(f64::round(self as f64 * 1000.0) / 1000.0)
+            .map(|num| VNumber(num))
     }
 }
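The rounding in `into_json_value` exists because widening an `f32` to `f64` exposes representation noise that breaks exact comparison against the `f64` values parsed from the integration JSON. A small worked example of the mismatch it sidesteps:

    fn main() {
        let v: f32 = 1.1;
        // widening alone yields 1.100000023841858..., which is not the f64 1.1
        assert_ne!(v as f64, 1.1_f64);
        // rounding to three decimals makes the two representations agree
        assert_eq!(f64::round(v as f64 * 1000.0) / 1000.0, 1.1_f64);
    }

The trade-off is that genuine precision beyond three decimals is discarded, which is presumably acceptable for the generated test data being compared here.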
"generated_datetime", - // "generated_nested", - // "generated_primitive_no_batches", - // "generated_primitive_zerolength", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", "generated_primitive", ]; paths.iter().for_each(|path| { diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 56feab7d34bc4..bbdb51602352a 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -63,7 +63,7 @@ struct ArrowJsonColumn { } impl ArrowJson { - // Compare the Arrow JSON with a record batch reader + /// Compare the Arrow JSON with a record batch reader pub fn equals_reader(&self, reader: &mut RecordBatchReader) -> bool { if !self.schema.equals_schema(&reader.schema()) { return false; @@ -115,7 +115,6 @@ impl ArrowJsonBatch { return false; } let json_array: Vec = json_from_col(&col, field.data_type()); - println!("Data type: {:?}", field.data_type()); match field.data_type() { DataType::Boolean => { let arr = arr.as_any().downcast_ref::().unwrap(); @@ -158,9 +157,6 @@ impl ArrowJsonBatch { } DataType::Float32 => { let arr = arr.as_any().downcast_ref::().unwrap(); - dbg!(&arr); - dbg!(&arr.len()); - dbg!(&json_array); arr.equals_json(&json_array.iter().collect::>()[..]) } DataType::Float64 => { From cc38646e705c54f24a06aa4c4ff7e13413beb5c9 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 19 Nov 2019 05:29:29 +0200 Subject: [PATCH 16/16] fix compiler warnings --- rust/arrow/src/memory.rs | 1 - rust/parquet/src/encodings/decoding.rs | 2 -- rust/parquet/src/encodings/encoding.rs | 5 ----- 3 files changed, 8 deletions(-) diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs index d73ddd11676c8..c941d34bfec16 100644 --- a/rust/arrow/src/memory.rs +++ b/rust/arrow/src/memory.rs @@ -51,7 +51,6 @@ pub unsafe fn memcpy(dst: *mut u8, src: *const u8, len: usize) { } extern "C" { - #[inline] pub fn memcmp(p1: *const u8, p2: *const u8, len: usize) -> i32; } diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index 7dc97b65cf94b..453d91cae83ad 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -657,10 +657,8 @@ impl Decoder for DeltaBitPackDecoder { /// Helper trait to define specific conversions when decoding values trait DeltaBitPackDecoderConversion { /// Sets decoded value based on type `T`. - #[inline] fn get_delta(&self, index: usize) -> i64; - #[inline] fn set_decoded_value(&self, buffer: &mut [T::T], index: usize, value: i64); } diff --git a/rust/parquet/src/encodings/encoding.rs b/rust/parquet/src/encodings/encoding.rs index 5aae81d242f42..c9cad12e42d6a 100644 --- a/rust/parquet/src/encodings/encoding.rs +++ b/rust/parquet/src/encodings/encoding.rs @@ -417,7 +417,6 @@ impl Encoder for DictEncoder { /// Provides encoded size for a data type. /// This is a workaround to calculate dictionary size in bytes. trait DictEncodedSize { - #[inline] fn get_encoded_size(&self, value: &T::T) -> usize; } @@ -752,16 +751,12 @@ impl Encoder for DeltaBitPackEncoder { /// Helper trait to define specific conversions and subtractions when computing deltas trait DeltaBitPackEncoderConversion { // Method should panic if type is not supported, otherwise no-op - #[inline] fn assert_supported_type(); - #[inline] fn as_i64(&self, values: &[T::T], index: usize) -> i64; - #[inline] fn subtract(&self, left: i64, right: i64) -> i64; - #[inline] fn subtract_u64(&self, left: i64, right: i64) -> u64; }