diff --git a/cosmos/constants.py b/cosmos/constants.py index bea5e25eb..847820ff2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -3,6 +3,7 @@ from pathlib import Path import aenum +from packaging.version import Version DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DEFAULT_DBT_PROFILE_NAME = "cosmos_profile" @@ -20,6 +21,10 @@ DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos" OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/" +# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later Airflow 2.x versions +# https://github.com/apache/airflow/issues/39486 +PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")] + class LoadMode(Enum): """ diff --git a/cosmos/exceptions.py b/cosmos/exceptions.py index 74091f4a1..85df285b1 100644 --- a/cosmos/exceptions.py +++ b/cosmos/exceptions.py @@ -3,3 +3,7 @@ class CosmosValueError(ValueError): """Raised when a Cosmos config value is invalid.""" + + +class AirflowCompatibilityError(Exception): + """Raised when Cosmos features are limited for Airflow version being used.""" diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 0f7fa7f5f..6c86223e5 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -21,6 +21,7 @@ from cosmos import cache from cosmos.constants import InvocationMode from cosmos.dbt.project import get_partial_parse_path +from cosmos.exceptions import AirflowCompatibilityError try: from airflow.datasets import Dataset @@ -406,7 +407,21 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]: for output in getattr(completed, source): dataset_uri = output.namespace + "/" + output.name uris.append(dataset_uri) - return [Dataset(uri) for uri in uris] + + datasets = [] + try: + datasets = [Dataset(uri) for uri in uris] + except ValueError as e: + raise AirflowCompatibilityError( + """ + Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions: + https://github.com/apache/airflow/issues/39486 + + If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets: + By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html. + """ + ) + return datasets def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None: """ diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 55ecf0a66..6275b1048 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -36,6 +36,7 @@ "py_system_site_packages": False, "py_requirements": ["dbt-postgres==1.6.0b1"], "install_deps": True, + "emit_datasets": False, # Example of how to not set inlets and outlets }, # normal dag parameters schedule_interval="@daily", diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 80c6c58a4..419be9e10 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -18,12 +18,13 @@ from cosmos import cache from cosmos.config import ProfileConfig -from cosmos.constants import InvocationMode +from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) +from cosmos.exceptions import AirflowCompatibilityError from cosmos.operators.local import ( DbtBuildLocalOperator, DbtDocsAzureStorageLocalOperator, @@ -384,11 +385,12 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues @pytest.mark.skipif( - version.parse(airflow_version) < version.parse("2.4"), - reason="Airflow DAG did not have datasets until the 2.4 release", + version.parse(airflow_version) < version.parse("2.4") + or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1", ) @pytest.mark.integration -def test_run_operator_dataset_inlets_and_outlets(): +def test_run_operator_dataset_inlets_and_outlets(caplog): from airflow.datasets import Dataset with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: @@ -417,13 +419,95 @@ def test_run_operator_dataset_inlets_and_outlets(): append_env=True, ) seed_operator >> run_operator >> test_operator + run_test_dag(dag) + assert run_operator.inlets == [] assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)] assert test_operator.outlets == [] +@pytest.mark.skipif( + version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", + # https://github.com/apache/airflow/issues/39486 +) +@pytest.mark.integration +def test_run_operator_dataset_emission_fails(caplog): + from airflow.datasets import Dataset + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + seed_operator = DbtSeedLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="seed", + dbt_cmd_flags=["--select", "raw_customers"], + install_deps=True, + append_env=True, + ) + run_operator = DbtRunLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="run", + dbt_cmd_flags=["--models", "stg_customers"], + install_deps=True, + append_env=True, + ) + + seed_operator >> run_operator + + with pytest.raises(AirflowCompatibilityError) as exc: + run_test_dag(dag) + + err_msg = str(exc.value) + assert ( + "Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions" + in err_msg + ) + assert ( + "If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets" + in err_msg + ) + + +@pytest.mark.skipif( + version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, + reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs", + # https://github.com/apache/airflow/issues/39486 +) +@pytest.mark.integration +def test_run_operator_dataset_emission_is_skipped(caplog): + from airflow.datasets import Dataset + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + seed_operator = DbtSeedLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="seed", + dbt_cmd_flags=["--select", "raw_customers"], + install_deps=True, + append_env=True, + emit_datasets=False, + ) + run_operator = DbtRunLocalOperator( + profile_config=real_profile_config, + project_dir=DBT_PROJ_DIR, + task_id="run", + dbt_cmd_flags=["--models", "stg_customers"], + install_deps=True, + append_env=True, + emit_datasets=False, + ) + + seed_operator >> run_operator + + run_test_dag(dag) + + assert run_operator.inlets == [] + assert run_operator.outlets == [] + + @pytest.mark.integration def test_run_operator_caches_partial_parsing(caplog, tmp_path): caplog.set_level(logging.DEBUG) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 91fd1d6c2..62091de57 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -16,6 +16,8 @@ from dbt.version import get_installed_version as get_dbt_version from packaging.version import Version +from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS + from . import utils as test_utils EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags" @@ -28,6 +30,7 @@ IGNORED_DAG_FILES = ["performance_dag.py"] + # Sort descending based on Versions and convert string to an actual version MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = { Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True) @@ -50,7 +53,10 @@ def get_dag_bag() -> DagBag: """Create a DagBag by adding the files that are not supported to .airflowignore""" with open(AIRFLOW_IGNORE_FILE, "w+") as file: for min_version, files in MIN_VER_DAG_FILE_VER.items(): - if Version(airflow.__version__) < min_version: + if ( + Version(airflow.__version__) < min_version + or Version(airflow.__version__) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS + ): print(f"Adding {files} to .airflowignore") file.writelines([f"{file}\n" for file in files])