diff --git a/src/preset_cli/cli/superset/sync/dbt/datasets.py b/src/preset_cli/cli/superset/sync/dbt/datasets.py index a0622f2..ec41d6d 100644 --- a/src/preset_cli/cli/superset/sync/dbt/datasets.py +++ b/src/preset_cli/cli/superset/sync/dbt/datasets.py @@ -70,6 +70,19 @@ def create_dataset( Virtual datasets are created when the table database is different from the main database, for systems that support cross-database queries (Trino, BigQuery, etc.) """ + kwargs = { + "database": database["id"], + "catalog": model["database"], + "schema": model["schema"], + "table_name": model.get("alias") or model["name"], + } + try: + # try to create dataset with catalog + return client.create_dataset(**kwargs) + except SupersetError as ex: + if not no_catalog_support(ex): + raise ex + url = make_url(database["sqlalchemy_uri"]) if model_in_database(model, url): kwargs = { @@ -91,6 +104,32 @@ def create_dataset( return client.create_dataset(**kwargs) +def no_catalog_support(ex: SupersetError) -> bool: + """ + Return if the error is due to a lack of catalog support. + + The errors payload looks like this: + + [ + { + "message": json.dumps({"message": {"catalog": ["Unknown field."]}}), + "error_type": "UNKNOWN_ERROR", + "level": ErrorLevel.ERROR, + }, + ] + + """ + for error in ex.errors: + try: + message = json.loads(error["message"]) + if "Unknown field." in message["message"]["catalog"]: + return True + except Exception: # pylint: disable=broad-except + pass + + return False + + def get_or_create_dataset( client: SupersetClient, model: ModelSchema, diff --git a/tests/cli/superset/sync/dbt/datasets_test.py b/tests/cli/superset/sync/dbt/datasets_test.py index d49beb0..f29dd0d 100644 --- a/tests/cli/superset/sync/dbt/datasets_test.py +++ b/tests/cli/superset/sync/dbt/datasets_test.py @@ -1,7 +1,7 @@ """ Tests for ``preset_cli.cli.superset.sync.dbt.datasets``. """ -# pylint: disable=invalid-name, too-many-lines +# pylint: disable=invalid-name, too-many-lines, redefined-outer-name import json from typing import Any, Dict, List, cast @@ -25,9 +25,16 @@ get_certification_info, get_or_create_dataset, model_in_database, + no_catalog_support, sync_datasets, ) -from preset_cli.exceptions import CLIError, ErrorLevel, ErrorPayload, SupersetError +from preset_cli.exceptions import ( + CLIError, + DatabaseNotFoundError, + ErrorLevel, + ErrorPayload, + SupersetError, +) metric_schema = MetricSchema() @@ -147,6 +154,7 @@ def test_sync_datasets_new(mocker: MockerFixture) -> None: ) client.create_dataset.assert_called_with( database=1, + catalog="examples_dev", schema="public", table_name="messages_channels", ) @@ -396,7 +404,12 @@ def test_sync_datasets_new_bq_error(mocker: MockerFixture) -> None: ) client.create_dataset.assert_has_calls( [ - mock.call(database=1, schema="public", table_name="messages_channels"), + mock.call( + database=1, + catalog="examples_dev", + schema="public", + table_name="messages_channels", + ), ], ) client.update_dataset.assert_has_calls([]) @@ -741,30 +754,59 @@ def test_sync_datasets_no_columns(mocker: MockerFixture) -> None: ) -def test_create_dataset_physical(mocker: MockerFixture) -> None: +@pytest.fixture() +def no_catalog_support_client(mocker: MockerFixture) -> Any: """ - Test ``create_dataset`` for physical datasets. + Fixture to return a mocked client with no catalog support. """ client = mocker.MagicMock() + client.create_dataset.side_effect = [ + SupersetError( + errors=[ + { + "message": json.dumps({"message": {"catalog": ["Unknown field."]}}), + "error_type": "UNKNOWN_ERROR", + "level": ErrorLevel.ERROR, + }, + ], + ), + None, + ] + mocker.patch( + "preset_cli.cli.superset.sync.dbt.datasets.no_catalog_support", + return_value=True, + ) + return client + +def test_create_dataset_physical_no_catalog( + no_catalog_support_client: MockerFixture, +) -> None: + """ + Test ``create_dataset`` for physical datasets. + """ create_dataset( - client, + no_catalog_support_client, { "id": 1, + "catalog": "examples_dev", "schema": "public", "name": "Database", "sqlalchemy_uri": "postgresql://user@host/examples_dev", }, models[0], ) - client.create_dataset.assert_called_with( + no_catalog_support_client.create_dataset.assert_called_with( database=1, schema="public", table_name="messages_channels", ) -def test_create_dataset_virtual(mocker: MockerFixture) -> None: +def test_create_dataset_virtual( + mocker: MockerFixture, + no_catalog_support_client: MockerFixture, +) -> None: """ Test ``create_dataset`` for virtual datasets. """ @@ -772,10 +814,9 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None: "preset_cli.cli.superset.sync.dbt.lib.create_engine", ) create_engine().dialect.identifier_preparer.quote = lambda token: token - client = mocker.MagicMock() create_dataset( - client, + no_catalog_support_client, { "id": 1, "schema": "public", @@ -784,7 +825,7 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None: }, models[0], ) - client.create_dataset.assert_called_with( + no_catalog_support_client.create_dataset.assert_called_with( database=1, schema="public", table_name="messages_channels", @@ -793,18 +834,15 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None: def test_create_dataset_virtual_missing_dependency( - capsys: pytest.CaptureFixture[str], - mocker: MockerFixture, + no_catalog_support_client: MockerFixture, ) -> None: """ Test ``create_dataset`` for virtual datasets when the DB connection requires an additional package. """ - client = mocker.MagicMock() - with pytest.raises(NotImplementedError): create_dataset( - client, + no_catalog_support_client, { "id": 1, "schema": "public", @@ -814,9 +852,18 @@ def test_create_dataset_virtual_missing_dependency( models[0], ) + +def test_create_dataset_virtual_missing_dependency_snowflake( + capsys: pytest.CaptureFixture[str], + no_catalog_support_client: MockerFixture, +) -> None: + """ + Test ``create_dataset`` for virtual datasets when the DB connection requires + an additional package. + """ with pytest.raises(SystemExit) as excinfo: create_dataset( - client, + no_catalog_support_client, { "id": 1, "schema": "public", @@ -1442,3 +1489,59 @@ def test_clean_metadata() -> None: "to_be_kept": "To Be Kept", "yeah": "sure", } + + +def test_no_catalog_support() -> None: + """ + Test the ``no_catalog_support`` helper. + """ + assert no_catalog_support(DatabaseNotFoundError()) is False + assert ( + no_catalog_support( + SupersetError( + errors=[ + { + "message": json.dumps({"message": "Error"}), + "error_type": "UNKNOWN_ERROR", + "level": ErrorLevel.ERROR, + }, + { + "message": json.dumps( + {"message": {"catalog": ["Cannot contain spaces."]}}, + ), + "error_type": "UNKNOWN_ERROR", + "level": ErrorLevel.ERROR, + }, + { + "message": json.dumps( + {"message": {"catalog": ["Unknown field."]}}, + ), + "error_type": "UNKNOWN_ERROR", + "level": ErrorLevel.ERROR, + }, + ], + ), + ) + is True + ) + + +def test_create_dataset_error(mocker: MockerFixture) -> None: + """ + Test that ``create_dataset`` surfaces errors. + """ + client = mocker.MagicMock() + client.create_dataset.side_effect = DatabaseNotFoundError() + + with pytest.raises(DatabaseNotFoundError): + create_dataset( + client, + { + "id": 1, + "catalog": "examples_dev", + "schema": "public", + "name": "Database", + "sqlalchemy_uri": "postgresql://user@host/examples_dev", + }, + models[0], + )