Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dbt): Adding flag to raise an error when a model sync fails #266

Merged
merged 11 commits into from
Mar 11, 2024
30 changes: 27 additions & 3 deletions src/preset_cli/cli/superset/sync/dbt/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from preset_cli.cli.superset.sync.dbt.databases import sync_database
from preset_cli.cli.superset.sync.dbt.datasets import sync_datasets
from preset_cli.cli.superset.sync.dbt.exposures import ModelKey, sync_exposures
from preset_cli.cli.superset.sync.dbt.lib import apply_select
from preset_cli.cli.superset.sync.dbt.lib import apply_select, list_failed_models
from preset_cli.cli.superset.sync.dbt.metrics import (
get_models_from_sql,
get_superset_metrics_per_model,
Expand Down Expand Up @@ -95,6 +95,12 @@
default=False,
help="Update Preset configurations based on dbt metadata. Preset-only metrics are preserved",
)
@click.option(
"--raise-failures",
is_flag=True,
default=False,
help="Ends the execution with an error in case a model failed to be synced",
)
@click.pass_context
def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many-locals ,too-many-statements
ctx: click.core.Context,
Expand All @@ -112,6 +118,7 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
preserve_columns: bool = False,
preserve_metadata: bool = False,
merge_metadata: bool = False,
raise_failures: bool = False,
) -> None:
"""
Sync models/metrics from dbt Core to Superset and charts/dashboards to dbt exposures.
Expand Down Expand Up @@ -192,6 +199,8 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
models = apply_select(models, select, exclude)
model_map = {ModelKey(model["schema"], model["name"]): model for model in models}

failures: List[str] = []

if exposures_only:
datasets = [
dataset
Expand Down Expand Up @@ -224,7 +233,7 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
click.echo("No database was found, pass ``--import-db`` to create")
return

datasets = sync_datasets(
datasets, failures = sync_datasets(
client,
models,
superset_metrics,
Expand All @@ -239,6 +248,9 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
exposures = os.path.expanduser(exposures)
sync_exposures(client, Path(exposures), datasets, model_map)

if failures and raise_failures:
list_failed_models(failures)


def get_account_id(client: DBTClient) -> int:
"""
Expand Down Expand Up @@ -432,6 +444,12 @@ def process_sl_metrics(
"--access-url",
help="Custom API URL for dbt Cloud (eg, https://ab123.us1.dbt.com)",
)
@click.option(
"--raise-failures",
is_flag=True,
default=False,
help="Ends the execution with an error in case a model failed to be synced",
)
@click.pass_context
def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
ctx: click.core.Context,
Expand All @@ -448,6 +466,7 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
preserve_columns: bool = False,
preserve_metadata: bool = False,
merge_metadata: bool = False,
raise_failures: bool = False,
access_url: Optional[str] = None,
) -> None:
"""
Expand Down Expand Up @@ -501,14 +520,16 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
sl_metrics = process_sl_metrics(dbt_client, job["environment_id"], model_map)
superset_metrics = get_superset_metrics_per_model(og_metrics, sl_metrics)

failures: List[str] = []

if exposures_only:
datasets = [
dataset
for dataset in superset_client.get_datasets()
if ModelKey(dataset["schema"], dataset["table_name"]) in model_map
]
else:
datasets = sync_datasets(
datasets, failures = sync_datasets(
superset_client,
models,
superset_metrics,
Expand All @@ -522,3 +543,6 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
if exposures:
exposures = os.path.expanduser(exposures)
sync_exposures(superset_client, Path(exposures), datasets, model_map)

if failures and raise_failures:
list_failed_models(failures)
37 changes: 26 additions & 11 deletions src/preset_cli/cli/superset/sync/dbt/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import json
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy.engine import create_engine
from sqlalchemy.engine.url import URL as SQLAlchemyURL
Expand All @@ -16,6 +16,7 @@
from preset_cli.api.clients.dbt import ModelSchema
from preset_cli.api.clients.superset import SupersetClient, SupersetMetricDefinition
from preset_cli.api.operators import OneToMany
from preset_cli.exceptions import SupersetError

DEFAULT_CERTIFICATION = {"details": "This table is produced by dbt"}

Expand Down Expand Up @@ -86,14 +87,15 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
certification: Optional[Dict[str, Any]] = None,
reload_columns: bool = True,
merge_metadata: bool = False,
) -> List[Any]:
) -> Tuple[List[Any], List[str]]:
"""
Read the dbt manifest and import models as datasets with metrics.
"""
base_url = URL(external_url_prefix) if external_url_prefix else None

# add datasets
datasets = []
failed_datasets = []
for model in models:
# load additional metadata from dbt model definition
model_kwargs = model.get("meta", {}).pop("superset", {})
Expand Down Expand Up @@ -123,6 +125,7 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
dataset = create_dataset(client, database, model)
except Exception: # pylint: disable=broad-except
_logger.exception("Unable to create dataset")
failed_datasets.append(model["unique_id"])
continue

extra = {
Expand Down Expand Up @@ -174,13 +177,23 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
if base_url:
fragment = "!/model/{unique_id}".format(**model)
update["external_url"] = str(base_url.with_fragment(fragment))
client.update_dataset(dataset["id"], override_columns=reload_columns, **update)
try:
client.update_dataset(
dataset["id"], override_columns=reload_columns, **update
)
except SupersetError:
failed_datasets.append(model["unique_id"])
continue

if reload_columns and dataset_metrics:
update = {
"metrics": dataset_metrics,
}
client.update_dataset(dataset["id"], override_columns=False, **update)
try:
client.update_dataset(dataset["id"], override_columns=False, **update)
except SupersetError:
failed_datasets.append(model["unique_id"])
continue

# update column descriptions
if columns := model.get("columns"):
Expand All @@ -199,13 +212,15 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
# https://github.com/preset-io/backend-sdk/issues/163
if "is_active" in column and column["is_active"] is None:
del column["is_active"]

client.update_dataset(
dataset["id"],
override_columns=reload_columns,
columns=current_columns,
)
try:
client.update_dataset(
dataset["id"],
override_columns=reload_columns,
columns=current_columns,
)
except SupersetError:
failed_datasets.append(model["unique_id"])

datasets.append(dataset)

return datasets
return datasets, failed_datasets
19 changes: 19 additions & 0 deletions src/preset_cli/cli/superset/sync/dbt/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import logging
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union

import click
import yaml
from jinja2 import Environment
from sqlalchemy.engine.url import URL
Expand Down Expand Up @@ -459,3 +461,20 @@ def apply_select(
del selected[id_]

return list(selected.values())


def list_failed_models(failed_models: List[str]) -> None:
"""
List models that failed to sync and ends execution with an error code
"""
error_message = "Below model(s) failed to sync:"
Vitor-Avila marked this conversation as resolved.
Show resolved Hide resolved
for failed_model in failed_models:
error_message += f"\n - {failed_model}"

click.echo(
click.style(
error_message,
fg="bright_red",
),
)
sys.exit(1)
Loading
Loading