Skip to content

Commit

Permalink
Feat: Add fail_if_exists param to create_table
Browse files: browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Feb 12, 2024
1 parent e9e265a commit 29e6374
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 33 deletions.
2 changes: 2 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
"""Create a table.
Expand All @@ -307,6 +308,7 @@ def create_table(
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
fail_if_exists (bool): If True (default), raise TableAlreadyExistsError when the table already exists; if False, return the existing table instead.
Returns:
Table: the created table instance.
Expand Down
5 changes: 4 additions & 1 deletion pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
"""
Create an Iceberg table.
Expand All @@ -147,6 +148,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
fail_if_exists: If True (default), raise TableAlreadyExistsError when the table already exists; if False, load and return the existing table instead.
Returns:
Table: the created table instance.
Expand Down Expand Up @@ -178,7 +180,8 @@ def create_table(
condition_expression=f"attribute_not_exists({DYNAMODB_COL_IDENTIFIER})",
)
except ConditionalCheckFailedException as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
if fail_if_exists:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e

return self.load_table(identifier=identifier)

Expand Down
17 changes: 14 additions & 3 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,18 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
catalog=self,
)

def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
def _create_glue_table(
self,
database_name: str,
table_name: str,
table_input: TableInputTypeDef,
fail_if_exists: bool = True,
) -> None:
try:
self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
except self.glue.exceptions.AlreadyExistsException as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
if fail_if_exists:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

Expand Down Expand Up @@ -340,6 +347,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
"""
Create an Iceberg table.
Expand All @@ -351,6 +359,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
fail_if_exists: If True (default), raise TableAlreadyExistsError when the table already exists; if False, load and return the existing table instead.
Returns:
Table: the created table instance.
Expand All @@ -374,7 +383,9 @@ def create_table(

table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
database_name, table_name = self.identifier_to_database_and_table(identifier)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
self._create_glue_table(
database_name=database_name, table_name=table_name, table_input=table_input, fail_if_exists=fail_if_exists
)

return self.load_table(identifier=identifier)

Expand Down
7 changes: 6 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
"""Create a table.
Expand All @@ -279,6 +280,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
fail_if_exists: If True, raise an error if the table already exists.
Returns:
Table: the created table instance.
Expand Down Expand Up @@ -321,7 +323,10 @@ def create_table(
open_client.create_table(tbl)
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
except AlreadyExistsException as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
if fail_if_exists:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
else:
hive_table = self.load_table(identifier)

return self._convert_hive_into_iceberg(hive_table, io)

Expand Down
1 change: 1 addition & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
raise NotImplementedError

Expand Down
14 changes: 10 additions & 4 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
iceberg_schema = self._convert_schema_if_needed(schema)
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
Expand All @@ -468,11 +469,16 @@ def create_table(
)
try:
response.raise_for_status()
table_response = TableResponse(**response.json())
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
except HTTPError as exc:
self._handle_non_200_response(exc, {409: TableAlreadyExistsError})

table_response = TableResponse(**response.json())
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
try:
self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
except TableAlreadyExistsError:
if fail_if_exists:
raise
return self.load_table(identifier)
raise

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand Down
6 changes: 4 additions & 2 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
"""
Create an Iceberg table.
Expand All @@ -164,6 +165,7 @@ def create_table(
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
fail_if_exists: If True (default), raise TableAlreadyExistsError when the table already exists; if False, load and return the existing table instead.
Returns:
Table: the created table instance.
Expand Down Expand Up @@ -200,8 +202,8 @@ def create_table(
)
session.commit()
except IntegrityError as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e

if fail_if_exists:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
return self.load_table(identifier=identifier)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
Expand Down
26 changes: 22 additions & 4 deletions tests/catalog/integration_test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from typing import Generator, List
from typing import Generator, List, Type

import boto3
import pytest
Expand Down Expand Up @@ -89,11 +89,29 @@ def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_
test_catalog.create_table(identifier, table_schema_nested)


def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
@pytest.mark.parametrize(
"fail_if_exists, expected_exception",
[
(True, TableAlreadyExistsError),
(False, None),
],
)
def test_create_duplicated_table(
test_catalog: Catalog,
table_schema_nested: Schema,
database_name: str,
table_name: str,
fail_if_exists: bool,
expected_exception: Type[Exception],
) -> None:
test_catalog.create_namespace(database_name)
test_catalog.create_table((database_name, table_name), table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
test_catalog.create_table((database_name, table_name), table_schema_nested)
if expected_exception:
with pytest.raises(expected_exception):
test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
else:
table = test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
assert table.identifier == (test_catalog.name, database_name, table_name)


def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
Expand Down
26 changes: 22 additions & 4 deletions tests/catalog/integration_test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

import time
from typing import Any, Dict, Generator, List
from typing import Any, Dict, Generator, List, Type
from uuid import uuid4

import boto3
Expand Down Expand Up @@ -193,11 +193,29 @@ def test_create_table_with_invalid_database(test_catalog: Catalog, table_schema_
test_catalog.create_table(identifier, table_schema_nested)


def test_create_duplicated_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
@pytest.mark.parametrize(
"fail_if_exists, expected_exception",
[
(True, TableAlreadyExistsError),
(False, None),
],
)
def test_create_duplicated_table(
test_catalog: Catalog,
table_schema_nested: Schema,
table_name: str,
database_name: str,
fail_if_exists: bool,
expected_exception: Type[Exception],
) -> None:
test_catalog.create_namespace(database_name)
test_catalog.create_table((database_name, table_name), table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
test_catalog.create_table((database_name, table_name), table_schema_nested)
if expected_exception:
with pytest.raises(expected_exception):
test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
else:
table = test_catalog.create_table((database_name, table_name), table_schema_nested, fail_if_exists=fail_if_exists)
assert table.identifier == (CATALOG_NAME, database_name, table_name)


def test_load_table(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
Expand Down
6 changes: 5 additions & 1 deletion tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,18 @@ def create_table(
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
fail_if_exists: bool = True,
) -> Table:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier = Catalog.identifier_to_tuple(identifier)
namespace = Catalog.namespace_from(identifier)

if identifier in self.__tables:
raise TableAlreadyExistsError(f"Table already exists: {identifier}")
if fail_if_exists:
raise TableAlreadyExistsError(f"Table already exists: {identifier}")
else:
return self.__tables[identifier]
else:
if namespace not in self.__namespaces:
self.__namespaces[namespace] = {}
Expand Down
25 changes: 21 additions & 4 deletions tests/catalog/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List
from typing import List, Type
from unittest import mock

import boto3
Expand Down Expand Up @@ -164,16 +164,33 @@ def test_create_table_with_no_database(
test_catalog.create_table(identifier=identifier, schema=table_schema_nested)


@pytest.mark.parametrize(
"fail_if_exists, expected_exception",
[
(True, TableAlreadyExistsError),
(False, None),
],
)
@mock_aws
def test_create_duplicated_table(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
fail_if_exists: bool,
expected_exception: Type[Exception],
) -> None:
identifier = (database_name, table_name)
test_catalog = DynamoDbCatalog("test_ddb_catalog", **{"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url})
test_catalog.create_namespace(namespace=database_name)
test_catalog.create_table(identifier, table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
test_catalog.create_table(identifier, table_schema_nested)
if fail_if_exists:
with pytest.raises(expected_exception):
test_catalog.create_table(identifier, table_schema_nested, fail_if_exists=fail_if_exists)
else:
table = test_catalog.create_table(identifier, table_schema_nested, fail_if_exists=fail_if_exists)
assert table.identifier == ("test_ddb_catalog",) + identifier


@mock_aws
Expand Down
37 changes: 32 additions & 5 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
# pylint: disable=redefined-outer-name,unused-argument
import os
from typing import Any, Dict, cast
from typing import Any, Dict, Type, cast
from unittest import mock

import pytest
Expand Down Expand Up @@ -489,7 +489,20 @@ def test_create_table_200(
assert actual == expected


def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> None:
@pytest.mark.parametrize(
"fail_if_exists, expected_exception",
[
(True, TableAlreadyExistsError),
(False, None),
],
)
def test_create_table_409(
rest_mock: Mocker,
example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any],
table_schema_simple: Schema,
fail_if_exists: bool,
expected_exception: Type[Exception],
) -> None:
rest_mock.post(
f"{TEST_URI}v1/namespaces/fokko/tables",
json={
Expand All @@ -502,9 +515,15 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
status_code=409,
request_headers=TEST_HEADERS,
)
rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/fokko2",
json=example_table_metadata_with_snapshot_v1_rest_json,
status_code=200,
request_headers=TEST_HEADERS,
)

with pytest.raises(TableAlreadyExistsError) as e:
RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_table(
def _create_table(_fail_if_exists: bool) -> Table:
return RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_table(
identifier=("fokko", "fokko2"),
schema=table_schema_simple,
location=None,
Expand All @@ -513,8 +532,16 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
),
sort_order=SortOrder(SortField(source_id=2, transform=IdentityTransform())),
properties={"owner": "fokko"},
fail_if_exists=_fail_if_exists,
)
assert "Table already exists" in str(e.value)

if expected_exception:
with pytest.raises(expected_exception) as e:
_create_table(_fail_if_exists=fail_if_exists)
assert "Table already exists" in str(e.value)
else:
table = _create_table(_fail_if_exists=fail_if_exists)
assert table.identifier == ("rest", "fokko", "fokko2")


def test_register_table_200(
Expand Down
Loading

0 comments on commit 29e6374

Please sign in to comment.