Skip to content

Commit

Permalink
first pass: replace os env with project flag
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Sep 30, 2024
1 parent a86e2b4 commit a9df50f
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 29 deletions.
2 changes: 1 addition & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def resolve_limit(self) -> Optional[int]:
def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
get_flags().require_builtin_microbatch_strategy
and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and self.model.config.materialized == "incremental"
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
write_json: Optional[bool] = None

# legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md
require_builtin_microbatch_strategy: bool = False
require_explicit_package_overrides_for_builtin_materializations: bool = True
require_resource_names_without_spaces: bool = False
source_freshness_run_project_hooks: bool = False
Expand All @@ -348,6 +349,7 @@ class ProjectFlags(ExtensibleDbtClassMixin):
@property
def project_only_flags(self) -> Dict[str, Any]:
return {
"require_builtin_microbatch_strategy": self.require_builtin_microbatch_strategy,
"require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations,
"require_resource_names_without_spaces": self.require_resource_names_without_spaces,
"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks,
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ def check_valid_snapshot_config(self):
node.config.final_validate()

def check_valid_microbatch_config(self):
if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
if get_flags().require_builtin_microbatch_strategy:
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
Expand Down
5 changes: 2 additions & 3 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import functools
import os
import threading
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
Expand Down Expand Up @@ -31,6 +30,7 @@
RunningOperationCaughtError,
)
from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.hooks import get_hook_dict
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
Expand Down Expand Up @@ -447,9 +447,8 @@ def execute(self, model, manifest):
)

hook_ctx = self.adapter.pre_model_hook(context_config)

if (
os.environ.get("DBT_EXPERIMENTAL_MICROBATCH")
get_flags().require_builtin_microbatch_strategy
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
Expand Down
24 changes: 8 additions & 16 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from unittest import mock

import pytest
Expand Down Expand Up @@ -111,6 +110,14 @@ def models(self):
def macros(self):
return {"microbatch.sql": custom_microbatch_strategy}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {
"require_builtin_microbatch_strategy": True,
}
}


class TestMicrobatchCustomUserStrategyDefault(BaseMicrobatchCustomUserStrategy):
def test_use_custom_microbatch_strategy_by_default(self, project):
Expand All @@ -126,7 +133,6 @@ def test_use_custom_microbatch_strategy_by_default(self, project):


class TestMicrobatchCustomUserStrategyEnvVarTrueValid(BaseMicrobatchCustomUserStrategy):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy(
self, project
):
Expand All @@ -143,10 +149,7 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg
assert "custom microbatch strategy" in logs


# TODO: Consider a behaviour flag here if DBT_EXPERIMENTAL_MICROBATCH is removed
# Since this causes an exception prior to using an override
class TestMicrobatchCustomUserStrategyEnvVarTrueInvalid(BaseMicrobatchCustomUserStrategy):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy(
self, project
):
Expand Down Expand Up @@ -183,7 +186,6 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int)


class TestMicrobatchCLI(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run without --event-time-start or --event-time-end - 3 expected rows in output
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down Expand Up @@ -214,7 +216,6 @@ def test_run_with_event_time(self, project):


class TestMicroBatchBoundsDefault(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down Expand Up @@ -266,7 +267,6 @@ def models(self):
"seeds.yml": seeds_yaml,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# ensure seed is created for source
run_dbt(["seed"])
Expand Down Expand Up @@ -314,7 +314,6 @@ def models(self):
"microbatch_model.sql": microbatch_model_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down Expand Up @@ -342,7 +341,6 @@ def test_run_with_event_time(self, project):


class TestMicrobatchUsingRefRenderSkipsFilter(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# initial run -- backfills all data
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down Expand Up @@ -395,7 +393,6 @@ def models(self):
"microbatch_model.sql": microbatch_model_context_vars,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time_logs(self, project):
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"])
Expand Down Expand Up @@ -428,7 +425,6 @@ def models(self):
"downstream_model.sql": downstream_model_of_microbatch_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand All @@ -453,7 +449,6 @@ def models(self):
"microbatch_model.sql": microbatch_model_failing_incremental_partition_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down Expand Up @@ -502,7 +497,6 @@ def models(self):
"microbatch_model.sql": microbatch_model_first_partition_failing_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand All @@ -511,7 +505,6 @@ def test_run_with_event_time(self, project):


class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest):
@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down Expand Up @@ -594,7 +587,6 @@ def models(self):
"downstream_model.sql": downstream_model_of_microbatch_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output
with patch_microbatch_end_time("2020-01-03 13:57:00"):
Expand Down
21 changes: 16 additions & 5 deletions tests/functional/microbatch/test_microbatch_config_validation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import os
from unittest import mock

import pytest

from dbt.exceptions import ParsingError
Expand Down Expand Up @@ -86,7 +83,14 @@ class BaseMicrobatchTestParseError:
def models(self):
return {}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {
"require_builtin_microbatch_strategy": True,
}
}

def test_parsing_error_raised(self, project):
with pytest.raises(ParsingError):
run_dbt(["parse"])
Expand All @@ -97,7 +101,14 @@ class BaseMicrobatchTestNoError:
def models(self):
return {}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"flags": {
"require_builtin_microbatch_strategy": True,
}
}

def test_parsing_error_not_raised(self, project):
run_dbt(["parse"])

Expand Down
12 changes: 9 additions & 3 deletions tests/unit/context/test_providers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
from argparse import Namespace
from unittest import mock

import pytest
Expand All @@ -13,6 +13,7 @@
RuntimeRefResolver,
RuntimeSourceResolver,
)
from dbt.flags import set_from_args


class TestBaseResolver:
Expand Down Expand Up @@ -56,8 +57,9 @@ def test_resolve_event_time_filter(
incremental_strategy: str,
expect_filter: bool,
) -> None:
if dbt_experimental_microbatch:
mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
set_from_args(
Namespace(require_builtin_microbatch_strategy=dbt_experimental_microbatch), None
)

# Target mocking
target = mock.Mock()
Expand Down Expand Up @@ -117,6 +119,8 @@ def test_create_relation_with_empty(self, resolver, empty, is_ephemeral_model, e
mock_node.is_ephemeral_model = is_ephemeral_model
mock_node.defer_relation = None

set_from_args(Namespace(require_builtin_microbatch_strategy=False), None)

# create limited relation
with mock.patch("dbt.contracts.graph.nodes.ParsedNode", new=mock.Mock):
relation = resolver.create_relation(mock_node)
Expand Down Expand Up @@ -156,6 +160,8 @@ def test_create_relation_with_empty(self, resolver, empty, expected_limit):
mock_source.quoting_dict = {}
resolver.manifest.resolve_source.return_value = mock_source

set_from_args(Namespace(require_builtin_microbatch_strategy=False), None)

# create limited relation
relation = resolver.resolve("test", "test")
assert relation.limit == expected_limit

0 comments on commit a9df50f

Please sign in to comment.