From 8894e46bb66e89f86e6702006b7a016408174a74 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 8 May 2024 18:04:56 +0100 Subject: [PATCH 1/6] Reproduce issue #945 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` [2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: [] [2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: ['***://***:5432/***.dbt.stg_customers'] [2024-05-07, 14:20:09 UTC] {providers_manager.py:376} DEBUG - Initializing Providers Manager[dataset_uris] [2024-05-07, 14:20:09 UTC] {providers_manager.py:379} DEBUG - Initialization of Providers Manager[dataset_uris] took 0.00 seconds [2024-05-07, 14:20:09 UTC] {taskinstance.py:441} ▼ Post task execution logs [2024-05-07, 14:20:09 UTC] {taskinstance.py:2905} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable return execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/cosmos/operators/base.py", line 266, in execute self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags()) File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 470, in build_and_run_cmd result = self.run_command(cmd=dbt_cmd, env=env, context=context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File 
"/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 343, in run_command outlets = self.get_datasets("outputs") ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in get_datasets return [Dataset(uri) for uri in uris] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in return [Dataset(uri) for uri in uris] ^^^^^^^^^^^^ File "", line 3, in __init__ _setattr('uri', __attr_converter_uri(uri)) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri parsed = normalizer(parsed) ^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri raise ValueError("URI format postgres:// must contain database, schema, and table names") ValueError: URI format ***:// must contain database, schema, and table names ``` As seen in: #945 --- scripts/test/integration-setup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index eba4f1513..4524fb8a8 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -3,4 +3,4 @@ pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; \ rm -rf airflow.*; \ airflow db init; \ -pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' +pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' 'apache-airflow-providers-postgres' From 1d5cbf04dcd9da01713d1661825a9040dab61d6a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 8 May 2024 19:09:29 +0100 Subject: [PATCH 2/6] Improve integration setup Try to solve: https://github.com/astronomer/astronomer-cosmos/actions/runs/9005741081/job/24742560379?pr=948 --- scripts/test/integration-setup.sh | 8 ++++---- 1 file 
changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 4524fb8a8..92031b6d6 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -1,6 +1,6 @@ # we install using the following workaround to overcome installation conflicts, such as: # apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies -pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; \ -rm -rf airflow.*; \ -airflow db init; \ -pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' 'apache-airflow-providers-postgres' +pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; +pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' 'apache-airflow-providers-postgres'; +rm -rf airflow.*; +airflow db init; From fd796308ebd55765711c1eca0f4a9a77a6e1b88f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 9 May 2024 00:17:32 +0100 Subject: [PATCH 3/6] Fix issue #945 --- cosmos/constants.py | 5 ++ cosmos/exceptions.py | 4 ++ cosmos/operators/local.py | 17 ++++++- dev/dags/example_virtualenv.py | 1 + tests/operators/test_local.py | 92 ++++++++++++++++++++++++++++++++-- tests/test_example_dags.py | 8 ++- 6 files changed, 121 insertions(+), 6 deletions(-) diff --git a/cosmos/constants.py b/cosmos/constants.py index bea5e25eb..847820ff2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -3,6 +3,7 @@ from pathlib import Path import aenum +from packaging.version import Version DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DEFAULT_DBT_PROFILE_NAME = "cosmos_profile" @@ -20,6 +21,10 @@ DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos" OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/" +# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later 
Airflow 2.x versions +# https://github.com/apache/airflow/issues/39486 +PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")] + class LoadMode(Enum): """ diff --git a/cosmos/exceptions.py b/cosmos/exceptions.py index 74091f4a1..85df285b1 100644 --- a/cosmos/exceptions.py +++ b/cosmos/exceptions.py @@ -3,3 +3,7 @@ class CosmosValueError(ValueError): """Raised when a Cosmos config value is invalid.""" + + +class AirflowCompatibilityError(Exception): + """Raised when Cosmos features are limited for Airflow version being used.""" diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index efc022bf6..3fcf29ab5 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -21,6 +21,7 @@ from cosmos import cache from cosmos.constants import InvocationMode from cosmos.dbt.project import get_partial_parse_path +from cosmos.exceptions import AirflowCompatibilityError try: from airflow.datasets import Dataset @@ -407,7 +408,21 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: dataset_uri = output.namespace + "/" + output.name uris.append(dataset_uri) logger.debug("URIs to be converted to Dataset: %s", uris) - return [Dataset(uri) for uri in uris] + + datasets = [] + try: + datasets = [Dataset(uri) for uri in uris] + except ValueError as e: + raise AirflowCompatibilityError( + """ + Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions: + https://github.com/apache/airflow/issues/39486 + + If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets: + By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html. 
+ """ + ) + return datasets def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None: """ diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 55ecf0a66..6275b1048 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -36,6 +36,7 @@ "py_system_site_packages": False, "py_requirements": ["dbt-postgres==1.6.0b1"], "install_deps": True, + "emit_datasets": False, # Example of how to not set inlets and outlets }, # normal dag parameters schedule_interval="@daily", diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 80c6c58a4..419be9e10 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -18,12 +18,13 @@ from cosmos import cache from cosmos.config import ProfileConfig -from cosmos.constants import InvocationMode +from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) +from cosmos.exceptions import AirflowCompatibilityError from cosmos.operators.local import ( DbtBuildLocalOperator, DbtDocsAzureStorageLocalOperator, @@ -384,11 +385,12 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues @pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.4"), - reason="Airflow DAG did not have datasets until the 2.4 release", + version.parse(airflow_version) < version.parse("2.4") + or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1", ) @pytest.mark.integration -def test_run_operator_dataset_inlets_and_outlets(): +def test_run_operator_dataset_inlets_and_outlets(caplog): from airflow.datasets import Dataset with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as 
dag: @@ -417,13 +419,95 @@ def test_run_operator_dataset_inlets_and_outlets(): append_env=True, ) seed_operator >> run_operator >> test_operator + run_test_dag(dag) + assert run_operator.inlets == [] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.outlets == [] +@pytest.mark.skipif( + version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", + # https://github.com/apache/airflow/issues/39486 +) +@pytest.mark.integration +def test_run_operator_dataset_emission_fails(caplog): + from airflow.datasets import Dataset + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + seed_operator = DbtSeedLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="seed", + dbt_cmd_flags=["--select", "raw_customers"], + install_deps=True, + append_env=True, + ) + run_operator = DbtRunLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="run", + dbt_cmd_flags=["--models", "stg_customers"], + install_deps=True, + append_env=True, + ) + + seed_operator >> run_operator + + with pytest.raises(AirflowCompatibilityError) as exc: + run_test_dag(dag) + + err_msg = str(exc.value) + assert ( + "Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions" + in err_msg + ) + assert ( + "If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets" + in err_msg + ) + + +@pytest.mark.skipif( + version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", + # https://github.com/apache/airflow/issues/39486 +) +@pytest.mark.integration +def 
test_run_operator_dataset_emission_is_skipped(caplog): + from airflow.datasets import Dataset + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + seed_operator = DbtSeedLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="seed", + dbt_cmd_flags=["--select", "raw_customers"], + install_deps=True, + append_env=True, + emit_datasets=False, + ) + run_operator = DbtRunLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="run", + dbt_cmd_flags=["--models", "stg_customers"], + install_deps=True, + append_env=True, + emit_datasets=False, + ) + + seed_operator >> run_operator + + run_test_dag(dag) + + assert run_operator.inlets == [] + assert run_operator.outlets == [] + + @pytest.mark.integration def test_run_operator_caches_partial_parsing(caplog, tmp_path): caplog.set_level(logging.DEBUG) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 91fd1d6c2..62091de57 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -16,6 +16,8 @@ from dbt.version import get_installed_version as get_dbt_version from packaging.version import Version +from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS + from . 
import utils as test_utils EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" @@ -28,6 +30,7 @@ IGNORED_DAG_FILES = ["performance_dag.py"] + # Sort descending based on Versions and convert string to an actual version MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = { Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True) @@ -50,7 +53,10 @@ def get_dag_bag() -> DagBag: """Create a DagBag by adding the files that are not supported to .airflowignore""" with open(AIRFLOW_IGNORE_FILE, "w+") as file: for min_version, files in MIN_VER_DAG_FILE_VER.items(): - if Version(airflow.__version__) < min_version: + if ( + Version(airflow.__version__) < min_version + or Version(airflow.__version__) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS + ): print(f"Adding {files} to .airflowignore") file.writelines([f"{file}\n" for file in files]) From d5e512e1ecd89e4e5243de72646172584b847f7a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 9 May 2024 00:45:11 +0100 Subject: [PATCH 4/6] Fix CI issue (Airflow 2.9 being installed when 2.5 should have been installed) --- pyproject.toml | 1 + scripts/test/integration-setup.sh | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 04486f552..fead2cdcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -120,6 +120,7 @@ dependencies = [ "apache-airflow-providers-cncf-kubernetes>=5.1.1", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-microsoft-azure", + "apache-airflow-providers-postgres", "types-PyYAML", "types-attrs", "types-requests", diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 92031b6d6..2f33e6aa9 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -1,6 +1,6 @@ # we install using the following workaround to overcome installation conflicts, such as: # apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 
because these package versions have conflicting dependencies pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; -pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' 'apache-airflow-providers-postgres'; +pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' rm -rf airflow.*; airflow db init; From f6b4c5981471002538512dabaeb30b240e4f810e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 9 May 2024 12:55:11 +0100 Subject: [PATCH 5/6] Try to fix issue in the CI: https://github.com/astronomer/astronomer-cosmos/actions/runs/9010008280 --- pyproject.toml | 4 ++-- scripts/test/integration-setup.sh | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fead2cdcf..5f0e5ee0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,10 +117,10 @@ packages = ["/cosmos"] [tool.hatch.envs.tests] dependencies = [ "astronomer-cosmos[tests]", + "apache-airflow-providers-postgres", "apache-airflow-providers-cncf-kubernetes>=5.1.1", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-microsoft-azure", - "apache-airflow-providers-postgres", "types-PyYAML", "types-attrs", "types-requests", @@ -136,7 +136,7 @@ airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ - { value = "typing_extensions<4.6", if = ["2.6"] } + { value = "typing_extensions<4.6", if = ["2.6"] }, ] [tool.hatch.envs.tests.scripts] diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 2f33e6aa9..eba4f1513 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -1,6 +1,6 @@ # we install using the following workaround to overcome installation conflicts, such as: # apache-airflow 2.3.0 and dbt-core [0.13.0 - 1.5.2] and jinja2>=3.0.0 because these package versions have conflicting dependencies -pip uninstall -y dbt-postgres 
dbt-databricks dbt-vertica; +pip uninstall -y dbt-postgres dbt-databricks dbt-vertica; \ +rm -rf airflow.*; \ +airflow db init; \ pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' -rm -rf airflow.*; -airflow db init; From 1aa2f04830a7b49aad4a2e568193f08d830d80f8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 9 May 2024 13:46:11 +0100 Subject: [PATCH 6/6] Fix CI --- tests/test_example_dags.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 62091de57..af45191c9 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -23,6 +23,7 @@ EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore" DBT_VERSION = Version(get_dbt_version().to_version_string()[1:]) +AIRFLOW_VERSION = Version(airflow.__version__) MIN_VER_DAG_FILE: dict[str, list[str]] = { "2.4": ["cosmos_seed_dag.py"], @@ -51,12 +52,12 @@ def session(): @cache def get_dag_bag() -> DagBag: """Create a DagBag by adding the files that are not supported to .airflowignore""" + if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS: + return DagBag(dag_folder=None, include_examples=False) + with open(AIRFLOW_IGNORE_FILE, "w+") as file: for min_version, files in MIN_VER_DAG_FILE_VER.items(): - if ( - Version(airflow.__version__) < min_version - or Version(airflow.__version__) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS - ): + if AIRFLOW_VERSION < min_version: print(f"Adding {files} to .airflowignore") file.writelines([f"{file}\n" for file in files]) @@ -83,6 +84,10 @@ def get_dag_ids() -> list[str]: return dag_bag.dag_ids +@pytest.mark.skipif( + AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", +) @pytest.mark.integration @pytest.mark.parametrize("dag_id", 
get_dag_ids()) def test_example_dag(session, dag_id: str):