Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: don't use default dbt selection with asset checks #18117

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -890,18 +890,23 @@ def my_dbt_op(dbt: DbtCliResource):

# When dbt is enabled with asset checks, we turn off any indirection with dbt selection.
# This way, the Dagster context completely determines what is executed in a dbt
# invocation with a subsetted selection.
# invocation.
if (
version.parse(dbt_version) >= version.parse("1.5.0")
and dagster_dbt_translator.settings.enable_asset_checks
):
logger.info(
"Setting environment variable `DBT_INDIRECT_SELECTION` to 'empty'. When dbt "
"tests are modeled as asset checks, they are executed through explicit selection."
)
env["DBT_INDIRECT_SELECTION"] = "empty"

selection_args = get_subset_selection_for_context(
context=context,
manifest=manifest,
select=context.op.tags.get("dagster-dbt/select"),
exclude=context.op.tags.get("dagster-dbt/exclude"),
dagster_dbt_translator=dagster_dbt_translator,
)
else:
manifest = validate_manifest(manifest) if manifest else {}
Expand Down Expand Up @@ -944,6 +949,7 @@ def get_subset_selection_for_context(
manifest: Mapping[str, Any],
select: Optional[str],
exclude: Optional[str],
dagster_dbt_translator: DagsterDbtTranslator,
) -> List[str]:
"""Generate a dbt selection string to materialize the selected resources in a subsetted execution context.

Expand All @@ -970,7 +976,12 @@ def get_subset_selection_for_context(
dbt_resource_props_by_output_name = get_dbt_resource_props_by_output_name(manifest)
dbt_resource_props_by_test_name = get_dbt_resource_props_by_test_name(manifest)

if not context.is_subset:
# It's nice to use the default dbt selection arguments when not subsetting for readability.
# However with asset checks, we make a tradeoff between readability and functionality for the user.
# This is because we use an explicit selection of each individual dbt resource we execute to ensure
# we control which models *and tests* run. By using explicit selection, we allow the user to
# also subset the execution of dbt tests.
if not context.is_subset and not dagster_dbt_translator.settings.enable_asset_checks:
logger.info(
"A dbt subsetted execution is not being performed. Using the default dbt selection"
f" arguments `{default_dbt_selection}`."
Expand Down Expand Up @@ -1000,10 +1011,17 @@ def get_subset_selection_for_context(
# https://docs.getdbt.com/reference/node-selection/set-operators#unions
union_selected_dbt_resources = ["--select"] + [" ".join(selected_dbt_resources)]

logger.info(
"A dbt subsetted execution is being performed. Overriding default dbt selection"
f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`"
)
if context.is_subset:
logger.info(
"A dbt subsetted execution is being performed. Overriding default dbt selection"
f" arguments `{default_dbt_selection}` with arguments: `{union_selected_dbt_resources}`"
)
else:
logger.info(
"A dbt subsetted execution is not being performed. Because asset checks are enabled,"
f" converting the default dbt selection arguments `{default_dbt_selection}` with the "
f"explicit set of models and tests: `{union_selected_dbt_resources}`"
)

return union_selected_dbt_resources

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ version: 2
models:
- name: customers
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
config:
tags: "customer_info"

columns:
- name: customer_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
from dagster import (
AssetCheckKey,
AssetCheckResult,
AssetExecutionContext,
AssetKey,
Expand Down Expand Up @@ -240,43 +241,33 @@ def test_asset_checks_are_logged_from_resource(
is_dbt_1_4,
reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4",
)
def test_dbt_model_selection(dbt_commands: List[List[str]]):
dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir))

@dbt_assets(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator_with_checks,
select="customers",
)
def my_dbt_assets(context, dbt: DbtCliResource):
for dbt_command in dbt_commands:
yield from dbt.cli(dbt_command, context=context).stream()

result = materialize([my_dbt_assets], resources={"dbt": dbt})

assert result.success
assert len(result.get_asset_materialization_events()) == 1
assert len(result.get_asset_check_evaluations()) == 0


@pytest.mark.xfail(
is_dbt_1_4,
reason="DBT_INDIRECT_SELECTION=empty is not supported in dbt 1.4",
@pytest.mark.parametrize(
"selection",
[
"customers",
"tag:customer_info",
],
)
def test_dbt_test_selection(dbt_commands: List[List[str]]):
def test_select_model_with_tests(dbt_commands: List[List[str]], selection):
dbt = DbtCliResource(project_dir=os.fspath(test_asset_checks_dbt_project_dir))

@dbt_assets(
manifest=manifest,
dagster_dbt_translator=dagster_dbt_translator_with_checks,
select="customers tag:data_quality",
select=selection,
)
def my_dbt_assets(context, dbt: DbtCliResource):
for dbt_command in dbt_commands:
yield from dbt.cli(dbt_command, context=context).stream()

assert my_dbt_assets.keys == {AssetKey(["customers"])}
assert my_dbt_assets.check_keys == {
AssetCheckKey(asset_key=AssetKey(["customers"]), name="unique_customers_customer_id"),
AssetCheckKey(asset_key=AssetKey(["customers"]), name="not_null_customers_customer_id"),
}

result = materialize([my_dbt_assets], resources={"dbt": dbt})

assert result.success
assert len(result.get_asset_materialization_events()) == 1
assert len(result.get_asset_check_evaluations()) == 1
assert len(result.get_asset_check_evaluations()) == 2