Skip to content

Commit

Permalink
Merge pull request #238 from preset-io/dbt_preserve_metrics
Browse files Browse the repository at this point in the history
feat(dbt sync): Merge metadata and preserve Preset configs
  • Loading branch information
Vitor-Avila authored Aug 31, 2023
2 parents 2d6107f + 9d3d8d6 commit 96f087c
Show file tree
Hide file tree
Showing 11 changed files with 600 additions and 50 deletions.
11 changes: 3 additions & 8 deletions src/preset_cli/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def preset_cli( # pylint: disable=too-many-branches, too-many-locals, too-many-
click.echo(
click.style(
"Failed to auth using the provided credentials."
" Please run 'preset-cli auth'",
" Please run ``preset-cli auth``",
fg="bright_red",
),
)
Expand Down Expand Up @@ -255,7 +255,7 @@ def auth(baseurl: str, overwrite: bool = False, show: bool = False) -> None:
click.style(
(
f"The file {credentials_path} already exists. "
"Pass --overwrite to replace it."
"Pass ``--overwrite`` to replace it."
),
fg="bright_red",
),
Expand Down Expand Up @@ -353,14 +353,13 @@ def list_group_membership(
if save_report and save_report.casefold() not in {"yaml", "csv"}:
click.echo(
click.style(
"Invalid option. Please use --save-report=csv or --save-report=yaml",
"Invalid option. Please use ``--save-report=csv`` or ``--save-report=yaml``",
fg="bright_red",
),
)
sys.exit(1)

for team in teams:

# print the team name in case multiple teams were provided and it's not an export
if not save_report and len(teams) > 1:
click.echo(f"## Team {team} ##")
Expand All @@ -371,12 +370,10 @@ def list_group_membership(

# account for pagination
while start_at <= group_count:

groups = client.get_group_membership(team, start_at)
group_count = groups["totalResults"]

if group_count > 0:

# print groups in console
if not save_report:
print_group_membership(groups)
Expand Down Expand Up @@ -430,10 +427,8 @@ def export_group_membership_csv(groups: Dict[str, Any], team: str) -> None:
"""
csv_name = team + "_user_group_membership.csv"
for group in groups["Resources"]:

# CSV report would include a group only in case it has members
if group.get("members"):

# Assure we just write headers once
file_exists = os.path.isfile(csv_name)

Expand Down
2 changes: 1 addition & 1 deletion src/preset_cli/cli/superset/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def export_resource( # pylint: disable=too-many-arguments, too-many-locals
target = root / file_name
if target.exists() and not overwrite:
raise Exception(
f"File already exists and --overwrite was not specified: {target}",
f"File already exists and ``--overwrite`` was not specified: {target}",
)
if not target.parent.exists():
target.parent.mkdir(parents=True, exist_ok=True)
Expand Down
70 changes: 63 additions & 7 deletions src/preset_cli/cli/superset/sync/dbt/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,22 @@
"--preserve-columns",
is_flag=True,
default=False,
help="Preserve column configurations",
help="Preserve column and metric configurations defined in Preset",
)
@click.option(
"--preserve-metadata",
is_flag=True,
default=False,
help="Preserve column and metric configurations defined in Preset",
)
@click.option(
"--merge-metadata",
is_flag=True,
default=False,
help="Update Preset configurations based on dbt metadata. Preset-only metrics are preserved",
)
@click.pass_context
def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many-locals ,too-many-statements
ctx: click.core.Context,
file: str,
project: Optional[str],
Expand All @@ -88,6 +100,8 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
external_url_prefix: str = "",
exposures_only: bool = False,
preserve_columns: bool = False,
preserve_metadata: bool = False,
merge_metadata: bool = False,
) -> None:
"""
Sync models/metrics from dbt Core to Superset and charts/dashboards to dbt exposures.
Expand All @@ -96,7 +110,20 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
url = URL(ctx.obj["INSTANCE"])
client = SupersetClient(url, auth)

reload_columns = not preserve_columns
if (preserve_columns or preserve_metadata) and merge_metadata:
click.echo(
click.style(
"""
``--preserve-columns`` / ``--preserve-metadata`` and ``--merge-metadata``
can't be combined. Please include only one to the command.
""",
fg="bright_red",
),
)
sys.exit(1)

reload_columns = not (preserve_columns or preserve_metadata or merge_metadata)
preserve_metadata = preserve_columns if preserve_columns else preserve_metadata

if profiles is None:
profiles = os.path.expanduser("~/.dbt/profiles.yml")
Expand Down Expand Up @@ -134,7 +161,7 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
else:
click.echo(
click.style(
"FILE should be either manifest.json or dbt_project.yml",
"FILE should be either ``manifest.json`` or ``dbt_project.yml``",
fg="bright_red",
),
)
Expand Down Expand Up @@ -185,7 +212,7 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
external_url_prefix,
)
except DatabaseNotFoundError:
click.echo("No database was found, pass --import-db to create")
click.echo("No database was found, pass ``--import-db`` to create")
return

datasets = sync_datasets(
Expand All @@ -196,6 +223,7 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-locals
disallow_edits,
external_url_prefix,
reload_columns=reload_columns,
merge_metadata=merge_metadata,
)

if exposures:
Expand Down Expand Up @@ -325,7 +353,19 @@ def get_job_id(
"--preserve-columns",
is_flag=True,
default=False,
help="Preserve column configurations",
help="Preserve column and metric configurations defined in Preset",
)
@click.option(
"--preserve-metadata",
is_flag=True,
default=False,
help="Preserve column and metric configurations defined in Preset",
)
@click.option(
"--merge-metadata",
is_flag=True,
default=False,
help="Update Preset configurations based on dbt metadata. Preset-only metrics are preserved",
)
@click.pass_context
def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
Expand All @@ -339,6 +379,8 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
external_url_prefix: str = "",
exposures_only: bool = False,
preserve_columns: bool = False,
preserve_metadata: bool = False,
merge_metadata: bool = False,
) -> None:
"""
Sync models/metrics from dbt Cloud to Superset.
Expand All @@ -350,7 +392,20 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
dbt_auth = TokenAuth(token)
dbt_client = DBTClient(dbt_auth)

reload_columns = not preserve_columns
if (preserve_columns or preserve_metadata) and merge_metadata:
click.echo(
click.style(
"""
``--preserve-columns`` / ``--preserve-metadata`` and ``--merge-metadata``
can't be combined. Please include only one to the command.
""",
fg="bright_red",
),
)
sys.exit(1)

reload_columns = not (preserve_columns or preserve_metadata or merge_metadata)
preserve_metadata = preserve_columns if preserve_columns else preserve_metadata

if job_id is None:
job_id = get_job_id(dbt_client)
Expand Down Expand Up @@ -390,6 +445,7 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
disallow_edits,
external_url_prefix,
reload_columns=reload_columns,
merge_metadata=merge_metadata,
)

if exposures:
Expand Down
51 changes: 38 additions & 13 deletions src/preset_cli/cli/superset/sync/dbt/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ def model_in_database(model: ModelSchema, url: SQLAlchemyURL) -> bool:
return model["database"] == url.database


def clean_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """
    Remove incompatible fields from a column/metric metadata payload.

    When updating an existing column or metric via the API, the read-only
    fields returned by Superset (``changed_on``, ``created_on``,
    ``type_generic``) must not be sent back, otherwise the update request
    is rejected.

    Note: the dict is mutated in place and also returned for convenience.

    :param metadata: column or metric metadata as returned by the API
    :return: the same dict with the read-only keys removed
    """
    for key in ("changed_on", "created_on", "type_generic"):
        # keys may be absent (e.g. payloads built locally), so guard the delete
        if key in metadata:
            del metadata[key]

    return metadata


def create_dataset(
client: SupersetClient,
database: Dict[str, Any],
Expand Down Expand Up @@ -68,7 +80,7 @@ def create_dataset(
return client.create_dataset(**kwargs)


def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-many-arguments, too-many-statements
def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-many-arguments, too-many-statements # noqa:C901
client: SupersetClient,
models: List[ModelSchema],
metrics: List[MetricSchema],
Expand All @@ -77,6 +89,7 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
external_url_prefix: str,
certification: Optional[Dict[str, Any]] = None,
reload_columns: bool = True,
merge_metadata: bool = False,
) -> List[Any]:
"""
Read the dbt manifest and import models as datasets with metrics.
Expand All @@ -86,7 +99,6 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
# add datasets
datasets = []
for model in models:

# load additional metadata from dbt model definition
model_kwargs = model.get("meta", {}).pop("superset", {})

Expand Down Expand Up @@ -132,14 +144,28 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
}

dataset_metrics = []
current_metrics = {}
model_metrics = {
metric["name"]: metric for metric in get_metrics_for_model(model, metrics)
}

if not reload_columns:
current_metrics = {
metric["metric_name"]: metric
for metric in client.get_dataset(dataset["id"])["metrics"]
}
for name, metric in current_metrics.items():
# remove data that is not part of the update payload
metric = clean_metadata(metric)
if not merge_metadata or name not in model_metrics:
dataset_metrics.append(metric)

for name, metric in model_metrics.items():
meta = metric.get("meta", {})
kwargs = meta.pop("superset", {})
dataset_metrics.append(
{

if reload_columns or name not in current_metrics or merge_metadata:
metric_definition = {
"expression": get_metric_expression(name, model_metrics),
"metric_name": name,
"metric_type": (
Expand All @@ -150,24 +176,25 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
"description": metric.get("description", ""),
"extra": json.dumps(meta),
**kwargs, # include additional metric metadata defined in metric.meta.superset
},
)
}
if merge_metadata and name in current_metrics:
metric_definition["id"] = current_metrics[name]["id"]
dataset_metrics.append(metric_definition)

# update dataset metadata from dbt and clearing metrics
update = {
"description": model.get("description", ""),
"extra": json.dumps(extra),
"is_managed_externally": disallow_edits,
"metrics": [],
"metrics": [] if reload_columns else dataset_metrics,
**model_kwargs, # include additional model metadata defined in model.meta.superset
}
if base_url:
fragment = "!/model/{unique_id}".format(**model)
update["external_url"] = str(base_url.with_fragment(fragment))
client.update_dataset(dataset["id"], override_columns=reload_columns, **update)

# ...then update metrics
if dataset_metrics:
if reload_columns and dataset_metrics:
update = {
"metrics": dataset_metrics,
}
Expand All @@ -183,10 +210,8 @@ def sync_datasets( # pylint: disable=too-many-locals, too-many-branches, too-ma
column["description"] = column_metadata[name].get("description", "")
column["verbose_name"] = column_metadata[name].get("name", "")

# remove columns that are not part of the update payload
for key in ("changed_on", "created_on", "type_generic"):
if key in column:
del column[key]
# remove data that is not part of the update payload
column = clean_metadata(column)

# for some reason this is being sent as null sometimes
# https://github.com/preset-io/backend-sdk/issues/163
Expand Down
2 changes: 1 addition & 1 deletion src/preset_cli/cli/superset/sync/native/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def import_resources(
click.echo(
click.style(
(
"The following file(s) already exist. Pass --overwrite to "
"The following file(s) already exist. Pass ``--overwrite`` to "
f"replace them.\n{existing_list}"
),
fg="bright_red",
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,7 @@ def test_list_group_membership_group_with_no_members(mocker: MockerFixture) -> N

def test_list_group_membership_incorrect_export(mocker: MockerFixture) -> None:
"""
Test the ``list_group_membership`` command with an incorrect --export-report parameter.
Test the ``list_group_membership`` command with an incorrect ``--export-report`` parameter.
"""
PresetClient = mocker.patch("preset_cli.cli.main.PresetClient")
mocker.patch("preset_cli.cli.main.input", side_effect=["invalid", "-"])
Expand All @@ -1229,7 +1229,7 @@ def test_list_group_membership_incorrect_export(mocker: MockerFixture) -> None:

def test_list_group_membership_export_yaml(mocker: MockerFixture) -> None:
"""
Test the ``list_group_membership`` command setting --export-report=yaml.
Test the ``list_group_membership`` command setting ``--export-report=yaml``.
"""
PresetClient = mocker.patch("preset_cli.cli.main.PresetClient")
mocker.patch("preset_cli.cli.main.input", side_effect=["invalid", "-"])
Expand Down Expand Up @@ -1315,7 +1315,7 @@ def test_list_group_membership_export_yaml(mocker: MockerFixture) -> None:

def test_list_group_membership_export_csv(mocker: MockerFixture) -> None:
"""
Test the ``list_group_membership`` setting --export-report=csv.
Test the ``list_group_membership`` setting ``--export-report=csv``.
"""
PresetClient = mocker.patch("preset_cli.cli.main.PresetClient")
mocker.patch("preset_cli.cli.main.input", side_effect=["invalid", "-"])
Expand Down
6 changes: 3 additions & 3 deletions tests/cli/superset/export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_export_resource_overwrite(
disable_jinja_escaping=False,
)
assert str(excinfo.value) == (
"File already exists and --overwrite was not specified: "
"File already exists and ``--overwrite`` was not specified: "
"/path/to/root/databases/gsheets.yaml"
)

Expand Down Expand Up @@ -533,7 +533,7 @@ def test_export_resource_jinja_escaping_disabled(
dataset_export: BytesIO,
) -> None:
"""
Test ``export_resource`` with --disable-jinja-escaping.
Test ``export_resource`` with ``--disable-jinja-escaping``.
"""
root = Path("/path/to/root")
fs.create_dir(root)
Expand Down Expand Up @@ -574,7 +574,7 @@ def test_export_resource_jinja_escaping_disabled_command(
fs: FakeFilesystem,
) -> None:
"""
Test the ``export_assets`` with --disable-jinja-escaping command.
Test the ``export_assets`` with ``--disable-jinja-escaping`` command.
"""
# root must exist for command to succeed
root = Path("/path/to/root")
Expand Down
Loading

0 comments on commit 96f087c

Please sign in to comment.