From a9df50f1e316d6f250f8ef14ff320a17718785fc Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 30 Sep 2024 20:55:49 +0200 Subject: [PATCH] first pass: replace os env with project flag --- core/dbt/context/providers.py | 2 +- core/dbt/contracts/project.py | 2 ++ core/dbt/parser/manifest.py | 2 +- core/dbt/task/run.py | 5 ++-- .../functional/microbatch/test_microbatch.py | 24 +++++++------------ .../test_microbatch_config_validation.py | 21 ++++++++++++---- tests/unit/context/test_providers.py | 12 +++++++--- 7 files changed, 39 insertions(+), 29 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index a0e3751587a..54d88a95c27 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -238,7 +238,7 @@ def resolve_limit(self) -> Optional[int]: def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: event_time_filter = None if ( - os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + get_flags().require_builtin_microbatch_strategy and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig)) and target.config.event_time and self.model.config.materialized == "incremental" diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index f5a4ec605ec..4a722eb66d7 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -338,6 +338,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): write_json: Optional[bool] = None # legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md + require_builtin_microbatch_strategy: bool = False require_explicit_package_overrides_for_builtin_materializations: bool = True require_resource_names_without_spaces: bool = False source_freshness_run_project_hooks: bool = False @@ -348,6 +349,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): @property def project_only_flags(self) -> Dict[str, Any]: return { + "require_builtin_microbatch_strategy": self.require_builtin_microbatch_strategy, "require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations, "require_resource_names_without_spaces": self.require_resource_names_without_spaces, "source_freshness_run_project_hooks": self.source_freshness_run_project_hooks, diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7ffd00febc5..0e7d6a3dea9 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1383,7 +1383,7 @@ def check_valid_snapshot_config(self): node.config.final_validate() def check_valid_microbatch_config(self): - if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + if get_flags().require_builtin_microbatch_strategy: for node in self.manifest.nodes.values(): if ( node.config.materialized == "incremental" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 673a400ec02..187b29b4e36 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,5 +1,4 @@ import functools -import os import threading from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -31,6 +30,7 @@ RunningOperationCaughtError, ) from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError +from dbt.flags import get_flags from dbt.graph import ResourceTypeSelector from dbt.hooks import get_hook_dict from dbt.materializations.incremental.microbatch import MicrobatchBuilder @@ -447,9 +447,8 @@ def execute(self, model, manifest): ) hook_ctx = self.adapter.pre_model_hook(context_config) - if ( - os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + get_flags().require_builtin_microbatch_strategy and model.config.materialized == "incremental" and model.config.incremental_strategy == "microbatch" ): diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c233657f180..9058fba2d93 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,4 +1,3 @@ -import os from unittest import mock import pytest @@ -111,6 +110,14 @@ def models(self): def macros(self): return {"microbatch.sql": custom_microbatch_strategy} + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + class TestMicrobatchCustomUserStrategyDefault(BaseMicrobatchCustomUserStrategy): def test_use_custom_microbatch_strategy_by_default(self, project): @@ -126,7 +133,6 @@ def test_use_custom_microbatch_strategy_by_default(self, project): class TestMicrobatchCustomUserStrategyEnvVarTrueValid(BaseMicrobatchCustomUserStrategy): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( self, project ): @@ -143,10 +149,7 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg assert "custom microbatch strategy" in logs -# TODO: Consider a behaviour flag here if DBT_EXPERIMENTAL_MICROBATCH is removed -# Since this causes an exception prior to using an override class TestMicrobatchCustomUserStrategyEnvVarTrueInvalid(BaseMicrobatchCustomUserStrategy): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( self, project ): @@ -183,7 +186,6 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) class TestMicrobatchCLI(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run without --event-time-start or --event-time-end - 3 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -214,7 +216,6 @@ def test_run_with_event_time(self, project): class TestMicroBatchBoundsDefault(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -266,7 +267,6 @@ def models(self): "seeds.yml": seeds_yaml, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # ensure seed is created for source run_dbt(["seed"]) @@ -314,7 +314,6 @@ def models(self): "microbatch_model.sql": microbatch_model_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -342,7 +341,6 @@ def test_run_with_event_time(self, project): class TestMicrobatchUsingRefRenderSkipsFilter(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -395,7 +393,6 @@ def models(self): "microbatch_model.sql": microbatch_model_context_vars, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time_logs(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): _, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) @@ -428,7 +425,6 @@ def models(self): "downstream_model.sql": downstream_model_of_microbatch_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -453,7 +449,6 @@ def models(self): "microbatch_model.sql": microbatch_model_failing_incremental_partition_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -502,7 +497,6 @@ def models(self): "microbatch_model.sql": microbatch_model_first_partition_failing_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -511,7 +505,6 @@ def test_run_with_event_time(self, project): class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -594,7 +587,6 @@ def models(self): "downstream_model.sql": downstream_model_of_microbatch_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index cdebd3a791b..f97e85a8f8a 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -1,6 +1,3 @@ -import os -from unittest import mock - import pytest from dbt.exceptions import ParsingError @@ -86,7 +83,14 @@ class BaseMicrobatchTestParseError: def models(self): return {} - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_parsing_error_raised(self, project): with pytest.raises(ParsingError): run_dbt(["parse"]) @@ -97,7 +101,14 @@ class BaseMicrobatchTestNoError: def models(self): return {} - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_parsing_error_not_raised(self, project): run_dbt(["parse"]) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 46c29254a9a..daa8d1d1bda 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -1,4 +1,4 @@ -import os +from argparse import Namespace from unittest import mock import pytest @@ -13,6 +13,7 @@ RuntimeRefResolver, RuntimeSourceResolver, ) +from dbt.flags import set_from_args class TestBaseResolver: @@ -56,8 +57,9 @@ def test_resolve_event_time_filter( incremental_strategy: str, expect_filter: bool, ) -> None: - if dbt_experimental_microbatch: - mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + set_from_args( + Namespace(require_builtin_microbatch_strategy=dbt_experimental_microbatch), None + ) # Target mocking target = mock.Mock() @@ -117,6 +119,8 @@ def test_create_relation_with_empty(self, resolver, empty, is_ephemeral_model, e mock_node.is_ephemeral_model = is_ephemeral_model mock_node.defer_relation = None + set_from_args(Namespace(require_builtin_microbatch_strategy=False), None) + # create limited relation with mock.patch("dbt.contracts.graph.nodes.ParsedNode", new=mock.Mock): relation = resolver.create_relation(mock_node) @@ -156,6 +160,8 @@ def test_create_relation_with_empty(self, resolver, empty, expected_limit): mock_source.quoting_dict = {} resolver.manifest.resolve_source.return_value = mock_source + set_from_args(Namespace(require_builtin_microbatch_strategy=False), None) + # create limited relation relation = resolver.resolve("test", "test") assert relation.limit == expected_limit