Bug when using Airflow 2.9.1 and trying to emit Datasets #945
Comments
I just upgraded to airflow 2.9.1 and can confirm I am hitting this bug! Thanks for opening the ticket! To disable the emission, use |
Thanks for the feedback, @Flinz, that's a great workaround when not using Data-aware scheduling! I'll try to work on the fix in Airflow, so from Airflow 2.9.3, users won't face this issue anymore.
```
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: []
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: ['***://***:5432/***.dbt.stg_customers']
[2024-05-07, 14:20:09 UTC] {providers_manager.py:376} DEBUG - Initializing Providers Manager[dataset_uris]
[2024-05-07, 14:20:09 UTC] {providers_manager.py:379} DEBUG - Initialization of Providers Manager[dataset_uris] took 0.00 seconds
[2024-05-07, 14:20:09 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-05-07, 14:20:09 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/base.py", line 266, in execute
    self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 470, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 343, in run_command
    outlets = self.get_datasets("outputs")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in get_datasets
    return [Dataset(uri) for uri in uris]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in <listcomp>
    return [Dataset(uri) for uri in uris]
            ^^^^^^^^^^^^
  File "<attrs generated init airflow.datasets.Dataset>", line 3, in __init__
    _setattr('uri', __attr_converter_uri(uri))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
ValueError: URI format ***:// must contain database, schema, and table names
```
As seen in: #945
Improve the logs so we can understand which Dataset URIs Cosmos was setting while trying to execute a task in Airflow 2.9:
```
[2024-05-07, 14:20:09 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-05-07, 14:20:09 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/base.py", line 266, in execute
    self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 470, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 343, in run_command
    outlets = self.get_datasets("outputs")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in get_datasets
    return [Dataset(uri) for uri in uris]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in <listcomp>
    return [Dataset(uri) for uri in uris]
            ^^^^^^^^^^^^
  File "<attrs generated init airflow.datasets.Dataset>", line 3, in __init__
    _setattr('uri', __attr_converter_uri(uri))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
```
This improvement allowed us to confirm which Dataset URIs Cosmos was attempting to generate, and to log the following issue: #945
(cherry picked from commit c7a4599)
…2.9.1 (#948)
Improve Cosmos error message when using Airflow 2.9.0 or 2.9.1 and emitting OL events, to avoid this:
```
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: []
[2024-05-07, 14:20:09 UTC] {local.py:409} DEBUG - URIs to be converted to Dataset: ['***://***:5432/***.dbt.stg_customers']
[2024-05-07, 14:20:09 UTC] {providers_manager.py:376} DEBUG - Initializing Providers Manager[dataset_uris]
[2024-05-07, 14:20:09 UTC] {providers_manager.py:379} DEBUG - Initialization of Providers Manager[dataset_uris] took 0.00 seconds
[2024-05-07, 14:20:09 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2024-05-07, 14:20:09 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 400, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/base.py", line 266, in execute
    self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 470, in build_and_run_cmd
    result = self.run_command(cmd=dbt_cmd, env=env, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 343, in run_command
    outlets = self.get_datasets("outputs")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in get_datasets
    return [Dataset(uri) for uri in uris]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 410, in <listcomp>
    return [Dataset(uri) for uri in uris]
            ^^^^^^^^^^^^
  File "<attrs generated init airflow.datasets.Dataset>", line 3, in __init__
    _setattr('uri', __attr_converter_uri(uri))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/datasets/__init__.py", line 81, in _sanitize_uri
    parsed = normalizer(parsed)
             ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/postgres/datasets/postgres.py", line 34, in sanitize_uri
    raise ValueError("URI format postgres:// must contain database, schema, and table names")
ValueError: URI format ***:// must contain database, schema, and table names
```
Closes: #945
Context
This bug happens in Airflow 2.9.0 - 2.9.1 with Cosmos 1.1.0 - 1.4.0a4.
Airflow 2.9.0 introduced (Airflow) Dataset URI validation:
https://github.com/apache/airflow/blob/2d53c1089f78d8d1416f51af60e1e0354781c661/airflow/datasets/__init__.py#L45-L82
This validation raises an error when Cosmos tries to create Airflow Dataset URIs that follow the OpenLineage naming convention:
https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
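To make the mismatch concrete, here is a minimal sketch of the kind of check that rejects the URI. It is a simplified stand-in written for illustration, not Airflow's or the Postgres provider's actual code. The function name `check_postgres_dataset_uri`, the helper `is_accepted`, and the host and database names are assumptions; only the error message and the dotted `db.schema.table` shape are taken from the logs in this issue.

```python
from urllib.parse import urlsplit


def check_postgres_dataset_uri(uri: str) -> str:
    """Hypothetical, simplified check: the path must look like /<database>/<schema>/<table>."""
    parts = urlsplit(uri)
    # "/db/schema/table".split("/") gives ["", "db", "schema", "table"], i.e. four pieces.
    if len(parts.path.split("/")) != 4:
        raise ValueError("URI format postgres:// must contain database, schema, and table names")
    return uri


def is_accepted(uri: str) -> bool:
    """Return True if the sketch check passes, False if it raises ValueError."""
    try:
        check_postgres_dataset_uri(uri)
        return True
    except ValueError:
        return False


# OpenLineage-style name: database, schema and table joined by dots in a single path segment.
# The host and database names are placeholders; the real values are masked in the logs.
openlineage_style = "postgres://host:5432/mydb.dbt.stg_customers"
# Slash-separated form that a path-based check like the one above would accept.
slash_style = "postgres://host:5432/mydb/dbt/stg_customers"

print(is_accepted(openlineage_style))
print(is_accepted(slash_style))
```

Under these assumptions the dotted form fails because its path holds one segment instead of three, which matches the `ValueError` seen in the traceback.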
When running `simple_dag` from cosmos-demo using the Dockerfile, we're getting the error:
ValueError: URI format postgres:// must contain database, schema, and table names
We cannot change how Cosmos generates outlet Dataset URIs in a minor release - since this could silently break dataset-scheduled DAGs.
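The risk can be illustrated with a toy model of dataset-based scheduling. This is only a sketch under the assumption that a consumer DAG is matched to a producer by the exact Dataset URI string; the `schedules` mapping, the `triggered_dags` helper, the DAG name, and the host and database names are all hypothetical.

```python
# Hypothetical registry: consumer DAGs keyed by the dataset URI they are scheduled on.
schedules: dict[str, list[str]] = {
    "postgres://host:5432/mydb.dbt.stg_customers": ["downstream_report_dag"],
}


def triggered_dags(emitted_uri: str) -> list[str]:
    """Return the consumer DAGs whose schedule matches the emitted URI exactly."""
    return schedules.get(emitted_uri, [])


old_uri = "postgres://host:5432/mydb.dbt.stg_customers"  # what existing DAGs reference
new_uri = "postgres://host:5432/mydb/dbt/stg_customers"  # a reshaped URI after a format change

print(triggered_dags(old_uri))  # the consumer is found
print(triggered_dags(new_uri))  # no match, and nothing raises an error
```

In this model the producer task still succeeds after a format change, but the consumer simply stops being triggered, which is why such a change is hard to notice.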
Solution