Skip to content

Commit

Permalink
Squash builder
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel committed Nov 7, 2024
1 parent 0ef9304 commit e36d834
Show file tree
Hide file tree
Showing 12 changed files with 2,181 additions and 108 deletions.
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,9 @@ impl Catalog for SqlCatalog {
}
};

let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
.build()?
.metadata;
let tbl_metadata_location = format!(
"{}/metadata/0-{}.metadata.json",
location.clone(),
Expand Down
53 changes: 48 additions & 5 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,46 @@ impl TableUpdate {
/// Applies the update to the table metadata builder.
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
_ => unimplemented!(),
TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
TableUpdate::AddSchema {
schema,
last_column_id,
} => {
if let Some(last_column_id) = last_column_id {
if builder.last_column_id() > last_column_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid last column ID: {last_column_id} < {} (previous last column ID)",
builder.last_column_id()
),
));
}
};
Ok(builder.add_schema(schema))
}
TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
builder.set_default_sort_order(sort_order_id)
}
TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => builder.set_ref(&ref_name, reference),
TableUpdate::RemoveSnapshots { snapshot_ids } => {
Ok(builder.remove_snapshots(&snapshot_ids))
}
TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
TableUpdate::SetProperties { updates } => builder.set_properties(updates),
TableUpdate::RemoveProperties { removals } => Ok(builder.remove_properties(&removals)),
TableUpdate::UpgradeFormatVersion { format_version } => {
builder.upgrade_format_version(format_version)
}
}
}
}
Expand Down Expand Up @@ -1221,16 +1259,21 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
.unwrap()
.metadata;
let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
table_metadata,
Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
);

let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
let updated_metadata = update
.apply(table_metadata_builder)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(updated_metadata.uuid(), uuid);
}
}
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod schema;
mod snapshot;
mod sort;
mod table_metadata;
mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
Expand Down
6 changes: 5 additions & 1 deletion crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ impl BoundPartitionSpec {
}

/// Get the highest field id in the partition spec.
/// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999).
pub fn highest_field_id(&self) -> Option<i32> {
self.fields.iter().map(|f| f.field_id).max()
}
Expand Down Expand Up @@ -182,6 +181,11 @@ impl BoundPartitionSpec {

true
}

/// Change the spec id of the partition spec
pub fn with_spec_id(self, spec_id: i32) -> Self {
Self { spec_id, ..self }
}
}

impl SchemalessPartitionSpec {
Expand Down
18 changes: 18 additions & 0 deletions crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,24 @@ impl Schema {
pub fn accessor_by_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
self.field_id_to_accessor.get(&field_id).cloned()
}

/// Check if this schema is identical to another schema semantically - excluding schema id.
pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
self.as_struct().eq(other.as_struct())
&& self.identifier_field_ids().eq(other.identifier_field_ids())
}

/// Change the schema id of this schema.
// This is redundant with the `with_schema_id` method on the builder, but useful
// as it is infallible in contrast to the builder `build()` method.
pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self {
Self { schema_id, ..self }
}

/// Return A HashMap matching field ids to field names.
pub(crate) fn field_id_to_name_map(&self) -> &HashMap<i32, String> {
&self.id_to_name
}
}

impl Display for Schema {
Expand Down
113 changes: 17 additions & 96 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;

use super::snapshot::SnapshotReference;
pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
use super::{
BoundPartitionSpec, BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef,
Snapshot, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, SnapshotRef,
SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
};
use crate::error::{timestamp_ms_to_utc, Result};
use crate::{Error, ErrorKind, TableCreation};
use crate::{Error, ErrorKind};

static MAIN_BRANCH: &str = "main";
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
Expand Down Expand Up @@ -165,6 +166,17 @@ pub struct TableMetadata {
}

impl TableMetadata {
/// Convert this Table Metadata into a builder for modification.
///
/// `current_file_location` is the location where the current version
/// of the metadata file is stored. This is used to update the metadata log.
/// If `current_file_location` is `None`, the metadata log will not be updated.
/// This should only be used to stage-create tables.
#[must_use]
pub fn into_builder(self, current_file_location: Option<String>) -> TableMetadataBuilder {
TableMetadataBuilder::new_from_metadata(self, current_file_location)
}

/// Returns format version of this metadata.
#[inline]
pub fn format_version(&self) -> FormatVersion {
Expand Down Expand Up @@ -539,98 +551,6 @@ impl TableMetadata {
}
}

/// Manipulating table metadata.
pub struct TableMetadataBuilder(TableMetadata);

impl TableMetadataBuilder {
/// Creates a new table metadata builder from the given table metadata.
pub fn new(origin: TableMetadata) -> Self {
Self(origin)
}

/// Creates a new table metadata builder from the given table creation.
pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
let TableCreation {
name: _,
location,
schema,
partition_spec,
sort_order,
properties,
} = table_creation;

let schema: Arc<super::Schema> = Arc::new(schema);
let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema.clone());
let partition_specs = match partition_spec {
Some(_) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Can't create table with partition spec now",
))
}
None => HashMap::from([(
unpartition_spec.spec_id(),
Arc::new(unpartition_spec.clone().into_schemaless()),
)]),
};

let sort_orders = match sort_order {
Some(_) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Can't create table with sort order now",
))
}
None => HashMap::from([(
SortOrder::UNSORTED_ORDER_ID,
Arc::new(SortOrder::unsorted_order()),
)]),
};

let mut table_metadata = TableMetadata {
format_version: FormatVersion::V2,
table_uuid: Uuid::now_v7(),
location: location.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"Can't create table without location",
)
})?,
last_sequence_number: 0,
last_updated_ms: Utc::now().timestamp_millis(),
last_column_id: schema.highest_field_id(),
current_schema_id: schema.schema_id(),
schemas: HashMap::from([(schema.schema_id(), schema)]),
partition_specs,
default_spec: BoundPartitionSpecRef::new(unpartition_spec),
last_partition_id: 0,
properties,
current_snapshot_id: None,
snapshots: Default::default(),
snapshot_log: vec![],
sort_orders,
metadata_log: vec![],
default_sort_order_id: SortOrder::UNSORTED_ORDER_ID,
refs: Default::default(),
};

table_metadata.try_normalize()?;

Ok(Self(table_metadata))
}

/// Changes uuid of table metadata.
pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
self.0.table_uuid = uuid;
Ok(self)
}

/// Returns the new table metadata after changes.
pub fn build(self) -> Result<TableMetadata> {
Ok(self.0)
}
}

pub(super) mod _serde {
use std::borrow::BorrowMut;
/// This is a helper module that defines types to help with serialization/deserialization.
Expand Down Expand Up @@ -2308,7 +2228,8 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(table_metadata.location, "s3://db/table");
assert_eq!(table_metadata.schemas.len(), 1);
assert_eq!(
Expand Down
Loading

0 comments on commit e36d834

Please sign in to comment.