diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index d86bbfedc..44086f8d3 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -358,9 +358,8 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = PartitionSpec::builder() + let expected_partition_spec = PartitionSpec::builder(expected_schema) .with_spec_id(0) - .with_fields(vec![]) .build() .unwrap(); diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 30f2e29d2..d74c8de06 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1467,13 +1467,13 @@ mod tests { .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .partition_spec( UnboundPartitionSpec::builder() - .with_fields(vec![UnboundPartitionField::builder() + .add_partition_fields(vec![UnboundPartitionField::builder() .source_id(1) .transform(Transform::Truncate(3)) .name("id".to_string()) .build()]) - .build() - .unwrap(), + .unwrap() + .build(), ) .sort_order( SortOrder::builder() diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index bc987725e..aa2311b4a 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -229,7 +229,7 @@ pub struct TableCreation { /// The schema of the table. pub schema: Schema, /// The partition spec of the table, could be None. - #[builder(default, setter(strip_option))] + #[builder(default, setter(strip_option, into))] pub partition_spec: Option, /// The sort order of the table. 
#[builder(default, setter(strip_option))] @@ -476,7 +476,7 @@ mod tests { use crate::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, - TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec, + TableMetadataBuilder, Transform, Type, UnboundPartitionSpec, }; use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; @@ -820,29 +820,13 @@ mod tests { "#, TableUpdate::AddSpec { spec: UnboundPartitionSpec::builder() - .with_unbound_partition_field( - UnboundPartitionField::builder() - .source_id(4) - .name("ts_day".to_string()) - .transform(Transform::Day) - .build(), - ) - .with_unbound_partition_field( - UnboundPartitionField::builder() - .source_id(1) - .name("id_bucket".to_string()) - .transform(Transform::Bucket(16)) - .build(), - ) - .with_unbound_partition_field( - UnboundPartitionField::builder() - .source_id(2) - .name("id_truncate".to_string()) - .transform(Transform::Truncate(4)) - .build(), - ) - .build() - .unwrap(), + .add_partition_field(4, "ts_day".to_string(), Transform::Day) + .unwrap() + .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16)) + .unwrap() + .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4)) + .unwrap() + .build(), }, ); } diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 3700a9b3d..d8a47ec48 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -258,8 +258,9 @@ mod tests { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField, - PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, + DataContentType, DataFile, DataFileFormat, Datum, Literal, 
NestedField, PartitionSpec, + PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, + UnboundPartitionField, }; use crate::Result; @@ -274,14 +275,15 @@ mod tests { ))]) .build()?; - let spec = PartitionSpec::builder() + let spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() + .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) .name("a".to_string()) - .field_id(1) + .partition_id(1) .transform(Transform::Identity) .build()]) + .unwrap() .build() .unwrap(); @@ -298,7 +300,7 @@ mod tests { let partition_fields = partition_type.fields().to_owned(); let partition_schema = Schema::builder() - .with_schema_id(partition_spec.spec_id) + .with_schema_id(partition_spec.spec_id()) .with_fields(partition_fields) .build()?; diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index 430ebfc1a..e8e7337ac 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -495,8 +495,8 @@ mod test { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionField, - PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type, + DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec, + PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, }; const INT_MIN_VALUE: i32 = 30; @@ -1656,14 +1656,15 @@ mod test { .unwrap(); let table_schema_ref = Arc::new(table_schema); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&table_schema_ref) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() + .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) .name("a".to_string()) - .field_id(1) + .partition_id(1) .transform(Transform::Identity) .build()]) + .unwrap() .build() 
.unwrap(); let partition_spec_ref = Arc::new(partition_spec); diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 9cfbb4fd8..716f0869a 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -40,7 +40,7 @@ impl InclusiveProjection { fn get_parts_for_field_id(&mut self, field_id: i32) -> &Vec { if let std::collections::hash_map::Entry::Vacant(e) = self.cached_parts.entry(field_id) { let mut parts: Vec = vec![]; - for partition_spec_field in &self.partition_spec.fields { + for partition_spec_field in self.partition_spec.fields() { if partition_spec_field.source_id == field_id { parts.push(partition_spec_field.clone()) } @@ -236,6 +236,7 @@ mod tests { use crate::expr::{Bind, Predicate, Reference}; use crate::spec::{ Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type, + UnboundPartitionField, }; fn build_test_schema() -> Schema { @@ -265,9 +266,8 @@ mod tests { fn test_inclusive_projection_logic_ops() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![]) .build() .unwrap(); @@ -296,14 +296,17 @@ mod tests { fn test_inclusive_projection_identity_transform() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Identity) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(1) + .name("a".to_string()) + .partition_id(1) + .transform(Transform::Identity) + .build(), + ) + .unwrap() .build() .unwrap(); @@ -330,30 +333,29 @@ mod tests { fn test_inclusive_projection_date_transforms() { let schema = 
build_test_schema(); - let partition_spec = PartitionSpec::builder() - .with_spec_id(1) - .with_fields(vec![ - PartitionField::builder() - .source_id(2) - .name("year".to_string()) - .field_id(2) - .transform(Transform::Year) - .build(), - PartitionField::builder() - .source_id(2) - .name("month".to_string()) - .field_id(2) - .transform(Transform::Month) - .build(), - PartitionField::builder() - .source_id(2) - .name("day".to_string()) - .field_id(2) - .transform(Transform::Day) - .build(), - ]) - .build() - .unwrap(); + let partition_spec = PartitionSpec { + spec_id: 1, + fields: vec![ + PartitionField { + source_id: 2, + name: "year".to_string(), + field_id: 1000, + transform: Transform::Year, + }, + PartitionField { + source_id: 2, + name: "month".to_string(), + field_id: 1001, + transform: Transform::Month, + }, + PartitionField { + source_id: 2, + name: "day".to_string(), + field_id: 1002, + transform: Transform::Day, + }, + ], + }; let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); @@ -378,14 +380,17 @@ mod tests { fn test_inclusive_projection_truncate_transform() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(3) - .name("name".to_string()) - .field_id(3) - .transform(Transform::Truncate(4)) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(3) + .name("name_truncate".to_string()) + .partition_id(3) + .transform(Transform::Truncate(4)) + .build(), + ) + .unwrap() .build() .unwrap(); @@ -398,7 +403,7 @@ mod tests { // applying InclusiveProjection to bound_predicate // should result in the 'name STARTS WITH "Testy McTest"' - // predicate being transformed to 'name STARTS WITH "Test"', + // predicate being transformed to 'name_truncate STARTS WITH "Test"', // since a `Truncate(4)` partition will map values of // name that start 
with "Testy McTest" into a partition // for values of name that start with the first four letters @@ -406,7 +411,7 @@ mod tests { let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); let result = inclusive_projection.project(&bound_predicate).unwrap(); - let expected = "name STARTS WITH \"Test\"".to_string(); + let expected = "name_truncate STARTS WITH \"Test\"".to_string(); assert_eq!(result.to_string(), expected) } @@ -415,14 +420,17 @@ mod tests { fn test_inclusive_projection_bucket_transform() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Bucket(7)) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(1) + .name("a_bucket[7]".to_string()) + .partition_id(1) + .transform(Transform::Bucket(7)) + .build(), + ) + .unwrap() .build() .unwrap(); @@ -440,7 +448,7 @@ mod tests { let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); let result = inclusive_projection.project(&bound_predicate).unwrap(); - let expected = "a = 2".to_string(); + let expected = "a_bucket[7] = 2".to_string(); assert_eq!(result.to_string(), expected) } diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index e2f8251c1..14b8a8000 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -227,14 +227,14 @@ impl ManifestWriter { )?; avro_writer.add_user_metadata( "partition-spec".to_string(), - to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| { + to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| { Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec") .with_source(err) })?, )?; avro_writer.add_user_metadata( "partition-spec-id".to_string(), - 
manifest.metadata.partition_spec.spec_id.to_string(), + manifest.metadata.partition_spec.spec_id().to_string(), )?; avro_writer.add_user_metadata( "format-version".to_string(), @@ -300,12 +300,12 @@ impl ManifestWriter { self.output.write(Bytes::from(content)).await?; let partition_summary = - self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); + self.get_field_summary_vec(manifest.metadata.partition_spec.fields()); Ok(ManifestFile { manifest_path: self.output.location().to_string(), manifest_length: length as i64, - partition_spec_id: manifest.metadata.partition_spec.spec_id, + partition_spec_id: manifest.metadata.partition_spec.spec_id(), content: manifest.metadata.content, // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with // real sequence number in `ManifestListWriter`. diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index f1244e4e9..055b48e68 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -25,7 +25,10 @@ use typed_builder::TypedBuilder; use super::transform::Transform; use super::{NestedField, Schema, StructType}; -use crate::{Error, ErrorKind}; +use crate::{Error, ErrorKind, Result}; + +pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; +pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; /// Reference to [`PartitionSpec`]. pub type PartitionSpecRef = Arc; @@ -44,22 +47,37 @@ pub struct PartitionField { pub transform: Transform, } +impl PartitionField { + /// To unbound partition field + pub fn into_unbound(self) -> UnboundPartitionField { + self.into() + } +} + /// Partition spec that defines how to produce a tuple of partition values from a record. 
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -#[builder(setter(prefix = "with"))] pub struct PartitionSpec { /// Identifier for PartitionSpec - pub spec_id: i32, + pub(crate) spec_id: i32, /// Details of the partition spec - #[builder(setter(each(name = "with_partition_field")))] - pub fields: Vec, + pub(crate) fields: Vec, } impl PartitionSpec { /// Create partition spec builer - pub fn builder() -> PartitionSpecBuilder { - PartitionSpecBuilder::default() + pub fn builder(schema: &Schema) -> PartitionSpecBuilder { + PartitionSpecBuilder::new(schema) + } + + /// Spec id of the partition spec + pub fn spec_id(&self) -> i32 { + self.spec_id + } + + /// Fields of the partition spec + pub fn fields(&self) -> &[PartitionField] { + &self.fields } /// Returns if the partition spec is unpartitioned. @@ -74,7 +92,7 @@ impl PartitionSpec { } /// Returns the partition type of this partition spec. - pub fn partition_type(&self, schema: &Schema) -> Result { + pub fn partition_type(&self, schema: &Schema) -> Result { let mut fields = Vec::with_capacity(self.fields.len()); for partition_field in &self.fields { let field = schema @@ -96,6 +114,13 @@ impl PartitionSpec { } Ok(StructType::new(fields)) } + + /// Turn this partition spec into an unbound partition spec. + /// + /// The `field_id` is retained as `partition_id` in the unbound partition spec. + pub fn to_unbound(self) -> UnboundPartitionSpec { + self.into() + } } /// Reference to [`UnboundPartitionSpec`]. @@ -117,16 +142,13 @@ pub struct UnboundPartitionField { } /// Unbound partition spec can be built without a schema and later bound to a schema. 
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -#[builder(setter(prefix = "with"))] pub struct UnboundPartitionSpec { /// Identifier for PartitionSpec - #[builder(default, setter(strip_option))] - pub spec_id: Option, + pub(crate) spec_id: Option, /// Details of the partition spec - #[builder(setter(each(name = "with_unbound_partition_field")))] - pub fields: Vec, + pub(crate) fields: Vec, } impl UnboundPartitionSpec { @@ -134,6 +156,424 @@ impl UnboundPartitionSpec { pub fn builder() -> UnboundPartitionSpecBuilder { UnboundPartitionSpecBuilder::default() } + + /// Bind this unbound partition spec to a schema. + pub fn bind(self, schema: &Schema) -> Result { + PartitionSpecBuilder::new_from_unbound(self, schema)?.build() + } + + /// Spec id of the partition spec + pub fn spec_id(&self) -> Option { + self.spec_id + } + + /// Fields of the partition spec + pub fn fields(&self) -> &[UnboundPartitionField] { + &self.fields + } +} + +impl From for UnboundPartitionField { + fn from(field: PartitionField) -> Self { + UnboundPartitionField { + source_id: field.source_id, + partition_id: Some(field.field_id), + name: field.name, + transform: field.transform, + } + } +} + +impl From for UnboundPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +/// Create a new UnboundPartitionSpec +#[derive(Debug, Default)] +pub struct UnboundPartitionSpecBuilder { + spec_id: Option, + fields: Vec, +} + +impl UnboundPartitionSpecBuilder { + /// Create a new, empty unbound partition spec builder (no spec id, no fields). + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + } + } + + /// Set the spec id for the partition spec. 
+ pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec from a source id, target name and transform. + pub fn add_partition_field( + self, + source_id: i32, + target_name: impl ToString, + transformation: Transform, + ) -> Result { + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform: transformation, + }; + self.add_partition_field_internal(field) + } + + /// Add multiple partition fields to the partition spec. + pub fn add_partition_fields( + self, + fields: impl IntoIterator, + ) -> Result { + let mut builder = self; + for field in fields { + builder = builder.add_partition_field_internal(field)?; + } + Ok(builder) + } + + fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + self.fields.push(field); + Ok(self) + } + + /// Build the unbound partition spec. + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug)] +pub struct PartitionSpecBuilder<'a> { + spec_id: Option, + last_assigned_field_id: i32, + fields: Vec, + schema: &'a Schema, +} + +impl<'a> PartitionSpecBuilder<'a> { + /// Create a new partition spec builder with the given schema. + pub fn new(schema: &'a Schema) -> Self { + Self { + spec_id: None, + fields: vec![], + last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID, + schema, + } + } + + /// Create a new partition spec builder from an existing unbound partition spec. 
+ pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result { + let mut builder = + Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID)); + + for field in unbound.fields { + builder = builder.add_unbound_field(field)?; + } + Ok(builder) + } + + /// Set the last assigned field id for the partition spec. + /// + /// Set this field when a new partition spec is created for an existing TableMetaData. + /// As `field_id` must be unique in V2 metadata, this should be set to + /// the highest field id used previously. + pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { + self.last_assigned_field_id = last_assigned_field_id; + self + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec. + pub fn add_partition_field( + self, + source_name: impl AsRef, + target_name: impl Into, + transform: Transform, + ) -> Result { + let source_id = self + .schema + .field_by_name(source_name.as_ref()) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find source column with name: {} in schema", + source_name.as_ref() + ), + ) + })? + .id; + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.into(), + transform, + }; + + self.add_unbound_field(field) + } + + /// Add a new partition field to the partition spec. + /// + /// If `partition_id` is set, it is used as the field id. + /// Otherwise, a new `field_id` is assigned. 
+ pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + Self::check_name_does_not_collide_with_schema(&field, self.schema)?; + Self::check_transform_compatibility(&field, self.schema)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + + // Non-fallible from here + self.fields.push(field); + Ok(self) + } + + /// Wrapper around `add_unbound_field` to add multiple partition fields. + pub fn add_unbound_fields( + self, + fields: impl IntoIterator, + ) -> Result { + let mut builder = self; + for field in fields { + builder = builder.add_unbound_field(field)?; + } + Ok(builder) + } + + /// Build a bound partition spec with the given schema. + pub fn build(self) -> Result { + let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; + Ok(PartitionSpec { + spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID), + fields, + }) + } + + fn set_field_ids( + fields: Vec, + last_assigned_field_id: i32, + ) -> Result> { + let mut last_assigned_field_id = last_assigned_field_id; + // Already assigned partition ids. If we see one of these during iteration, + // we skip it. + let assigned_ids = fields + .iter() + .filter_map(|f| f.partition_id) + .collect::>(); + + fn _check_add_1(prev: i32) -> Result { + prev.checked_add(1).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot assign more partition ids. 
Overflow.", + ) + }) + } + + let mut bound_fields = Vec::with_capacity(fields.len()); + for field in fields.into_iter() { + let partition_id = if let Some(partition_id) = field.partition_id { + last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id); + partition_id + } else { + last_assigned_field_id = _check_add_1(last_assigned_field_id)?; + while assigned_ids.contains(&last_assigned_field_id) { + last_assigned_field_id = _check_add_1(last_assigned_field_id)?; + } + last_assigned_field_id + }; + + bound_fields.push(PartitionField { + source_id: field.source_id, + field_id: partition_id, + name: field.name, + transform: field.transform, + }) + } + + Ok(bound_fields) + } + + /// Ensure that the partition name is unique among columns in the schema. + /// Duplicate names are allowed if: + /// 1. The column is sourced from the column with the same name. + /// 2. AND the transformation is identity + fn check_name_does_not_collide_with_schema( + field: &UnboundPartitionField, + schema: &Schema, + ) -> Result<()> { + match schema.field_by_name(field.name.as_str()) { + Some(schema_collision) => { + if field.transform == Transform::Identity { + if schema_collision.id == field.source_id { + Ok(()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`", + field.name, schema_collision.id, field.source_id + ), + )) + } + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.", + field.name + ), + )) + } + } + None => Ok(()), + } + } + + /// Ensure that the transformation of the field is compatible with type of the field + /// in the schema. Implicitly also checks if the source field exists in the schema. 
+ fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> { + let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find partition source field with id `{}` in schema", + field.source_id + ), + ) + })?; + + if field.transform != Transform::Void { + if !schema_field.field_type.is_primitive() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot partition by non-primitive source field: '{}'.", + schema_field.field_type + ), + )); + } + + if field + .transform + .result_type(&schema_field.field_type) + .is_err() + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid source type: '{}' for transform: '{}'.", + schema_field.field_type, + field.transform.dedup_name() + ), + )); + } + } + + Ok(()) + } +} + +/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder +trait CorePartitionSpecValidator { + /// Ensure that the partition name is unique among the partition fields and is not empty. + fn check_name_set_and_unique(&self, name: &str) -> Result<()> { + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot use empty partition name", + )); + } + + if self.fields().iter().any(|f| f.name == name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot use partition name more than once: {}", name), + )); + } + Ok(()) + } + + /// For a single source-column transformations must be unique. + fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> { + let collision = self.fields().iter().find(|f| { + f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name() + }); + + if let Some(collision) = collision { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add redundant partition with source id `{}` and transform `{}`. 
A partition with the same source id and transform already exists with name `{}`", + source_id, transform.dedup_name(), collision.name + ), + )) + } else { + Ok(()) + } + } + + /// Check field / partition_id unique within the partition spec if set + fn check_partition_id_unique(&self, field_id: i32) -> Result<()> { + if self + .fields() + .iter() + .any(|f| f.partition_id == Some(field_id)) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot use field id more than once in one PartitionSpec: {}", + field_id + ), + )); + } + + Ok(()) + } + + fn fields(&self) -> &Vec; +} + +impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> { + fn fields(&self) -> &Vec { + &self.fields + } +} + +impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { + fn fields(&self) -> &Vec { + &self.fields + } } #[cfg(test)] @@ -184,9 +624,21 @@ mod tests { #[test] fn test_is_unpartitioned() { - let partition_spec = PartitionSpec::builder() + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![]) .build() .unwrap(); assert!( @@ -194,23 +646,20 @@ mod tests { "Empty partition spec should be unpartitioned" ); - let partition_spec = PartitionSpec::builder() - .with_partition_field( - PartitionField::builder() + let partition_spec = PartitionSpec::builder(&schema) + .add_unbound_fields(vec![ + UnboundPartitionField::builder() .source_id(1) - .field_id(1) .name("id".to_string()) .transform(Transform::Identity) .build(), - ) - .with_partition_field( - PartitionField::builder() + UnboundPartitionField::builder() .source_id(2) - .field_id(2) - .name("name".to_string()) + .name("name_string".to_string()) .transform(Transform::Void) .build(), - ) 
+ ]) + .unwrap() .with_spec_id(1) .build() .unwrap(); @@ -219,24 +668,21 @@ mod tests { "Partition spec with one non void transform should not be unpartitioned" ); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_partition_field( - PartitionField::builder() + .add_unbound_fields(vec![ + UnboundPartitionField::builder() .source_id(1) - .field_id(1) - .name("id".to_string()) + .name("id_void".to_string()) .transform(Transform::Void) .build(), - ) - .with_partition_field( - PartitionField::builder() + UnboundPartitionField::builder() .source_id(2) - .field_id(2) - .name("name".to_string()) + .name("name_void".to_string()) .transform(Transform::Void) .build(), - ) + ]) + .unwrap() .build() .unwrap(); assert!( @@ -489,4 +935,336 @@ mod tests { assert!(partition_spec.partition_type(&schema).is_err()); } + + #[test] + fn test_builder_disallow_duplicate_names() { + UnboundPartitionSpec::builder() + .add_partition_field(1, "ts_day".to_string(), Transform::Day) + .unwrap() + .add_partition_field(2, "ts_day".to_string(), Transform::Day) + .unwrap_err(); + } + + #[test] + fn test_builder_disallow_duplicate_field_ids() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + PartitionSpec::builder(&schema) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + partition_id: Some(1000), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + partition_id: Some(1000), + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap_err(); + } + + #[test] + fn test_builder_auto_assign_field_ids() { + let schema = Schema::builder() + .with_fields(vec![ + 
NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + NestedField::required( + 3, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + ]) + .build() + .unwrap(); + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + name: "id".to_string(), + transform: Transform::Identity, + partition_id: Some(1012), + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + name: "name_void".to_string(), + transform: Transform::Void, + partition_id: None, + }) + .unwrap() + // Should keep its ID even if it is lower + .add_unbound_field(UnboundPartitionField { + source_id: 3, + name: "year".to_string(), + transform: Transform::Year, + partition_id: Some(1), + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1012, spec.fields[0].field_id); + assert_eq!(1013, spec.fields[1].field_id); + assert_eq!(1, spec.fields[2].field_id); + } + + #[test] + fn test_builder_valid_schema() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + PartitionSpec::builder(&schema) + .with_spec_id(1) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16)) + .unwrap() + .build() + .unwrap(); + + assert_eq!(spec, PartitionSpec { + spec_id: 1, + fields: vec![PartitionField { + source_id: 1, + field_id: 1000, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }] + }); + } + + #[test] + fn test_collision_with_schema_name() { + let schema = Schema::builder() + 
.with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + PartitionSpec::builder(&schema) + .with_spec_id(1) + .build() + .unwrap(); + + let err = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap_err(); + assert!(err.message().contains("conflicts with schema")) + } + + #[test] + fn test_builder_collision_is_ok_for_identity_transforms() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "number", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + ]) + .build() + .unwrap(); + + PartitionSpec::builder(&schema) + .with_spec_id(1) + .build() + .unwrap(); + + PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + // Not OK for different source id + PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 2, + partition_id: None, + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap_err(); + } + + #[test] + fn test_builder_all_source_ids_must_exist() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + NestedField::required( + 3, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + ]) + .build() + .unwrap(); + + // Valid + PartitionSpec::builder(&schema) + .with_spec_id(1) + 
.add_unbound_fields(vec![ + UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 2, + partition_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }, + ]) + .unwrap() + .build() + .unwrap(); + + // Invalid + PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_fields(vec![ + UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 4, + partition_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }, + ]) + .unwrap_err(); + } + + #[test] + fn test_builder_disallows_redundant() { + let err = UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16)) + .unwrap() + .add_partition_field( + 1, + "id_bucket_with_other_name".to_string(), + Transform::Bucket(16), + ) + .unwrap_err(); + assert!(err.message().contains("redundant partition")); + } + + #[test] + fn test_builder_incompatible_transforms_disallowed() { + let schema = Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_year".to_string(), + transform: Transform::Year, + }) + .unwrap_err(); + } + + #[test] + fn test_build_unbound_specs_without_partition_id() { + let spec = UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_fields(vec![UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }]) + .unwrap() + .build(); + + assert_eq!(spec, UnboundPartitionSpec { + spec_id: Some(1), + fields: 
vec![UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }] + }); + } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 53bcabb8f..cd7f046c6 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -32,12 +32,12 @@ use uuid::Uuid; use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention}; use super::{ PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrder, SortOrderRef, + DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; use crate::{Error, ErrorKind, TableCreation}; static MAIN_BRANCH: &str = "main"; -static DEFAULT_SPEC_ID: i32 = 0; static DEFAULT_SORT_ORDER_ID: i64 = 0; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; @@ -187,8 +187,8 @@ impl TableMetadata { /// Get default partition spec #[inline] pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> { - if self.default_spec_id == DEFAULT_SPEC_ID { - self.partition_spec_by_id(DEFAULT_SPEC_ID) + if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID { + self.partition_spec_by_id(DEFAULT_PARTITION_SPEC_ID) } else { Some( self.partition_spec_by_id(self.default_spec_id) @@ -308,9 +308,9 @@ impl TableMetadataBuilder { )) } None => HashMap::from([( - DEFAULT_SPEC_ID, + DEFAULT_PARTITION_SPEC_ID, Arc::new(PartitionSpec { - spec_id: DEFAULT_SPEC_ID, + spec_id: DEFAULT_PARTITION_SPEC_ID, fields: vec![], }), )]), @@ -347,7 +347,7 @@ impl TableMetadataBuilder { current_schema_id: schema.schema_id(), schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]), partition_specs, - default_spec_id: DEFAULT_SPEC_ID, + default_spec_id: DEFAULT_PARTITION_SPEC_ID, last_partition_id: 0, properties, current_snapshot_id: None, @@ -391,8 +391,8 @@ pub(super) mod _serde { use uuid::Uuid; use super::{ - FormatVersion, MetadataLog, SnapshotLog, TableMetadata, 
DEFAULT_SORT_ORDER_ID, - DEFAULT_SPEC_ID, MAIN_BRANCH, + FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_PARTITION_SPEC_ID, + DEFAULT_SORT_ORDER_ID, MAIN_BRANCH, }; use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; @@ -568,7 +568,7 @@ pub(super) mod _serde { value .partition_specs .into_iter() - .map(|x| (x.spec_id, Arc::new(x))), + .map(|x| (x.spec_id(), Arc::new(x))), ), default_spec_id: value.default_spec_id, last_partition_id: value.last_partition_id, @@ -643,12 +643,12 @@ pub(super) mod _serde { .partition_specs .unwrap_or_else(|| { vec![PartitionSpec { - spec_id: DEFAULT_SPEC_ID, + spec_id: DEFAULT_PARTITION_SPEC_ID, fields: value.partition_spec, }] }) .into_iter() - .map(|x| (x.spec_id, Arc::new(x))), + .map(|x| (x.spec_id(), Arc::new(x))), ); Ok(TableMetadata { format_version: FormatVersion::V1, @@ -808,7 +808,7 @@ pub(super) mod _serde { partition_spec: v .partition_specs .get(&v.default_spec_id) - .map(|x| x.fields.clone()) + .map(|x| x.fields().to_vec()) .unwrap_or_default(), partition_specs: Some( v.partition_specs @@ -935,7 +935,7 @@ mod tests { use crate::spec::{ NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, - Summary, Transform, Type, + Summary, Transform, Type, UnboundPartitionField, }; use crate::TableCreation; @@ -1020,16 +1020,15 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() - .with_spec_id(1) - .with_partition_field(PartitionField { + let partition_spec = PartitionSpec { + spec_id: 1, + fields: vec![PartitionField { name: "ts_day".to_string(), transform: Transform::Day, source_id: 4, field_id: 1000, - }) - .build() - .unwrap(); + }], + }; let expected = TableMetadata { format_version: FormatVersion::V2, @@ -1179,14 +1178,10 @@ mod tests { .build() .unwrap(); - let partition_spec = 
PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .with_partition_field(PartitionField { - name: "vendor_id".to_string(), - transform: Transform::Identity, - source_id: 1, - field_id: 1000, - }) + .add_partition_field("vendor_id", "vendor_id", Transform::Identity) + .unwrap() .build() .unwrap(); @@ -1292,14 +1287,15 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema1) .with_spec_id(0) - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { name: "x".to_string(), transform: Transform::Identity, source_id: 1, - field_id: 1000, + partition_id: Some(1000), }) + .unwrap() .build() .unwrap(); @@ -1414,14 +1410,15 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { name: "x".to_string(), transform: Transform::Identity, source_id: 1, - field_id: 1000, + partition_id: Some(1000), }) + .unwrap() .build() .unwrap(); @@ -1493,14 +1490,15 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { name: "x".to_string(), transform: Transform::Identity, source_id: 1, - field_id: 1000, + partition_id: Some(1000), }) + .unwrap() .build() .unwrap(); @@ -1686,10 +1684,12 @@ mod tests { table_metadata.partition_specs, HashMap::from([( 0, - Arc::new(PartitionSpec { - spec_id: 0, - fields: vec![] - }) + Arc::new( + PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap()) + .with_spec_id(0) + .build() + .unwrap() + ) )]) ); assert_eq!(