Skip to content

Commit

Permalink
Add CreateTableTransaction API and implement it in Glue and Rest (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX authored Apr 4, 2024
1 parent 7837d52 commit a892309
Show file tree
Hide file tree
Showing 14 changed files with 723 additions and 195 deletions.
19 changes: 19 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@ catalog.create_table(
)
```

To create a table with some subsequent changes atomically in a transaction:

```python
with catalog.create_table_transaction(
identifier="docs_example.bids",
schema=schema,
location="s3://pyiceberg",
partition_spec=partition_spec,
sort_order=sort_order,
) as txn:
with txn.update_schema() as update_schema:
update_schema.add_column(path="new_column", field_type=StringType())
with txn.update_spec() as update_spec:
update_spec.add_identity("symbol")
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```

## Load a table

### Catalog table
Expand Down
298 changes: 203 additions & 95 deletions pyiceberg/catalog/__init__.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
Expand Down Expand Up @@ -79,7 +79,7 @@
ITEM = "Item"


class DynamoDbCatalog(Catalog):
class DynamoDbCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
session = boto3.Session(
Expand Down
136 changes: 82 additions & 54 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
Expand All @@ -62,8 +62,13 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import (
Expand Down Expand Up @@ -273,7 +278,7 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
event_system.register("provide-client-params.glue", add_glue_catalog_id)


class GlueCatalog(Catalog):
class GlueCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: Any):
super().__init__(name, **properties)

Expand Down Expand Up @@ -384,20 +389,18 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
location=location,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
)
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)

table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
database_name, table_name = self.identifier_to_database_and_table(identifier)

self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

return self.load_table(identifier=identifier)
Expand Down Expand Up @@ -435,46 +438,71 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
except NoSuchTableError:
# Create the table
updated_metadata = update_table_metadata(
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
)
new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
LOCATION,
METADATA_LOCATION,
TABLE_TYPE,
Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
Expand Down Expand Up @@ -230,7 +230,7 @@ def primitive(self, primitive: PrimitiveType) -> str:
return HIVE_PRIMITIVE_TYPES[type(primitive)]


class HiveCatalog(Catalog):
class HiveCatalog(MetastoreCatalog):
_client: _HiveClient

def __init__(self, name: str, **properties: str):
Expand Down
18 changes: 18 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand All @@ -49,9 +50,23 @@ def create_table(
) -> Table:
raise NotImplementedError

def create_table_transaction(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
raise NotImplementedError

def load_table(self, identifier: Union[str, Identifier]) -> Table:
raise NotImplementedError

def table_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand All @@ -70,6 +85,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
def drop_table(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def purge_table(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
raise NotImplementedError

Expand Down
Loading

0 comments on commit a892309

Please sign in to comment.