Skip to content

Commit

Permalink
Gracefully error if users try to emit_datasets with Airflow 2.9.0 or …
Browse files Browse the repository at this point in the history
…2.9.1 (astronomer#948)

Improve Cosmos error message when using Airflow 2.9.0 or 2.9.1 and
emitting OL events, to avoid this:

```
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: []
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: ['***://***:5432/***.dbt.stg_customers']
[2024-05-07, 14:20:09 UTC] {providers_manager.py:376} DEBUG - Initializing Providers Manager[dataset_uris]
[2024-05-07, 14:20:09 UTC] {providers_manager.py:379} DEBUG - Initialization of Providers Manager[dataset_uris] took 0.00 seconds
[2024-05-07, 14:20:09 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-05-07, 14:20:09 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/base.py", line 266, in execute
    self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 470, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 343, in run_command
    outlets = self.get_datasets("outputs")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in get_datasets
    return [Dataset(uri) for uri in uris]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in <listcomp>
    return [Dataset(uri) for uri in uris]
            ^^^^^^^^^^^^
  File "<attrs generated init airflow.datasets.Dataset>", line 3, in __init__
    _setattr('uri', __attr_converter_uri(uri))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
ValueError: URI format ***:// must contain database, schema, and table names
```

Closes: astronomer#945
  • Loading branch information
tatiana authored and arojasb3 committed Jul 14, 2024
1 parent 135a069 commit 0c2ee75
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 7 deletions.
5 changes: 5 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path

import aenum
from packaging.version import Version

DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
Expand All @@ -20,6 +21,10 @@
DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"

# Cosmos will not emit datasets for the following Airflow versions, due to a breaking change that's fixed in later Airflow 2.x versions
# https://github.com/apache/airflow/issues/39486
PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS = [Version("2.9.0"), Version("2.9.1")]


class LoadMode(Enum):
"""
Expand Down
4 changes: 4 additions & 0 deletions cosmos/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@

class CosmosValueError(ValueError):
"""Raised when a Cosmos config value is invalid."""


class AirflowCompatibilityError(Exception):
"""Raised when Cosmos features are limited for Airflow version being used."""
17 changes: 16 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from cosmos import cache
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import AirflowCompatibilityError

try:
from airflow.datasets import Dataset
Expand Down Expand Up @@ -407,7 +408,21 @@ def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
dataset_uri = output.namespace + "/" + output.name
uris.append(dataset_uri)
logger.debug("URIs to be converted to Dataset: %s", uris)
return [Dataset(uri) for uri in uris]

datasets = []
try:
datasets = [Dataset(uri) for uri in uris]
except ValueError as e:
raise AirflowCompatibilityError(
"""
Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
https://github.com/apache/airflow/issues/39486
If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
"""
)
return datasets

def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
"""
Expand Down
1 change: 1 addition & 0 deletions dev/dags/example_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"py_system_site_packages": False,
"py_requirements": ["dbt-postgres==1.6.0b1"],
"install_deps": True,
"emit_datasets": False, # Example of how to not set inlets and outlets
},
# normal dag parameters
schedule_interval="@daily",
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ packages = ["/cosmos"]
[tool.hatch.envs.tests]
dependencies = [
"astronomer-cosmos[tests]",
"apache-airflow-providers-postgres",
"apache-airflow-providers-cncf-kubernetes>=5.1.1",
"apache-airflow-providers-docker>=3.5.0",
"apache-airflow-providers-microsoft-azure",
Expand All @@ -135,7 +136,7 @@ airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]

[tool.hatch.envs.tests.overrides]
matrix.airflow.dependencies = [
{ value = "typing_extensions<4.6", if = ["2.6"] }
{ value = "typing_extensions<4.6", if = ["2.6"] },
]

[tool.hatch.envs.tests.scripts]
Expand Down
92 changes: 88 additions & 4 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

from cosmos import cache
from cosmos.config import ProfileConfig
from cosmos.constants import InvocationMode
from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, InvocationMode
from cosmos.dbt.parser.output import (
extract_dbt_runner_issues,
parse_number_of_warnings_dbt_runner,
parse_number_of_warnings_subprocess,
)
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtDocsAzureStorageLocalOperator,
Expand Down Expand Up @@ -384,11 +385,12 @@ def test_dbt_test_local_operator_invocation_mode_methods(mock_extract_log_issues


@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4"),
reason="Airflow DAG did not have datasets until the 2.4 release",
version.parse(airflow_version) < version.parse("2.4")
or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow DAG did not have datasets until the 2.4 release, inlets and outlets do not work by default in Airflow 2.9.0 and 2.9.1",
)
@pytest.mark.integration
def test_run_operator_dataset_inlets_and_outlets():
def test_run_operator_dataset_inlets_and_outlets(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
Expand Down Expand Up @@ -417,13 +419,95 @@ def test_run_operator_dataset_inlets_and_outlets():
append_env=True,
)
seed_operator >> run_operator >> test_operator

run_test_dag(dag)

assert run_operator.inlets == []
assert run_operator.outlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)]
assert test_operator.inlets == [Dataset(uri="postgres://0.0.0.0:5432/postgres.public.stg_customers", extra=None)]
assert test_operator.outlets == []


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
# https://github.com/apache/airflow/issues/39486
)
@pytest.mark.integration
def test_run_operator_dataset_emission_fails(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)

seed_operator >> run_operator

with pytest.raises(AirflowCompatibilityError) as exc:
run_test_dag(dag)

err_msg = str(exc.value)
assert (
"Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions"
in err_msg
)
assert (
"If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets"
in err_msg
)


@pytest.mark.skipif(
version.parse(airflow_version) not in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs",
# https://github.com/apache/airflow/issues/39486
)
@pytest.mark.integration
def test_run_operator_dataset_emission_is_skipped(caplog):
from airflow.datasets import Dataset

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
emit_datasets=False,
)
run_operator = DbtRunLocalOperator(
profile_config=real_profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
emit_datasets=False,
)

seed_operator >> run_operator

run_test_dag(dag)

assert run_operator.inlets == []
assert run_operator.outlets == []


@pytest.mark.integration
def test_run_operator_caches_partial_parsing(caplog, tmp_path):
caplog.set_level(logging.DEBUG)
Expand Down
13 changes: 12 additions & 1 deletion tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@
from dbt.version import get_installed_version as get_dbt_version
from packaging.version import Version

from cosmos.constants import PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS

from . import utils as test_utils

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"
DBT_VERSION = Version(get_dbt_version().to_version_string()[1:])
AIRFLOW_VERSION = Version(airflow.__version__)

MIN_VER_DAG_FILE: dict[str, list[str]] = {
"2.4": ["cosmos_seed_dag.py"],
}

IGNORED_DAG_FILES = ["performance_dag.py"]


# Sort descending based on Versions and convert string to an actual version
MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True)
Expand All @@ -48,9 +52,12 @@ def session():
@cache
def get_dag_bag() -> DagBag:
"""Create a DagBag by adding the files that are not supported to .airflowignore"""
if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS:
return DagBag(dag_folder=None, include_examples=False)

with open(AIRFLOW_IGNORE_FILE, "w+") as file:
for min_version, files in MIN_VER_DAG_FILE_VER.items():
if Version(airflow.__version__) < min_version:
if AIRFLOW_VERSION < min_version:
print(f"Adding {files} to .airflowignore")
file.writelines([f"{file}\n" for file in files])

Expand All @@ -77,6 +84,10 @@ def get_dag_ids() -> list[str]:
return dag_bag.dag_ids


@pytest.mark.skipif(
AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS,
reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False",
)
@pytest.mark.integration
@pytest.mark.parametrize("dag_id", get_dag_ids())
def test_example_dag(session, dag_id: str):
Expand Down

0 comments on commit 0c2ee75

Please sign in to comment.