Skip to content

Commit

Permalink
Fix issues related to having catalog_name in identifier (apache#964)
Browse files Browse the repository at this point in the history
* first attempt

* add license

* refactor new tests
  • Loading branch information
HonahX authored Jul 26, 2024
1 parent dd8d76d commit 0213dab
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
18 changes: 16 additions & 2 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) ->

def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
if isinstance(identifier, TableIdentifier):
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
if identifier.namespace.root[0] == self.name:
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
else:
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root), "table": identifier.name}
identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), "table": identifier_tuple[-1]}

Expand Down Expand Up @@ -675,6 +678,17 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

def _remove_catalog_name_from_table_request_identifier(self, table_request: CommitTableRequest) -> CommitTableRequest:
if table_request.identifier.namespace.root[0] == self.name:
return table_request.model_copy(
update={
"identifier": TableIdentifier(
namespace=table_request.identifier.namespace.root[1:], name=table_request.identifier.name
).model_dump()
}
)
return table_request

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand All @@ -692,7 +706,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
"""
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=table_request.model_dump_json().encode(UTF8),
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
Expand Down
24 changes: 23 additions & 1 deletion tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table import CommitTableRequest, Table, TableIdentifier
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform, TruncateTransform
Expand Down Expand Up @@ -1226,3 +1226,25 @@ def test_catalog_from_parameters_empty_env(rest_mock: Mocker) -> None:

catalog = cast(RestCatalog, load_catalog("production", uri="https://other-service.io/api"))
assert catalog.uri == "https://other-service.io/api"


def test_table_identifier_in_commit_table_request(rest_mock: Mocker, example_table_metadata_v2: Dict[str, Any]) -> None:
test_table_request = CommitTableRequest(
identifier=TableIdentifier(namespace=("catalog_name", "namespace"), name="table_name"),
updates=[],
requirements=[],
)
rest_mock.post(
url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name",
json={
"metadata": example_table_metadata_v2,
"metadata-location": "test",
},
status_code=200,
request_headers=TEST_HEADERS,
)
RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN)._commit_table(test_table_request)
assert (
rest_mock.last_request.text
== """{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}"""
)
13 changes: 12 additions & 1 deletion tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pyspark.sql import SparkSession
from pytest_mock.plugin import MockerFixture

from pyiceberg.catalog import Catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
Expand Down Expand Up @@ -1282,3 +1282,14 @@ def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with
(11, 3),
(12, 3),
]


@pytest.mark.integration
def test_rest_catalog_with_empty_catalog_name_append_data(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
identifier = "default.test_rest_append"
test_catalog = load_catalog(
"", # intentionally empty
**session_catalog.properties,
)
tbl = _create_table(test_catalog, identifier, data=[])
tbl.append(arrow_table_with_null)

0 comments on commit 0213dab

Please sign in to comment.