Skip to content

Commit

Permalink
feat: Define schema post order visitor. (#25)
Browse files Browse the repository at this point in the history
* Define schema post order visitor

* Resolve conflicts
  • Loading branch information
liurenjie1024 authored Aug 10, 2023
1 parent 4ee05b4 commit 1a186d8
Show file tree
Hide file tree
Showing 5 changed files with 1,085 additions and 106 deletions.
5 changes: 4 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ keywords = ["iceberg"]

[dependencies]
apache-avro = "0.15.0"
serde = "^1.0"
serde = {version = "^1.0", features = ["rc"]}
serde_bytes = "0.11.8"
serde_json = "^1.0"
serde_derive = "^1.0"
Expand All @@ -39,6 +39,9 @@ chrono = "0.4"
uuid = "1.4.1"
ordered-float = "3.7.0"
bitvec = "1.0.1"
itertools = "0.11"
bimap = "0.6"


[dev-dependencies]
pretty_assertions = "1.4.0"
25 changes: 25 additions & 0 deletions crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ impl Error {
pub fn kind(&self) -> ErrorKind {
self.kind
}

/// Return error's message.
#[inline]
pub fn message(&self) -> &str {
self.message.as_str()
}
}

macro_rules! define_from_err {
Expand Down Expand Up @@ -283,6 +289,25 @@ define_from_err!(
"Failed to convert between uuid und iceberg value"
);

/// Helper macro to check arguments.
///
///
/// Example:
///
/// Following example check `a > 0`, otherwise returns an error.
/// ```ignore
/// use iceberg::check;
/// ensure_data_valid!(a > 0, "{} is not positive.", a);
/// ```
#[macro_export]
macro_rules! ensure_data_valid {
($cond: expr, $fmt: literal, $($arg:tt)*) => {
if !$cond {
return Err($crate::error::Error::new($crate::error::ErrorKind::DataInvalid, format!($fmt, $($arg)*)))
}
};
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
102 changes: 68 additions & 34 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use ::serde::de::{MapAccess, Visitor};
use serde::de::{Error, IntoDeserializer};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::cell::OnceCell;
use std::slice::Iter;
use std::sync::Arc;
use std::{collections::HashMap, fmt, ops::Index};

/// Field name for list type.
const LIST_FILED_NAME: &str = "element";
const MAP_KEY_FIELD_NAME: &str = "key";
const MAP_VALUE_FIELD_NAME: &str = "value";
pub(crate) const LIST_FILED_NAME: &str = "element";
pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";

#[derive(Debug, PartialEq, Eq, Clone)]
/// All data types are either primitives or nested types, which are maps, lists, or structs.
Expand All @@ -54,6 +54,20 @@ impl fmt::Display for Type {
}
}

impl Type {
/// Whether the type is primitive type.
#[inline(always)]
pub fn is_primitive(&self) -> bool {
matches!(self, Type::Primitive(_))
}

/// Whether the type is struct type.
#[inline(always)]
pub fn is_struct(&self) -> bool {
matches!(self, Type::Struct(_))
}
}

/// Primitive data types
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", remote = "Self")]
Expand Down Expand Up @@ -218,7 +232,7 @@ impl fmt::Display for PrimitiveType {
#[serde(rename = "struct", tag = "type")]
pub struct StructType {
/// Struct fields
fields: Vec<NestedField>,
fields: Vec<NestedFieldRef>,
/// Lookup for index by field id
#[serde(skip_serializing)]
id_lookup: OnceCell<HashMap<i32, usize>>,
Expand Down Expand Up @@ -261,7 +275,7 @@ impl<'de> Deserialize<'de> for StructType {
}
}
}
let fields: Vec<NestedField> =
let fields: Vec<NestedFieldRef> =
fields.ok_or_else(|| de::Error::missing_field("fields"))?;

Ok(StructType::new(fields))
Expand All @@ -275,14 +289,14 @@ impl<'de> Deserialize<'de> for StructType {

impl StructType {
/// Creates a struct type with the given fields.
pub fn new(fields: Vec<NestedField>) -> Self {
pub fn new(fields: Vec<NestedFieldRef>) -> Self {
Self {
fields,
id_lookup: OnceCell::default(),
}
}
/// Get struct field with certain id
pub fn field_by_id(&self, id: i32) -> Option<&NestedField> {
pub fn field_by_id(&self, id: i32) -> Option<&NestedFieldRef> {
self.field_id_to_index(id).map(|idx| &self.fields[idx])
}

Expand All @@ -294,9 +308,10 @@ impl StructType {
.get(&field_id)
.copied()
}
/// Returns an iteratorr over the struct fields
pub fn iter(&self) -> Iter<NestedField> {
self.fields.iter()

/// Get fields.
pub fn fields(&self) -> &[NestedFieldRef] {
&self.fields
}
}

Expand Down Expand Up @@ -352,6 +367,9 @@ pub struct NestedField {
pub write_default: Option<String>,
}

/// Reference to nested field.
pub type NestedFieldRef = Arc<NestedField>;

impl NestedField {
/// Construct a required field.
pub fn required(id: i32, name: impl ToString, field_type: Type) -> Self {
Expand Down Expand Up @@ -443,13 +461,15 @@ impl fmt::Display for NestedField {
/// Elements can be either optional or required. Element types may be any type.
pub struct ListType {
/// Element field of list type.
pub element_field: NestedField,
pub element_field: NestedFieldRef,
}

/// Module for type serialization/deserialization.
pub(super) mod _serde {
use crate::spec::datatypes::Type::Map;
use crate::spec::datatypes::{ListType, MapType, NestedField, PrimitiveType, StructType, Type};
use crate::spec::datatypes::{
ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type,
};
use serde_derive::{Deserialize, Serialize};
use std::borrow::Cow;

Expand All @@ -466,7 +486,7 @@ pub(super) mod _serde {
},
Struct {
r#type: String,
fields: Cow<'a, Vec<NestedField>>,
fields: Cow<'a, Vec<NestedFieldRef>>,
},
#[serde(rename_all = "kebab-case")]
Map {
Expand All @@ -493,7 +513,8 @@ pub(super) mod _serde {
element_id,
element.into_owned(),
element_required,
),
)
.into(),
}),
SerdeType::Map {
r#type: _,
Expand All @@ -503,12 +524,13 @@ pub(super) mod _serde {
value_required,
value,
} => Map(MapType {
key_field: NestedField::map_key_element(key_id, key.into_owned()),
key_field: NestedField::map_key_element(key_id, key.into_owned()).into(),
value_field: NestedField::map_value_element(
value_id,
value.into_owned(),
value_required,
),
)
.into(),
}),
SerdeType::Struct { r#type: _, fields } => {
Self::Struct(StructType::new(fields.into_owned()))
Expand Down Expand Up @@ -552,9 +574,9 @@ pub(super) mod _serde {
/// Both map keys and map values may be any type, including nested types.
pub struct MapType {
/// Field for key.
pub key_field: NestedField,
pub key_field: NestedFieldRef,
/// Field for value.
pub value_field: NestedField,
pub value_field: NestedFieldRef,
}

#[cfg(test)]
Expand Down Expand Up @@ -598,7 +620,8 @@ mod tests {
precision: 9,
scale: 2,
}),
)],
)
.into()],
id_lookup: OnceCell::default(),
}),
)
Expand Down Expand Up @@ -627,7 +650,8 @@ mod tests {
1,
"id",
Type::Primitive(PrimitiveType::Fixed(8)),
)],
)
.into()],
id_lookup: OnceCell::default(),
}),
)
Expand Down Expand Up @@ -662,8 +686,9 @@ mod tests {
fields: vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid))
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae"),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)),
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
.into(),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(),
],
id_lookup: HashMap::from([(1, 0), (2, 1)]).into(),
}),
Expand Down Expand Up @@ -725,17 +750,21 @@ mod tests {
let struct_type = Type::Struct(StructType::new(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Uuid))
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae"),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)),
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
.into(),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(
3,
"address",
Type::Struct(StructType::new(vec![
NestedField::required(4, "street", Type::Primitive(PrimitiveType::String)),
NestedField::optional(5, "province", Type::Primitive(PrimitiveType::String)),
NestedField::required(6, "zip", Type::Primitive(PrimitiveType::Int)),
NestedField::required(4, "street", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::optional(5, "province", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(6, "zip", Type::Primitive(PrimitiveType::Int)).into(),
])),
),
)
.into(),
]));

check_type_serde(record, struct_type)
Expand All @@ -759,7 +788,8 @@ mod tests {
3,
Type::Primitive(PrimitiveType::String),
true,
),
)
.into(),
}),
);
}
Expand All @@ -780,12 +810,14 @@ mod tests {
check_type_serde(
record,
Type::Map(MapType {
key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::String)),
key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::String))
.into(),
value_field: NestedField::map_value_element(
5,
Type::Primitive(PrimitiveType::Double),
false,
),
)
.into(),
}),
);
}
Expand All @@ -806,12 +838,14 @@ mod tests {
check_type_serde(
record,
Type::Map(MapType {
key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::Int)),
key_field: NestedField::map_key_element(4, Type::Primitive(PrimitiveType::Int))
.into(),
value_field: NestedField::map_value_element(
5,
Type::Primitive(PrimitiveType::String),
false,
),
)
.into(),
}),
);
}
Expand Down
Loading

0 comments on commit 1a186d8

Please sign in to comment.