
Cosmos 1.6.0 does not emit Datasets on Airflow 2.9.3 with athena-dbt #1203

Closed
moritzsanne opened this issue Sep 11, 2024 · 2 comments
Labels
area:datasets Related to the Airflow datasets feature/module area:lineage Related to open lineage or DBT dataset lineage dbt:run Primarily related to dbt run command or functionality execution:local Related to Local execution environment parsing:dbt_manifest Issues, questions, or features related to dbt_manifest parsing profile:athena Related to Athena ProfileConfig

Comments

@moritzsanne

Hey everyone, I would like to set up data-aware scheduling and have Cosmos automatically emit Airflow Datasets.

My Airflow environment looks like this:

  • apache-airflow-providers-openlineage==1.11.0
  • apache-airflow==2.10.1
  • astronomer-cosmos==1.6.0
  • dbt-athena-community==1.8.4
  • dbt-core==1.8.6

Here's the relevant part of the log of a Cosmos run.

[2024-09-11, 16:06:41 UTC] {logging_mixin.py:190} INFO - 16:06:41
[2024-09-11, 16:06:43 UTC] {logging_mixin.py:190} INFO - 16:06:43  dbt.adapters.athena.constants adapter: Table 'mtv_vertriebs_rohdaten_download_bestandskundenpotential' has no TableType attribute - Ignoring
[2024-09-11, 16:06:48 UTC] {logging_mixin.py:190} INFO - 16:06:48  Concurrency: 1 threads (target='db__prod_general_prepared__bucket__prepared_non_pii')
[2024-09-11, 16:06:48 UTC] {logging_mixin.py:190} INFO - 16:06:48
[2024-09-11, 16:06:48 UTC] {logging_mixin.py:190} INFO - 16:06:48  1 of 1 START sql table model prod_general_prepared.data_glue_jobs_metadata ..... [RUN]
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  1 of 1 OK created sql table model prod_general_prepared.data_glue_jobs_metadata  [OK 100 in 9.38s]
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  Finished running 1 table model in 0 hours 0 minutes and 16.36 seconds (16.36s).
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  Completed successfully
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58
[2024-09-11, 16:06:58 UTC] {logging_mixin.py:190} INFO - 16:06:58  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
[2024-09-11, 16:06:58 UTC] {local.py:196} WARNING - Artifact schema version: https://schemas.getdbt.com/dbt/manifest/v12.json is above dbt-ol supported version 7. This might cause errors.
[2024-09-11, 16:06:58 UTC] {local.py:389} INFO - Inlets: []
[2024-09-11, 16:06:58 UTC] {local.py:390} INFO - Outlets: []
[2024-09-11, 16:06:58 UTC] {dag.py:3229} INFO - Sync 1 DAGs

Can anyone help identify the problem? Is dbt-athena incompatible with OpenLineage? I'd like to understand where I would need to contribute to make this work.

Many thanks!


dosubot bot commented Sep 11, 2024

Hey @moritzsanne! I'm here to help you with your Cosmos issue. I can assist with bugs, questions, and contributions while you wait for a human maintainer. Let's tackle this together!

I found two similar issues that might be relevant to your problem:

  1. Bug when using Airflow 2.9.1 and trying to emit Datasets - This issue was closed by a pull request that gracefully errors if users try to emit datasets with Airflow 2.9.0 or 2.9.1 [1].
  2. No datasets emitted in DbtLocalBaseOperator after Cosmos upgrade to 1.3.0 - This issue was closed by a pull request that fixed the OpenLineage integration documentation [2].

Cosmos 1.6.0 does not emit Datasets on Airflow 2.9.3 with athena-dbt due to a breaking change in Dataset URIs introduced in Apache Airflow 2.9.0 and 2.9.1. This issue is documented in the code:

except ValueError:
    raise AirflowCompatibilityError(
        """
        Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
        https://github.com/apache/airflow/issues/39486

        If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
        By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
        """
    )
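For reference, the workaround named in that error message can be sketched as follows. This is a hypothetical DAG setup: only the `render_config` argument is shown, and the remaining `DbtDag` arguments (project config, profile config, scheduling) are elided.

```python
from cosmos import DbtDag, RenderConfig

# Disable Airflow Dataset emission entirely, as the error message
# suggests for the affected Airflow 2.9.0/2.9.1 versions.
render_config = RenderConfig(emit_datasets=False)

dag = DbtDag(
    render_config=render_config,
    # ... project, profile, and scheduling arguments go here ...
)
```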

Regarding the compatibility of athena-dbt with OpenLineage, there is no explicit mention of incompatibility. However, the code does handle scenarios where OpenLineage is not available:

try:
    from airflow.datasets import Dataset
    from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor
except ModuleNotFoundError:
    is_openlineage_available = False
    DbtLocalArtifactProcessor = None
else:
    is_openlineage_available = True

And it logs a warning if OpenLineage is not available:

logger.warning(
    "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
)
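The availability check quoted above relies on a try/except import. An equivalent, side-effect-free way to probe for an optional dependency is `importlib.util.find_spec`; the sketch below uses stdlib module names only, so the optional-dependency name is purely illustrative.

```python
import importlib.util

def is_module_available(name: str) -> bool:
    """Return True if the named top-level module can be imported."""
    return importlib.util.find_spec(name) is not None

# A stdlib module is always available; a made-up name is not.
print(is_module_available("json"))                             # True
print(is_module_available("definitely_not_a_real_module_xyz")) # False
```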

To enable OpenLineage events in Cosmos when using Airflow 2.10.1, follow these steps:

  1. Install the necessary dependencies:

    pip install "astronomer-cosmos[openlineage]"
  2. Configure the OpenLineage processor:

    from openlineage.common.provider.dbt.local import DbtLocalArtifactProcessor
    from cosmos.constants import OPENLINEAGE_PRODUCER, LINEAGE_NAMESPACE
    
    openlineage_processor = DbtLocalArtifactProcessor(
        producer=OPENLINEAGE_PRODUCER,
        job_namespace=LINEAGE_NAMESPACE,
        project_dir=project_dir,
        profile_name=self.profile_config.profile_name,
        target=self.profile_config.target_name,
    )
    openlineage_processor.should_raise_on_unsupported_command = False
    try:
        events = openlineage_processor.parse()
        self.openlineage_events_completes = events.completes
    except (FileNotFoundError, NotImplementedError, ValueError, KeyError, jinja2.exceptions.UndefinedError):
        self.log.debug("Unable to parse OpenLineage events", stack_info=True)
  3. Handle datasets and register them:

    from airflow.datasets import Dataset
    from cosmos.exceptions import AirflowCompatibilityError
    
    def get_datasets(self, source: Literal["inputs", "outputs"]) -> list[Dataset]:
        uris = []
        for completed in self.openlineage_events_completes:
            for output in getattr(completed, source):
                dataset_uri = output.namespace + "/" + output.name
                uris.append(dataset_uri)
        self.log.debug("URIs to be converted to Dataset: %s", uris)
    
        datasets = []
        try:
            datasets = [Dataset(uri) for uri in uris]
        except ValueError:
            raise AirflowCompatibilityError(
                """
                Apache Airflow 2.9.0 & 2.9.1 introduced a breaking change in Dataset URIs, to be fixed in newer versions:
                https://github.com/apache/airflow/issues/39486
    
                If you want to use Cosmos with one of these Airflow versions, you will have to disable emission of Datasets:
                By setting ``emit_datasets=False`` in ``RenderConfig``. For more information, see https://astronomer.github.io/astronomer-cosmos/configuration/render-config.html.
                """
            )
        return datasets
    
    def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]) -> None:
        with create_session() as session:
            self.outlets.extend(new_outlets)
            self.inlets.extend(new_inlets)
            for task in self.dag.tasks:
                if task.task_id == self.task_id:
                    task.outlets.extend(new_outlets)
                    task.inlets.extend(new_inlets)
            DAG.bulk_write_to_db([self.dag], session=session)
            session.commit()
  4. Collect OpenLineage facets on task completion:

    from airflow.models.taskinstance import TaskInstance
    from openlineage.client.run import RunEvent
    
    def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage:
        inputs = []
        outputs = []
        run_facets: dict[str, Any] = {}
        job_facets: dict[str, Any] = {}
        # Populate inputs, outputs, run_facets, and job_facets as needed
        return OperatorLineage(inputs=inputs, outputs=outputs, run_facets=run_facets, job_facets=job_facets)

By following these steps, you can enable OpenLineage events in Cosmos when using Airflow 2.10.1 [3].
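The dataset-URI construction in step 3 can be illustrated in isolation. The sketch below mirrors the `output.namespace + "/" + output.name` logic; the namespace and table name are hypothetical examples of what an OpenLineage dbt processor might produce for Athena once adapter support exists. It also shows why an unsupported adapter yields the empty `Inlets: []` / `Outlets: []` seen in the log: no OpenLineage events means no pairs to join.

```python
def build_dataset_uris(outputs: list[tuple[str, str]]) -> list[str]:
    """Join each (namespace, name) pair into a single dataset URI string."""
    return [f"{namespace}/{name}" for namespace, name in outputs]

# Hypothetical Athena-style namespace and fully qualified table name.
outputs = [
    ("awsathena://athena.eu-central-1.amazonaws.com",
     "awsdatacatalog.prod_general_prepared.data_glue_jobs_metadata"),
]
print(build_dataset_uris(outputs))

# No OpenLineage events parsed -> no URIs -> empty inlets/outlets.
print(build_dataset_uris([]))  # []
```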


@dosubot dosubot bot added area:datasets Related to the Airflow datasets feature/module area:lineage Related to open lineage or DBT dataset lineage dbt:run Primarily related to dbt run command or functionality execution:local Related to Local execution environment parsing:dbt_manifest Issues, questions, or features related to dbt_manifest parsing profile:athena Related to Athena ProfileConfig labels Sep 11, 2024
@moritzsanne
Author

I think I identified the problem and will close this issue because it is not a Cosmos problem: OpenLineage currently does not support the Athena adapter.
Here's the PR to add support for it.
