Skip to content

Commit

Permalink
read lists and structs, reafactor reader
Browse files Browse the repository at this point in the history
  • Loading branch information
nevi-me committed Apr 26, 2019
1 parent a54b152 commit ab6ff9b
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 129 deletions.
29 changes: 29 additions & 0 deletions rust/arrow/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,35 @@ impl From<Vec<(Field, ArrayRef)>> for StructArray {
}
}

impl From<(Vec<(Field, ArrayRef)>, Buffer, usize)> for StructArray {
fn from(triple: (Vec<(Field, ArrayRef)>, Buffer, usize)) -> Self {
let (field_types, field_values): (Vec<_>, Vec<_>) = triple.0.into_iter().unzip();

// Check the length of the child arrays
let length = field_values[0].len();
for i in 1..field_values.len() {
assert_eq!(
length,
field_values[i].len(),
"all child arrays of a StructArray must have the same length"
);
assert_eq!(
field_types[i].data_type(),
field_values[i].data().data_type(),
"the field data types must match the array data in a StructArray"
)
}

let data = ArrayData::builder(DataType::Struct(field_types))
.add_buffer(triple.1)
.child_data(field_values.into_iter().map(|a| a.data()).collect())
.len(length)
.null_count(triple.2)
.build();
Self::from(data)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
50 changes: 36 additions & 14 deletions rust/arrow/src/ipc/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use crate::ipc;
use flatbuffers::FlatBufferBuilder;

/// Serialize a schema in IPC format
///
/// TODO(Neville) add bit-widths and other field metadata to flatbuffer Type
fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
// TODO add bit-widths and other field metadata to flatbuffer Type
use DataType::*;
let mut fbb = FlatBufferBuilder::new();

Expand Down Expand Up @@ -63,19 +62,22 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
fbb
}

fn fb_to_field(field: ipc::Field) -> Field {
Field::new(
field.name().unwrap(),
get_data_type(field),
field.nullable(),
)
}

/// Deserialize a Schema table from IPC format to Schema data type
pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
let mut fields: Vec<Field> = vec![];
let c_fields = fb.fields().unwrap();
let len = c_fields.len();
for i in 0..len {
let c_field: ipc::Field = c_fields.get(i);
let field = Field::new(
c_field.name().unwrap(),
get_data_type(c_field),
c_field.nullable(),
);
fields.push(field);
fields.push(fb_to_field(c_field));
}
Schema::new(fields)
}
Expand All @@ -102,6 +104,7 @@ fn get_fbs_type(dtype: DataType) -> ipc::Type {

/// Get the Arrow data type from the flatbuffer Field table
fn get_data_type(field: ipc::Field) -> DataType {
// TODO add recursion protection for deeply-nested fields (struct and list)
match field.type_type() {
ipc::Type::Bool => DataType::Boolean,
ipc::Type::Int => {
Expand All @@ -118,7 +121,9 @@ fn get_data_type(field: ipc::Field) -> DataType {
_ => panic!("Unexpected bitwidth and signed"),
}
}
ipc::Type::Utf8 | ipc::Type::Binary => DataType::Utf8,
ipc::Type::Utf8 | ipc::Type::Binary | ipc::Type::FixedSizeBinary => {
DataType::Utf8
}
ipc::Type::FloatingPoint => {
let float = field.type__as_floating_point().unwrap();
match float.precision() {
Expand All @@ -127,6 +132,13 @@ fn get_data_type(field: ipc::Field) -> DataType {
ipc::Precision::DOUBLE => DataType::Float64,
}
}
ipc::Type::Date => {
let date = field.type__as_date().unwrap();
match date.unit() {
ipc::DateUnit::DAY => DataType::Date32(DateUnit::Day),
ipc::DateUnit::MILLISECOND => DataType::Date64(DateUnit::Millisecond),
}
}
ipc::Type::Time => {
let time = field.type__as_time().unwrap();
match (time.bitWidth(), time.unit()) {
Expand All @@ -153,12 +165,22 @@ fn get_data_type(field: ipc::Field) -> DataType {
ipc::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond),
}
}
ipc::Type::Date => {
let date = field.type__as_date().unwrap();
match date.unit() {
ipc::DateUnit::DAY => DataType::Date32(DateUnit::Day),
ipc::DateUnit::MILLISECOND => DataType::Date64(DateUnit::Millisecond),
ipc::Type::List => {
let children = field.children().unwrap();
if children.len() != 1 {
panic!("expect a list to have one child")
}
let child_field = children.get(0);
// returning int16 for now, to test, not sure how to get data type
DataType::List(Box::new(get_data_type(child_field)))
}
ipc::Type::Struct_ => {
let children = field.children().unwrap();
let mut fields = vec![];
for i in 0..children.len() {
fields.push(fb_to_field(children.get(i)));
}
DataType::Struct(fields)
}
// TODO add interval support
t @ _ => unimplemented!("Type {:?} not supported", t),
Expand Down
Loading

0 comments on commit ab6ff9b

Please sign in to comment.