Skip to content

Commit

Permalink
bugfix: don't use default dbt selection with asset checks
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Nov 27, 2023
1 parent 286b523 commit 465529f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -890,18 +890,23 @@ def my_dbt_op(dbt: DbtCliResource):

# When asset checks are enabled for dbt, we turn off dbt's indirect selection.
# This way, the Dagster context completely determines what is executed in a dbt
# invocation with a subsetted selection.
# invocation.
if (
version.parse(dbt_version) >= version.parse("1.5.0")
and dagster_dbt_translator.settings.enable_asset_checks
):
logger.info(
"Setting environment variable `DBT_INDIRECT_SELECTION` to 'empty' When dbt "
"tests are modeled as asset checks, they are executed through explicit selection."
)
env["DBT_INDIRECT_SELECTION"] = "empty"

selection_args = get_subset_selection_for_context(
context=context,
manifest=manifest,
select=context.op.tags.get("dagster-dbt/select"),
exclude=context.op.tags.get("dagster-dbt/exclude"),
dagster_dbt_translator=dagster_dbt_translator,
)
else:
manifest = validate_manifest(manifest) if manifest else {}
Expand Down Expand Up @@ -944,6 +949,7 @@ def get_subset_selection_for_context(
manifest: Mapping[str, Any],
select: Optional[str],
exclude: Optional[str],
dagster_dbt_translator: DagsterDbtTranslator,
) -> List[str]:
"""Generate a dbt selection string to materialize the selected resources in a subsetted execution context.
Expand All @@ -970,7 +976,12 @@ def get_subset_selection_for_context(
dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest)
dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest)

if not context.is_subset:
# When not subsetting, it's nice to use the default dbt selection arguments for readability.
# However, with asset checks, we make a tradeoff between readability and functionality for the user.
# This is because we use an explicit selection of each individual dbt resource we execute to ensure
# we control which models *and tests* run. By using explicit selection, we allow the user to
# also subset the execution of dbt tests.
if not context.is_subset and not dagster_dbt_translator.settings.enable_asset_checks:
logger.info(
"A dbt subsetted execution is not being performed. Using the default dbt selection"
f" arguments `{default_dbt_selection}`."
Expand Down Expand Up @@ -1000,10 +1011,17 @@ def get_subset_selection_for_context(
# https://docs.getdbt.com/reference/node-selection/set-operators#unions
union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)]

logger.info(
"A dbt subsetted execution is being performed. Overriding default dbt selection"
f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`"
)
if context.is_subset:
logger.info(
"A dbt subsetted execution is being performed. Overriding default dbt selection"
f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`"
)
else:
logger.info(
"A dbt subsetted execution is not being performed. Because asset checks are enabled,"
f" overriding the default dbt selection arguments `{default_dbt_selection}` with the "
f"explicit set of models and tests: `{union_selected_dbt_resources}`"
)

return union_selected_dbt_resources

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ version: 2
models:
- name: customers
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
config:
tags: "customer_info"

columns:
- name: customer_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
from dagster import (
AssetCheckKey,
AssetCheckResult,
AssetExecutionContext,
AssetKey,
Expand Down Expand Up @@ -240,43 +241,33 @@ def test_asset_checks_are_logged_from_resource(
is_dbt_1_4,
reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4",
)
def test_dbt_model_selection(dbt_commands: List[List[str]]):
dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir))

@dbt_assets(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator_with_checks,
select="customers",
)
def my_dbt_assets(context, dbt: DbtCliResource):
for dbt_command in dbt_commands:
yield from dbt.cli(dbt_command, context=context).stream()

result = materialize([my_dbt_assets], resources={"dbt": dbt})

assert result.success
assert len(result.get_asset_materialization_events()) == 1
assert len(result.get_asset_check_evaluations()) == 0


@pytest.mark.xfail(
is_dbt_1_4,
reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4",
@pytest.mark.parametrize(
"selection",
[
"customers",
"tag:customer_info",
],
)
def test_dbt_test_selection(dbt_commands: List[List[str]]):
def test_select_model_with_tests(dbt_commands: List[List[str]], selection):
dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir))

@dbt_assets(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator_with_checks,
select="customers tag:data_quality",
select=selection,
)
def my_dbt_assets(context, dbt: DbtCliResource):
for dbt_command in dbt_commands:
yield from dbt.cli(dbt_command, context=context).stream()

assert my_dbt_assets.keys == {AssetKey(["customers"])}
assert my_dbt_assets.check_keys == {
AssetCheckKey(asset_key=AssetKey(["customers"]), name="unique_customers_customer_id"),
AssetCheckKey(asset_key=AssetKey(["customers"]), name="not_null_customers_customer_id"),
}

result = materialize([my_dbt_assets], resources={"dbt": dbt})

assert result.success
assert len(result.get_asset_materialization_events()) == 1
assert len(result.get_asset_check_evaluations()) == 1
assert len(result.get_asset_check_evaluations()) == 2

0 comments on commit 465529f

Please sign in to comment.