From fb9503852fdf4aab6062f44474d6ac3827110d6f Mon Sep 17 00:00:00 2001 From: Honah J Date: Mon, 15 Apr 2024 13:39:00 -0700 Subject: [PATCH] Bug: HiveCatalog's `_commit_table` refresh and update the metadata within transaction (#607) * make refresh and update metadata in a transaction * fix integration tests --- pyiceberg/catalog/hive.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index f543626424..d515a54961 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -369,22 +369,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons identifier_tuple = self.identifier_to_tuple_without_catalog( tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) - current_table = self.load_table(identifier_tuple) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) - base_metadata = current_table.metadata - for requirement in table_request.requirements: - requirement.validate(base_metadata) - - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: @@ -394,11 +379,28 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons if lock.state != LockState.ACQUIRED: raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") - tbl = open_client.get_table(dbname=database_name, tbl_name=table_name) - tbl.parameters = _construct_parameters( + hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) + io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location) + current_table = self._convert_hive_into_iceberg(hive_table, io) + + base_metadata = current_table.metadata + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + hive_table.parameters = _construct_parameters( metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location ) - open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=tbl) + open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) except NoSuchObjectException as e: raise NoSuchTableError(f"Table does not exist: {table_name}") from e finally: