From 0c2ee75c32188f7ce97b1b09d249f268a85c2a91 Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Fri, 10 May 2024 14:39:58 +0100
Subject: [PATCH] Gracefully error if users try to emit_datasets with Airflow 2.9.0 or 2.9.1 (#948)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Improve the error message Cosmos raises when emitting OpenLineage (OL)
events on Airflow 2.9.0 or 2.9.1, so users no longer hit this opaque
traceback:

```
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: []
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: ['***://***:5432/***.dbt.stg_customers']
[2024-05-07, 14:20:09 UTC] {providers_manager.py:376} DEBUG - Initializing Providers Manager[dataset_uris]
[2024-05-07, 14:20:09 UTC] {providers_manager.py:379} DEBUG - Initialization of Providers Manager[dataset_uris] took 0.00 seconds
[2024-05-07, 14:20:09 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-05-07, 14:20:09 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/base.py", line 266, in execute
    self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 470, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 343, in run_command
    outlets = self.get_datasets("outputs")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in get_datasets
    return [Dataset(uri) for uri in uris]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in <listcomp>
    return [Dataset(uri) for uri in uris]
            ^^^^^^^^^^^^
  File "<attrs generated init airflow.datasets.Dataset>", line 3, in __init__
    _setattr('uri', __attr_converter_uri(uri))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
ValueError: URI format ***:// must contain database, schema, and table names
```

Users on these Airflow versions can disable dataset emission with
``emit_datasets=False`` (see the sketch after the diff).

Closes: #945
---
 cosmos/constants.py            |  5 ++
 cosmos/exceptions.py           |  4 ++
 cosmos/operators/local.py      | 17 ++++++-
 dev/dags/example_virtualenv.py |  1 +
 pyproject.toml                 |  3 +-
 tests/operators/test_local.py  | 92 ++++++++++++++++++++++++++++++++--
 tests/test_example_dags.py     | 13 ++++-
 7 files changed, 128 insertions(+), 7 deletions(-)

diff --git a/cosmos/constants.py b/cosmos/constants.py
index bea5e25eb..847820ff2 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -3,6 +3,7 @@
 from pathlib import Path
 
 import aenum
+from packaging.version import Version
 
 DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
 DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
@@ -20,6 +21,10 @@
 DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
 OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"
 
+# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that is fixed in later Airflow 2.x versions:
+# https://github.com/apache/airflow/issues/39486
+PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]
+
 
 class LoadMode(Enum):
     """
diff --git a/cosmos/exceptions.py b/cosmos/exceptions.py
index 74091f4a1..85df285b1 100644
--- a/cosmos/exceptions.py
+++ b/cosmos/exceptions.py
@@ -3,3 +3,7 @@
 
 class CosmosValueError(ValueError):
     """Raised when a Cosmos config value is invalid."""
+
+
+class AirflowCompatibilityError(Exception):
+    """Raised when Cosmos features are limited for the Airflow version being used."""
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 64178b49f..ff4bb8280 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -21,6 +21,7 @@
 from cosmos import cache
 from cosmos.constants import InvocationMode
 from cosmos.dbt.project import get_partial_parse_path
+from cosmos.exceptions import AirflowCompatibilityError
 
 try:
     from airflow.datasets import Dataset
@@ -407,7 +408,21 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
             dataset_uri = output.namespace + "/" + output.name
             uris.append(dataset_uri)
         logger.debug("URIs to be converted to Dataset: %s", uris)
-        return [Dataset(uri) for uri in uris]
+
+        datasets = []
+        try:
+            datasets = [Dataset(uri) for uri in uris]
+        except ValueError as e:
+            raise AirflowCompatibilityError(
+                """
+                Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
+                https://github.com/apache/airflow/issues/39486
+
+                If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets
+                by setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
+ """ + ) + return datasets def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None: """ diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 55ecf0a66..6275b1048 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -36,6 +36,7 @@ "py_system_site_packages": False, "py_requirements": ["dbt-postgres==1.6.0b1"], "install_deps": True, + "emit_datasets": False, # Example of how to not set inlets and outlets }, # normal dag parameters schedule_interval="@daily", diff --git a/pyproject.toml b/pyproject.toml index 04486f552..5f0e5ee0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,6 +117,7 @@ packages = ["/cosmos"] [tool.hatch.envs.tests] dependencies = [ "astronomer-cosmos[tests]", + "apache-airflow-providers-postgres", "apache-airflow-providers-cncf-kubernetes>=5.1.1", "apache-airflow-providers-docker>=3.5.0", "apache-airflow-providers-microsoft-azure", @@ -135,7 +136,7 @@ airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ - { value = "typing_extensions<4.6", if = ["2.6"] } + { value = "typing_extensions<4.6", if = ["2.6"] }, ] [tool.hatch.envs.tests.scripts] diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 80c6c58a4..419be9e10 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -18,12 +18,13 @@ from cosmos import cache from cosmos.config import ProfileConfig -from cosmos.constants import InvocationMode +from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) +from cosmos.exceptions import AirflowCompatibilityError from cosmos.operators.local import ( DbtBuildLocalOperator, DbtDocsAzureStorageLocalOperator, @@ -384,11 +385,12 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues @pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.4"), - reason="Airflow DAG did not have datasets until the 2.4 release", + version.parse(airflow_version) < version.parse("2.4") + or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1", ) @pytest.mark.integration -def test_run_operator_dataset_inlets_and_outlets(): +def test_run_operator_dataset_inlets_and_outlets(caplog): from airflow.datasets import Dataset with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: @@ -417,13 +419,95 @@ def test_run_operator_dataset_inlets_and_outlets(): append_env=True, ) seed_operator >> run_operator >> test_operator + run_test_dag(dag) + assert run_operator.inlets == [] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.outlets == [] +@pytest.mark.skipif( + version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", + # https://github.com/apache/airflow/issues/39486 +) +@pytest.mark.integration +def test_run_operator_dataset_emission_fails(caplog): + from airflow.datasets import Dataset + + with DAG("test-id-1", start_date=datetime(2022, 
+    with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+        seed_operator = DbtSeedLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=DBT_PROJ_DIR,
+            task_id="seed",
+            dbt_cmd_flags=["--select", "raw_customers"],
+            install_deps=True,
+            append_env=True,
+        )
+        run_operator = DbtRunLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=DBT_PROJ_DIR,
+            task_id="run",
+            dbt_cmd_flags=["--models", "stg_customers"],
+            install_deps=True,
+            append_env=True,
+        )
+
+        seed_operator >> run_operator
+
+    with pytest.raises(AirflowCompatibilityError) as exc:
+        run_test_dag(dag)
+
+    err_msg = str(exc.value)
+    assert (
+        "Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions"
+        in err_msg
+    )
+    assert (
+        "If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets"
+        in err_msg
+    )
+
+
+@pytest.mark.skipif(
+    version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
+    reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
+    # https://github.com/apache/airflow/issues/39486
+)
+@pytest.mark.integration
+def test_run_operator_dataset_emission_is_skipped(caplog):
+    from airflow.datasets import Dataset
+
+    with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
+        seed_operator = DbtSeedLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=DBT_PROJ_DIR,
+            task_id="seed",
+            dbt_cmd_flags=["--select", "raw_customers"],
+            install_deps=True,
+            append_env=True,
+            emit_datasets=False,
+        )
+        run_operator = DbtRunLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=DBT_PROJ_DIR,
+            task_id="run",
+            dbt_cmd_flags=["--models", "stg_customers"],
+            install_deps=True,
+            append_env=True,
+            emit_datasets=False,
+        )
+
+        seed_operator >> run_operator
+
+    run_test_dag(dag)
+
+    assert run_operator.inlets == []
+    assert run_operator.outlets == []
 
 
 @pytest.mark.integration
 def test_run_operator_caches_partial_parsing(caplog, tmp_path):
     caplog.set_level(logging.DEBUG)
diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py
index 91fd1d6c2..af45191c9 100644
--- a/tests/test_example_dags.py
+++ b/tests/test_example_dags.py
@@ -16,11 +16,14 @@
 from dbt.version import get_installed_version as get_dbt_version
 from packaging.version import Version
 
+from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS
+
 from . import utils as test_utils
 
 EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
 AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
 DBT_VERSION = Version(get_dbt_version().to_version_string()[1:])
+AIRFLOW_VERSION = Version(airflow.__version__)
 
 MIN_VER_DAG_FILE: dict[str, list[str]] = {
     "2.4": ["cosmos_seed_dag.py"],
@@ -28,6 +31,7 @@
 
 IGNORED_DAG_FILES = ["performance_dag.py"]
 
+
 # Sort descending based on Versions and convert string to an actual version
 MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
     Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True)
@@ -48,9 +52,12 @@ def session():
 
 @cache
 def get_dag_bag() -> DagBag:
     """Create a DagBag by adding the files that are not supported to .airflowignore"""
+    if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS:
+        return DagBag(dag_folder=None, include_examples=False)
+
     with open(AIRFLOW_IGNORE_FILE, "w+") as file:
         for min_version, files in MIN_VER_DAG_FILE_VER.items():
-            if Version(airflow.__version__) < min_version:
+            if AIRFLOW_VERSION < min_version:
                 print(f"Adding {files} to .airflowignore")
                 file.writelines([f"{file}\n" for file in files])
@@ -77,6 +84,10 @@ def get_dag_ids() -> list[str]:
     return dag_bag.dag_ids
 
 
+@pytest.mark.skipif(
+    AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
+    reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False",
+)
 @pytest.mark.integration
 @pytest.mark.parametrize("dag_id", get_dag_ids())
 def test_example_dag(session, dag_id: str):
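
Note for users on the affected versions: the workaround named in the new error message is to turn off dataset emission via ``RenderConfig``. Below is a minimal sketch of what that looks like with a ``DbtDag``; the dag_id, project path, and profile details are illustrative placeholders, not part of this patch:

```
from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig

# Placeholder project/profile configuration -- point these at your own dbt project.
my_cosmos_dag = DbtDag(
    project_config=ProjectConfig("/usr/local/airflow/dbt/my_project"),
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/my_project/profiles.yml",
    ),
    # On Airflow 2.9.0/2.9.1, skip Dataset creation so Cosmos never calls
    # Dataset(uri) and therefore never hits the AirflowCompatibilityError
    # introduced by this patch.
    render_config=RenderConfig(emit_datasets=False),
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    dag_id="my_cosmos_dag",
)
```

For DAGs built directly from the local operators, the same switch is available per task, as the ``dev/dags/example_virtualenv.py`` hunk above shows: pass ``"emit_datasets": False`` in ``operator_args``, or ``emit_datasets=False`` to the operator itself.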