Skip to content

Commit

Permalink
feat(dbt): use catalog when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida committed May 16, 2024
1 parent 7a0e761 commit c75ab5e
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 17 deletions.
39 changes: 39 additions & 0 deletions src/preset_cli/cli/superset/sync/dbt/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ def create_dataset(
Virtual datasets are created when the table database is different from the main
database, for systems that support cross-database queries (Trino, BigQuery, etc.)
"""
kwargs = {
"database": database["id"],
"catalog": model["database"],
"schema": model["schema"],
"table_name": model.get("alias") or model["name"],
}
try:
# try to create dataset with catalog
return client.create_dataset(**kwargs)
except SupersetError as ex:
if not no_catalog_support(ex):
raise ex

url = make_url(database["sqlalchemy_uri"])
if model_in_database(model, url):
kwargs = {
Expand All @@ -91,6 +104,32 @@ def create_dataset(
return client.create_dataset(**kwargs)


def no_catalog_support(ex: SupersetError) -> bool:
"""
Return if the error is due to a lack of catalog support.
The errors payload looks like this:
[
{
"message": json.dumps({"message": {"catalog": ["Unknown field."]}}),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
]
"""
for error in ex.errors:
try:
message = json.loads(error["message"])
if "Unknown field." in message["message"]["catalog"]:
return True
except Exception: # pylint: disable=broad-except
pass

return False


def get_or_create_dataset(
client: SupersetClient,
model: ModelSchema,
Expand Down
137 changes: 120 additions & 17 deletions tests/cli/superset/sync/dbt/datasets_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Tests for ``preset_cli.cli.superset.sync.dbt.datasets``.
"""
# pylint: disable=invalid-name, too-many-lines
# pylint: disable=invalid-name, too-many-lines, redefined-outer-name

import json
from typing import Any, Dict, List, cast
Expand All @@ -25,9 +25,16 @@
get_certification_info,
get_or_create_dataset,
model_in_database,
no_catalog_support,
sync_datasets,
)
from preset_cli.exceptions import CLIError, ErrorLevel, ErrorPayload, SupersetError
from preset_cli.exceptions import (
CLIError,
DatabaseNotFoundError,
ErrorLevel,
ErrorPayload,
SupersetError,
)

metric_schema = MetricSchema()

Expand Down Expand Up @@ -147,6 +154,7 @@ def test_sync_datasets_new(mocker: MockerFixture) -> None:
)
client.create_dataset.assert_called_with(
database=1,
catalog="examples_dev",
schema="public",
table_name="messages_channels",
)
Expand Down Expand Up @@ -396,7 +404,12 @@ def test_sync_datasets_new_bq_error(mocker: MockerFixture) -> None:
)
client.create_dataset.assert_has_calls(
[
mock.call(database=1, schema="public", table_name="messages_channels"),
mock.call(
database=1,
catalog="examples_dev",
schema="public",
table_name="messages_channels",
),
],
)
client.update_dataset.assert_has_calls([])
Expand Down Expand Up @@ -741,41 +754,69 @@ def test_sync_datasets_no_columns(mocker: MockerFixture) -> None:
)


def test_create_dataset_physical(mocker: MockerFixture) -> None:
@pytest.fixture()
def no_catalog_support_client(mocker: MockerFixture) -> Any:
"""
Test ``create_dataset`` for physical datasets.
Fixture to return a mocked client with no catalog support.
"""
client = mocker.MagicMock()
client.create_dataset.side_effect = [
SupersetError(
errors=[
{
"message": json.dumps({"message": {"catalog": ["Unknown field."]}}),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
],
),
None,
]
mocker.patch(
"preset_cli.cli.superset.sync.dbt.datasets.no_catalog_support",
return_value=True,
)
return client


def test_create_dataset_physical_no_catalog(
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for physical datasets.
"""
create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"catalog": "examples_dev",
"schema": "public",
"name": "Database",
"sqlalchemy_uri": "postgresql://user@host/examples_dev",
},
models[0],
)
client.create_dataset.assert_called_with(
no_catalog_support_client.create_dataset.assert_called_with(
database=1,
schema="public",
table_name="messages_channels",
)


def test_create_dataset_virtual(mocker: MockerFixture) -> None:
def test_create_dataset_virtual(
mocker: MockerFixture,
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for virtual datasets.
"""
create_engine = mocker.patch(
"preset_cli.cli.superset.sync.dbt.lib.create_engine",
)
create_engine().dialect.identifier_preparer.quote = lambda token: token
client = mocker.MagicMock()

create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"schema": "public",
Expand All @@ -784,7 +825,7 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None:
},
models[0],
)
client.create_dataset.assert_called_with(
no_catalog_support_client.create_dataset.assert_called_with(
database=1,
schema="public",
table_name="messages_channels",
Expand All @@ -793,18 +834,15 @@ def test_create_dataset_virtual(mocker: MockerFixture) -> None:


def test_create_dataset_virtual_missing_dependency(
capsys: pytest.CaptureFixture[str],
mocker: MockerFixture,
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for virtual datasets when the DB connection requires
an additional package.
"""
client = mocker.MagicMock()

with pytest.raises(NotImplementedError):
create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"schema": "public",
Expand All @@ -814,9 +852,18 @@ def test_create_dataset_virtual_missing_dependency(
models[0],
)


def test_create_dataset_virtual_missing_dependency_snowflake(
capsys: pytest.CaptureFixture[str],
no_catalog_support_client: MockerFixture,
) -> None:
"""
Test ``create_dataset`` for virtual datasets when the DB connection requires
an additional package.
"""
with pytest.raises(SystemExit) as excinfo:
create_dataset(
client,
no_catalog_support_client,
{
"id": 1,
"schema": "public",
Expand Down Expand Up @@ -1442,3 +1489,59 @@ def test_clean_metadata() -> None:
"to_be_kept": "To Be Kept",
"yeah": "sure",
}


def test_no_catalog_support() -> None:
"""
Test the ``no_catalog_support`` helper.
"""
assert no_catalog_support(DatabaseNotFoundError()) is False
assert (
no_catalog_support(
SupersetError(
errors=[
{
"message": json.dumps({"message": "Error"}),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
{
"message": json.dumps(
{"message": {"catalog": ["Cannot contain spaces."]}},
),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
{
"message": json.dumps(
{"message": {"catalog": ["Unknown field."]}},
),
"error_type": "UNKNOWN_ERROR",
"level": ErrorLevel.ERROR,
},
],
),
)
is True
)


def test_create_dataset_error(mocker: MockerFixture) -> None:
"""
Test that ``create_dataset`` surfaces errors.
"""
client = mocker.MagicMock()
client.create_dataset.side_effect = DatabaseNotFoundError()

with pytest.raises(DatabaseNotFoundError):
create_dataset(
client,
{
"id": 1,
"catalog": "examples_dev",
"schema": "public",
"name": "Database",
"sqlalchemy_uri": "postgresql://user@host/examples_dev",
},
models[0],
)

0 comments on commit c75ab5e

Please sign in to comment.