Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Partition Binding and safe PartitionSpecBuilder #491

Merged
merged 15 commits into from
Aug 14, 2024
3 changes: 1 addition & 2 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,8 @@ mod tests {

assert_eq!(metadata.current_schema().as_ref(), expected_schema);

let expected_partition_spec = PartitionSpec::builder()
let expected_partition_spec = PartitionSpec::builder(expected_schema)
.with_spec_id(0)
.with_fields(vec![])
.build()
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1467,13 +1467,13 @@ mod tests {
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
.partition_spec(
UnboundPartitionSpec::builder()
.with_fields(vec![UnboundPartitionField::builder()
.add_partition_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
.build()
.unwrap(),
.unwrap()
.build(),
)
.sort_order(
SortOrder::builder()
Expand Down
34 changes: 9 additions & 25 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub struct TableCreation {
/// The schema of the table.
pub schema: Schema,
/// The partition spec of the table, could be None.
#[builder(default, setter(strip_option))]
#[builder(default, setter(strip_option, into))]
pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
Expand Down Expand Up @@ -476,7 +476,7 @@ mod tests {
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -820,29 +820,13 @@ mod tests {
"#,
TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(4)
.name("ts_day".to_string())
.transform(Transform::Day)
.build(),
)
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(1)
.name("id_bucket".to_string())
.transform(Transform::Bucket(16))
.build(),
)
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(2)
.name("id_truncate".to_string())
.transform(Transform::Truncate(4))
.build(),
)
.build()
.unwrap(),
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
.unwrap()
.add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
.unwrap()
.build(),
},
);
}
Expand Down
14 changes: 8 additions & 6 deletions crates/iceberg/src/expr/visitors/expression_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ mod tests {
UnaryExpression,
};
use crate::spec::{
DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField,
PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec,
PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
UnboundPartitionField,
};
use crate::Result;

Expand All @@ -274,14 +275,15 @@ mod tests {
))])
.build()?;

let spec = PartitionSpec::builder()
let spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.partition_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
.build()
.unwrap();

Expand All @@ -298,7 +300,7 @@ mod tests {
let partition_fields = partition_type.fields().to_owned();

let partition_schema = Schema::builder()
.with_schema_id(partition_spec.spec_id)
.with_schema_id(partition_spec.spec_id())
.with_fields(partition_fields)
.build()?;

Expand Down
11 changes: 6 additions & 5 deletions crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ mod test {
UnaryExpression,
};
use crate::spec::{
DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionField,
PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec,
PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
};

const INT_MIN_VALUE: i32 = 30;
Expand Down Expand Up @@ -1656,14 +1656,15 @@ mod test {
.unwrap();
let table_schema_ref = Arc::new(table_schema);

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&table_schema_ref)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.partition_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
.build()
.unwrap();
let partition_spec_ref = Arc::new(partition_spec);
Expand Down
110 changes: 59 additions & 51 deletions crates/iceberg/src/expr/visitors/inclusive_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl InclusiveProjection {
fn get_parts_for_field_id(&mut self, field_id: i32) -> &Vec<PartitionField> {
if let std::collections::hash_map::Entry::Vacant(e) = self.cached_parts.entry(field_id) {
let mut parts: Vec<PartitionField> = vec![];
for partition_spec_field in &self.partition_spec.fields {
for partition_spec_field in self.partition_spec.fields() {
if partition_spec_field.source_id == field_id {
parts.push(partition_spec_field.clone())
}
Expand Down Expand Up @@ -236,6 +236,7 @@ mod tests {
use crate::expr::{Bind, Predicate, Reference};
use crate::spec::{
Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
UnboundPartitionField,
};

fn build_test_schema() -> Schema {
Expand Down Expand Up @@ -265,9 +266,8 @@ mod tests {
fn test_inclusive_projection_logic_ops() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![])
.build()
.unwrap();

Expand Down Expand Up @@ -296,14 +296,17 @@ mod tests {
fn test_inclusive_projection_identity_transform() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.transform(Transform::Identity)
.build()])
.add_unbound_field(
UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.transform(Transform::Identity)
.build(),
)
.unwrap()
.build()
.unwrap();

Expand All @@ -330,30 +333,29 @@ mod tests {
fn test_inclusive_projection_date_transforms() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
.with_spec_id(1)
.with_fields(vec![
PartitionField::builder()
.source_id(2)
.name("year".to_string())
.field_id(2)
.transform(Transform::Year)
.build(),
PartitionField::builder()
.source_id(2)
.name("month".to_string())
.field_id(2)
.transform(Transform::Month)
.build(),
PartitionField::builder()
.source_id(2)
.name("day".to_string())
.field_id(2)
.transform(Transform::Day)
.build(),
])
.build()
.unwrap();
let partition_spec = PartitionSpec {
spec_id: 1,
fields: vec![
PartitionField {
source_id: 2,
name: "year".to_string(),
field_id: 1000,
transform: Transform::Year,
},
PartitionField {
source_id: 2,
name: "month".to_string(),
field_id: 1001,
transform: Transform::Month,
},
PartitionField {
source_id: 2,
name: "day".to_string(),
field_id: 1002,
transform: Transform::Day,
},
],
};

let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
Expand All @@ -378,14 +380,17 @@ mod tests {
fn test_inclusive_projection_truncate_transform() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.source_id(3)
.name("name".to_string())
.field_id(3)
.transform(Transform::Truncate(4))
.build()])
.add_unbound_field(
UnboundPartitionField::builder()
.source_id(3)
.name("name_truncate".to_string())
.partition_id(3)
.transform(Transform::Truncate(4))
.build(),
)
.unwrap()
.build()
.unwrap();

Expand All @@ -398,15 +403,15 @@ mod tests {

// applying InclusiveProjection to bound_predicate
// should result in the 'name STARTS WITH "Testy McTest"'
// predicate being transformed to 'name STARTS WITH "Test"',
// predicate being transformed to 'name_truncate STARTS WITH "Test"',
// since a `Truncate(4)` partition will map values of
// name that start with "Testy McTest" into a partition
// for values of name that start with the first four letters
// of that, ie "Test".
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "name STARTS WITH \"Test\"".to_string();
let expected = "name_truncate STARTS WITH \"Test\"".to_string();

assert_eq!(result.to_string(), expected)
}
Expand All @@ -415,14 +420,17 @@ mod tests {
fn test_inclusive_projection_bucket_transform() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.transform(Transform::Bucket(7))
.build()])
.add_unbound_field(
UnboundPartitionField::builder()
.source_id(1)
.name("a_bucket[7]".to_string())
.partition_id(1)
.transform(Transform::Bucket(7))
.build(),
)
.unwrap()
.build()
.unwrap();

Expand All @@ -440,7 +448,7 @@ mod tests {
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "a = 2".to_string();
let expected = "a_bucket[7] = 2".to_string();

assert_eq!(result.to_string(), expected)
}
Expand Down
8 changes: 4 additions & 4 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ impl ManifestWriter {
)?;
avro_writer.add_user_metadata(
"partition-spec".to_string(),
to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| {
Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
.with_source(err)
})?,
)?;
avro_writer.add_user_metadata(
"partition-spec-id".to_string(),
manifest.metadata.partition_spec.spec_id.to_string(),
manifest.metadata.partition_spec.spec_id().to_string(),
)?;
avro_writer.add_user_metadata(
"format-version".to_string(),
Expand Down Expand Up @@ -300,12 +300,12 @@ impl ManifestWriter {
self.output.write(Bytes::from(content)).await?;

let partition_summary =
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());

Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
manifest_length: length as i64,
partition_spec_id: manifest.metadata.partition_spec.spec_id,
partition_spec_id: manifest.metadata.partition_spec.spec_id(),
content: manifest.metadata.content,
// sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
// real sequence number in `ManifestListWriter`.
Expand Down
Loading
Loading