feat(dbt): Support defining default values for columns and metrics (#325)

* feat(dbt): Support defining default values for columns and metrics
* Removing unnecessary test
* Fixing logic
Vitor-Avila authored Nov 7, 2024
1 parent 65b0dd9 commit 4827024
Showing 2 changed files with 348 additions and 2 deletions.
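
The feature reads a new default_configs block from the model's meta.superset entry and applies it as a baseline to every metric and column of the synced dataset. A minimal sketch of the parsed structure the code below expects; the field names (d3format, groupby, description) are illustrative Superset dataset fields, not taken from this commit:

model_meta = {
    "superset": {
        "default_configs": {
            # applied to every metric synced for this model
            "metrics": {"d3format": ",.2f", "description": "Defined in dbt"},
            # applied to every column of the dataset
            "columns": {"groupby": True, "description": "Defined in dbt"},
        },
    },
}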
28 changes: 26 additions & 2 deletions src/preset_cli/cli/superset/sync/dbt/datasets.py
@@ -3,7 +3,9 @@
"""

# pylint: disable=consider-using-f-string
from __future__ import annotations

import copy
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
@@ -17,7 +19,7 @@
from preset_cli.api.operators import OneToMany
from preset_cli.cli.superset.sync.dbt.lib import create_engine_with_check
from preset_cli.exceptions import CLIError, SupersetError
from preset_cli.lib import raise_cli_errors
from preset_cli.lib import dict_merge, raise_cli_errors

DEFAULT_CERTIFICATION = {"details": "This table is produced by dbt"}

@@ -171,6 +173,7 @@ def compute_metrics(
dbt_metrics: List[Any],
reload_columns: bool,
merge_metadata: bool,
metric_defaults: Dict[str, Any] | None = None,
) -> List[Any]:
"""
Compute the final list of metrics that should be used to update the dataset
@@ -187,16 +190,23 @@

for name, metric_definition in model_metrics.items():
if reload_columns or merge_metadata or name not in current_dataset_metrics:
final_metric = {}
if name in current_dataset_metrics:
metric_definition["id"] = current_dataset_metrics[name]["id"]
final_dataset_metrics.append(metric_definition)
if metric_defaults:
final_metric = copy.deepcopy(metric_defaults)
dict_merge(final_metric, metric_definition)
final_dataset_metrics.append(final_metric or metric_definition)

# Preserving Superset metadata
if not reload_columns:
for name, metric in current_dataset_metrics.items():
if not merge_metadata or name not in model_metrics:
# remove data that is not part of the update payload
metric = clean_metadata(metric)
if merge_metadata and metric_defaults:
final_metric = copy.deepcopy(metric_defaults)
dict_merge(metric, final_metric)
final_dataset_metrics.append(metric)

return final_dataset_metrics
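
A minimal sketch of the metric precedence implemented above, assuming dict_merge(base, override) updates base in place with values from override winning; the helper and the metric fields below are hypothetical stand-ins, not the library's code:

import copy

def dict_merge_sketch(base, override):
    # Hypothetical stand-in for preset_cli.lib.dict_merge.
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            dict_merge_sketch(base[key], value)
        else:
            base[key] = value

metric_defaults = {"d3format": ",.2f", "warning_text": "Managed by dbt"}
metric_definition = {"metric_name": "revenue", "expression": "SUM(price)", "d3format": "$,.2f"}

final_metric = copy.deepcopy(metric_defaults)
dict_merge_sketch(final_metric, metric_definition)

assert final_metric["d3format"] == "$,.2f"               # the metric's own value wins
assert final_metric["warning_text"] == "Managed by dbt"  # the default fills the gap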
@@ -233,6 +243,7 @@ def compute_columns_metadata(
dataset_columns: List[Any],
reload_columns: bool,
merge_metadata: bool,
column_defaults: Dict[str, Any] | None = None,
) -> List[Any]:
"""
Adds dbt metadata to dataset columns.
@@ -251,13 +262,20 @@
dbt_metadata[column]["verbose_name"] = column
for key, value in definition.pop("meta", {}).get("superset", {}).items():
dbt_metadata[column][key] = value
if column_defaults:
final_column = copy.deepcopy(column_defaults)
dict_merge(final_column, dbt_metadata[column])
dbt_metadata[column] = final_column

for column in dataset_columns:
name = column["column_name"]
if name in dbt_metadata:
for key, value in dbt_metadata[name].items():
if reload_columns or merge_metadata or not column.get(key):
column[key] = value
elif column_defaults and (reload_columns or merge_metadata):
for key, value in column_defaults.items():
column[key] = value

# remove data that is not part of the update payload
column = clean_metadata(column)
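
The column path works the same way in two places: dbt column metadata is merged on top of column_defaults, and dataset columns with no dbt metadata still receive the defaults when reload_columns or merge_metadata is set (the new elif branch). A short illustration with made-up field names, using a shallow merge where the real code uses dict_merge:

import copy

column_defaults = {"groupby": False, "description": "Synced from dbt"}

# A column that has dbt metadata: its own values win over the defaults.
dbt_column_meta = {"verbose_name": "Order ID", "groupby": True}
with_meta = {**copy.deepcopy(column_defaults), **dbt_column_meta}
assert with_meta["groupby"] is True

# A dataset column with no dbt metadata: the defaults are applied directly.
dataset_column = {"column_name": "order_id"}
dataset_column.update(copy.deepcopy(column_defaults))
assert dataset_column["description"] == "Synced from dbt"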
@@ -338,12 +356,17 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-arguments
failed_datasets.append(model["unique_id"])
continue

default_configs = (
model.get("meta", {}).get("superset", {}).pop("default_configs", {})
)

# compute metrics
final_dataset_metrics = compute_metrics(
dataset["metrics"],
metrics.get(model["unique_id"], []),
reload_columns,
merge_metadata,
metric_defaults=default_configs.get("metrics", {}),
)

# compute columns
@@ -387,6 +410,7 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-arguments
current_dataset_columns,
reload_columns,
merge_metadata,
column_defaults=default_configs.get("columns", {}),
)
try:
client.update_dataset(
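
End to end, sync_datasets pops default_configs out of the model's superset meta (presumably so it is not treated as a regular dataset override) and routes the metrics and columns sub-dicts to the two compute functions. A sketch with a made-up model payload:

model = {
    "unique_id": "model.jaffle_shop.orders",
    "meta": {
        "superset": {
            "cache_timeout": 300,  # unrelated meta, untouched by the new code
            "default_configs": {
                "metrics": {"d3format": ",.2f"},
                "columns": {"groupby": True},
            },
        },
    },
}

default_configs = (
    model.get("meta", {}).get("superset", {}).pop("default_configs", {})
)

metric_defaults = default_configs.get("metrics", {})    # passed to compute_metrics
column_defaults = default_configs.get("columns", {})    # passed to compute_columns_metadata
assert "default_configs" not in model["meta"]["superset"]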