From ae4ed8e145a51b83929277354d9fa3da9e449769 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 13 May 2024 18:07:56 +0530 Subject: [PATCH 1/2] Handle append_env correctly based on execution modes --- cosmos/operators/base.py | 8 ++++---- cosmos/operators/local.py | 12 ++++++++++++ docs/configuration/operator-args.rst | 2 +- .../operators/test_azure_container_instance.py | 2 -- tests/operators/test_base.py | 6 ++++++ tests/operators/test_local.py | 16 ++++++++++++++++ tests/operators/test_virtualenv.py | 17 +++++++++++++++++ 7 files changed, 56 insertions(+), 7 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index f9f4645b6..7a7fc7190 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -46,10 +46,10 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated) - :param append_env: . If True (default), inherits the environment variables - from current passes and then environment variable passed by the user will either update the existing + :param append_env: . If True, inherits the environment variables + from current process and then environment variable passed by the user will either update the existing inherited environment variables or the new variables gets appended to it. - If False, only uses the environment variables passed in env params + If False (default), only uses the environment variables passed in env params and does not inherit the current process environment. :param output_encoding: Output encoding of bash command :param skip_exit_code: If task exits with this exit code, leave the task @@ -104,7 +104,7 @@ def __init__( db_name: str | None = None, schema: str | None = None, env: dict[str, Any] | None = None, - append_env: bool = True, + append_env: bool = False, output_encoding: str = "utf-8", skip_exit_code: int = 99, partial_parse: bool = True, diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 3fcf29ab5..a4c740ae0 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -113,6 +113,11 @@ class DbtLocalBaseOperator(AbstractDbtBaseOperator): :param target_name: A name to use for the dbt target. If not provided, and no target is found in your project's dbt_project.yml, "cosmos_target" is used. :param should_store_compiled_sql: If true, store the compiled SQL in the compiled_sql rendered template. + :param append_env: . If True(default), inherits the environment variables + from current process and then environment variable passed by the user will either update the existing + inherited environment variables or the new variables gets appended to it. + If False, only uses the environment variables passed in env params + and does not inherit the current process environment. """ template_fields: Sequence[str] = AbstractDbtBaseOperator.template_fields + ("compiled_sql",) # type: ignore[operator] @@ -127,6 +132,7 @@ def __init__( install_deps: bool = False, callback: Callable[[str], None] | None = None, should_store_compiled_sql: bool = True, + append_env: bool = True, **kwargs: Any, ) -> None: self.profile_config = profile_config @@ -144,6 +150,12 @@ def __init__( kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes super().__init__(**kwargs) + # For local execution mode, we're consistent with the LoadMode.DBT_LS command in forwarding the environment + # variables to the subprocess by default. Although this behavior is designed for ExecuteMode.LOCAL and + # ExecuteMode.VIRTUALENV, it is not desired for the other execution modes to forward the environment variables + # as it can break existing DAGs. + self.append_env = append_env + @cached_property def subprocess_hook(self) -> FullOutputSubprocessHook: """Returns hook for running the bash command.""" diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 4e6a40b7f..d78b6f1b7 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -43,7 +43,7 @@ Summary of Cosmos-specific arguments dbt-related ........... -- ``append_env``: Expose the operating system environment variables to the ``dbt`` command. The default is ``False``. +- ``append_env``: Expose the operating system environment variables to the ``dbt`` command. For most execution modes, the default is ``False``, however, for execution modes ExecuteMode.LOCAL and ExecuteMode.VIRTUALENV, the default is True. This behavior is consistent with the LoadMode.DBT_LS command in forwarding the environment variables to the subprocess by default. - ``dbt_cmd_flags``: List of command flags to pass to ``dbt`` command, added after dbt subcommand - ``dbt_cmd_global_flags``: List of ``dbt`` `global flags `_ to be passed to the ``dbt`` command, before the subcommand - ``dbt_executable_path``: Path to dbt executable. diff --git a/tests/operators/test_azure_container_instance.py b/tests/operators/test_azure_container_instance.py index 84d733ce3..836ca4596 100644 --- a/tests/operators/test_azure_container_instance.py +++ b/tests/operators/test_azure_container_instance.py @@ -53,7 +53,6 @@ def test_dbt_azure_container_instance_operator_get_env(p_context_to_airflow_vars name="my-aci", resource_group="my-rg", project_dir="my/dir", - append_env=False, ) dbt_base_operator.env = { "start_date": "20220101", @@ -91,7 +90,6 @@ def test_dbt_azure_container_instance_operator_check_environment_variables( resource_group="my-rg", project_dir="my/dir", environment_variables={"FOO": "BAR"}, - append_env=False, ) dbt_base_operator.env = { "start_date": "20220101", diff --git a/tests/operators/test_base.py b/tests/operators/test_base.py index 5761d66aa..70e4059e7 100644 --- a/tests/operators/test_base.py +++ b/tests/operators/test_base.py @@ -69,3 +69,9 @@ def test_dbt_mixin_add_cmd_flags_run_operator(args, expected_flags): flags = run_operation.add_cmd_flags() assert flags == expected_flags + + +def test_abstract_dbt_base_operator_append_env_is_false_by_default(): + """Tests that the append_env attribute is set to False by default.""" + base_operator = AbstractDbtBaseOperator(task_id="fake_task", project_dir="fake_dir") + assert base_operator.append_env is False diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 419be9e10..11652e10d 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -103,6 +103,22 @@ def test_dbt_base_operator_add_global_flags() -> None: ] +def test_dbt_local_operator_append_env_is_true_by_default() -> None: + dbt_local_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, + task_id="my-task", + project_dir="my/dir", + vars={ + "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", + "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", + }, + no_version_check=True, + select=["my_first_model", "my_second_model"], + ) + + assert dbt_local_operator.append_env == True + + def test_dbt_base_operator_add_user_supplied_flags() -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( profile_config=profile_config, diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index acf3c72af..deb7151e5 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -73,3 +73,20 @@ def test_run_command( assert dbt_deps["command"][0] == dbt_cmd["command"][0] assert dbt_cmd["command"][1] == "do-something" assert mock_execute.call_count == 2 + + +def test_virtualenv_operator_append_env_is_true_by_default(): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)), + profile_config=profile_config, + task_id="fake_task", + install_deps=True, + project_dir="./dev/dags/dbt/jaffle_shop", + py_system_site_packages=False, + pip_install_options=["--test-flag"], + py_requirements=["dbt-postgres==1.6.0b1"], + emit_datasets=False, + invocation_mode=InvocationMode.SUBPROCESS, + ) + + assert venv_operator.append_env is True From 831f3560222a280ac26832682baed31d720513e0 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 13 May 2024 18:28:28 +0530 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> --- cosmos/operators/base.py | 2 +- cosmos/operators/local.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 7a7fc7190..b8112a73f 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -46,7 +46,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated) - :param append_env: . If True, inherits the environment variables + :param append_env: If True, inherits the environment variables from current process and then environment variable passed by the user will either update the existing inherited environment variables or the new variables gets appended to it. If False (default), only uses the environment variables passed in env params diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a4c740ae0..390c10244 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -113,7 +113,7 @@ class DbtLocalBaseOperator(AbstractDbtBaseOperator): :param target_name: A name to use for the dbt target. If not provided, and no target is found in your project's dbt_project.yml, "cosmos_target" is used. :param should_store_compiled_sql: If true, store the compiled SQL in the compiled_sql rendered template. - :param append_env: . If True(default), inherits the environment variables + :param append_env: If True(default), inherits the environment variables from current process and then environment variable passed by the user will either update the existing inherited environment variables or the new variables gets appended to it. If False, only uses the environment variables passed in env params