Gracefully error if users try to emit_datasets with Airflow 2.9.0 or 2.9.1 #948

Merged (6 commits) on May 10, 2024
5 changes: 5 additions & 0 deletions cosmos/constants.py
@@ -3,6 +3,7 @@
from pathlib import Path

import aenum
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
@@ -20,6 +21,10 @@
DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"

# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later Airflow 2.x versions
# https://github.com/apache/airflow/issues/39486
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


class LoadMode(Enum):
"""
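
A note on how the new constant is consumed: it is compared against the running Airflow version (see the test changes further down) to decide whether Dataset emission can be exercised at all. A minimal sketch of that comparison, under that assumption; the datasets_supported helper is hypothetical and not part of this diff:

from airflow import __version__ as airflow_version
from packaging.version import Version

from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS


def datasets_supported() -> bool:
    # False on Airflow 2.9.0/2.9.1, where stricter Dataset URI validation
    # rejects the URIs Cosmos builds from OpenLineage namespaces and names.
    return Version(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS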
4 changes: 4 additions & 0 deletions cosmos/exceptions.py
@@ -3,3 +3,7 @@

class CosmosValueError(ValueError):
"""Raised when a Cosmos config value is invalid."""


class AirflowCompatibilityError(Exception):
"""Raised when Cosmos features are limited for Airflow version being used."""
17 changes: 16 additions & 1 deletion cosmos/operators/local.py
@@ -21,6 +21,7 @@
from cosmos import cache
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import AirflowCompatibilityError

try:
from airflow.datasets import Dataset
@@ -407,7 +408,21 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
dataset_uri = output.namespace + "/" + output.name
uris.append(dataset_uri)
logger.debug("URIs to be converted to Dataset: %s", uris)
return [Dataset(uri) for uri in uris]

datasets = []
try:
datasets = [Dataset(uri) for uri in uris]
except ValueError as e:
raise AirflowCompatibilityError(
"""
Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
https://github.com/apache/airflow/issues/39486

If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
"""
)
return datasets

def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
"""
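
For context, the error message above points users at turning off dataset emission altogether. A minimal usage sketch of that workaround; the DAG id, project path, and profile values are placeholders, and RenderConfig(emit_datasets=False) is the option named in the error message:

import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig

example_dag = DbtDag(
    dag_id="jaffle_shop_no_datasets",  # placeholder DAG id
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),  # placeholder path
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/profiles.yml",  # placeholder path
    ),
    # Skip building Dataset URIs entirely, so the ValueError handled above
    # cannot be hit on Airflow 2.9.0/2.9.1.
    render_config=RenderConfig(emit_datasets=False),
)

The dev/dags/example_virtualenv.py change below shows the per-operator equivalent, passing "emit_datasets": False through operator_args.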
1 change: 1 addition & 0 deletions dev/dags/example_virtualenv.py
@@ -36,6 +36,7 @@
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
"emit_datasets": False, # Example of how to not set inlets and outlets
},
# normal dag parameters
schedule_interval="@daily",
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -117,6 +117,7 @@ packages = ["/cosmos"]
[tool.hatch.envs.tests]
dependencies = [
"astronomer-cosmos[tests]",
"apache-airflow-providers-postgres",
"apache-airflow-providers-cncf-kubernetes>=5.1.1",
"apache-airflow-providers-docker>=3.5.0",
"apache-airflow-providers-microsoft-azure",
@@ -135,7 +136,7 @@ airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]

[tool.hatch.envs.tests.overrides]
matrix.airflow.dependencies = [
{ value = "typing_extensions<4.6", if = ["2.6"] }
{ value = "typing_extensions<4.6", if = ["2.6"] },
]

[tool.hatch.envs.tests.scripts]
92 changes: 88 additions & 4 deletions tests/operators/test_local.py
@@ -18,12 +18,13 @@

from cosmos import cache
from cosmos.config import ProfileConfig
from cosmos.constants import InvocationMode
from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode
from cosmos.dbt.parser.output import (
extract_dbt_runner_issues,
parse_number_of_warnings_dbt_runner,
parse_number_of_warnings_subprocess,
)
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtDocsAzureStorageLocalOperator,
@@ -384,11 +385,12 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues


@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4"),
reason="Airflow DAG did not have datasets until the 2.4 release",
version.parse(airflow_version) < version.parse("2.4")
or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets():
def test_run_operator_dataset_inlets_and_outlets(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
@@ -417,13 +419,95 @@ def test_run_operator_dataset_inlets_and_outlets():
append_env=True,
)
seed_operator >> run_operator >> test_operator

run_test_dag(dag)

assert run_operator.inlets == []
assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)]
assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)]
assert test_operator.outlets == []


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
# https://github.com/apache/airflow/issues/39486
)
@pytest.mark.integration
def test_run_operator_dataset_emission_fails(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)

seed_operator >> run_operator

with pytest.raises(AirflowCompatibilityError) as exc:
run_test_dag(dag)

err_msg = str(exc.value)
assert (
"Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions"
in err_msg
)
assert (
"If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets"
in err_msg
)


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
# https://github.com/apache/airflow/issues/39486
)
@pytest.mark.integration
def test_run_operator_dataset_emission_is_skipped(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
emit_datasets=False,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
emit_datasets=False,
)

seed_operator >> run_operator

run_test_dag(dag)

assert run_operator.inlets == []
assert run_operator.outlets == []


@pytest.mark.integration
def test_run_operator_caches_partial_parsing(caplog, tmp_path):
caplog.set_level(logging.DEBUG)
13 changes: 12 additions & 1 deletion tests/test_example_dags.py
@@ -16,18 +16,22 @@
from dbt.version import get_installed_version as get_dbt_version
from packaging.version import Version

from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS

from . import utils as test_utils

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
DBT_VERSION = Version(get_dbt_version().to_version_string()[1:])
AIRFLOW_VERSION = Version(airflow.__version__)

MIN_VER_DAG_FILE: dict[str, list[str]] = {
"2.4": ["cosmos_seed_dag.py"],
}

IGNORED_DAG_FILES = ["performance_dag.py"]


# Sort descending based on Versions and convert string to an actual version
MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True)
@@ -48,9 +52,12 @@ def session():
@cache
def get_dag_bag() -> DagBag:
"""Create a DagBag by adding the files that are not supported to .airflowignore"""
if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS:
return DagBag(dag_folder=None, include_examples=False)

with open(AIRFLOW_IGNORE_FILE, "w+") as file:
for min_version, files in MIN_VER_DAG_FILE_VER.items():
if Version(airflow.__version__) < min_version:
if AIRFLOW_VERSION < min_version:
print(f"Adding {files} to .airflowignore")
file.writelines([f"{file}\n" for file in files])

@@ -77,6 +84,10 @@ def get_dag_ids() -> list[str]:
return dag_bag.dag_ids


@pytest.mark.skipif(
AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False",
)
@pytest.mark.integration
@pytest.mark.parametrize("dag_id", get_dag_ids())
def test_example_dag(session, dag_id: str):