
Task mapping against 'params' #24014

Closed
1 of 2 tasks
hammerhead opened this issue May 30, 2022 · 9 comments · Fixed by #26100

@hammerhead

Apache Airflow version

2.3.1 (latest released)

What happened

Importing a DAG using PostgresOperator with expand(params=[...]) fails, claiming params was already specified as a partial argument, even though it wasn't.

What you think should happen instead

The DAG imports successfully.

How to reproduce

from pathlib import Path
import pendulum
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import dag, task


@dag(
    start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
def test():
    query_values = [{"a": 1}, {"a": 2}]

    PostgresOperator.partial(
        task_id="simple_select",
        sql="SELECT {{ params.a }}",
    ).expand(params=query_values)

test_dag = test()

Exception during import:

Broken DAG: [/usr/local/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 199, in expand
    prevent_duplicates(self.kwargs, mapped_kwargs, fail_reason="mapping already partial")
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 139, in prevent_duplicates
    raise TypeError(f"{fail_reason} argument: {duplicated_keys.pop()}")
TypeError: mapping already partial argument: params
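The check in the traceback can be sketched as follows (a simplification for illustration, not the actual Airflow source): `partial()` pre-fills `params` from the operator defaults, so the duplicate check fires even though the caller never passed `params` explicitly.

```python
def prevent_duplicates(partial_kwargs, mapped_kwargs, fail_reason):
    # Any key appearing in both the partial() kwargs and the expand()
    # kwargs is rejected, mirroring the TypeError in the traceback.
    duplicated_keys = set(partial_kwargs) & set(mapped_kwargs)
    if duplicated_keys:
        raise TypeError(f"{fail_reason} argument: {duplicated_keys.pop()}")

# Simplified stand-ins: partial() records a default for params even when
# the user never passed it, so expanding over params always collides.
partial_kwargs = {"task_id": "simple_select", "params": {}}
mapped_kwargs = {"params": [{"a": 1}, {"a": 2}]}
```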

Operating System

macOS 12.4

Versions of Apache Airflow Providers

apache-airflow-providers-postgres==4.1.0

Deployment

Astronomer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@hammerhead hammerhead added area:core kind:bug This is a clearly a bug labels May 30, 2022
@uranusjr
Member

uranusjr commented May 30, 2022

PostgresOperator is misusing params. This argument should be renamed. Edit: Wrong operator, I was remembering something else.

Unfortunately params cannot be mapped at the moment, since the value is currently needed in the scheduler, where operator expansion is not yet resolved. We may change this in the future.

@uranusjr uranusjr changed the title Using PostgresOperator's params parameter in dynamic task mapping results in error Task mapping against 'params' May 30, 2022
@uranusjr uranusjr added kind:feature Feature Requests area:dynamic-task-mapping AIP-42 and removed kind:bug This is a clearly a bug labels May 30, 2022
@eladkal
Contributor

eladkal commented May 30, 2022

Unfortunately params cannot be mapped at the moment, since the value is currently needed in the scheduler, where operator expansion is not yet resolved. We may change this in the future.

Is there a way to raise an exception that explains to the user that the chosen parameter is not supported by expand?

@uranusjr
Member

It’s possible, but unfortunately not very practical, since it’s difficult to tell what the user actually passed in when a function is called. With a function like this:

def foo(a=None): pass

We don’t have a good way to tell if user called foo(a=None) or foo(). I’m putting together a PR to make the message slightly more vague so it’s at least not misleading, and add some documentation to explain it.
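For reference, the usual Python workaround for this ambiguity is a sentinel default (a general pattern, not something Airflow applies here; the `_UNSET` name is hypothetical):

```python
_UNSET = object()  # unique module-private sentinel; hypothetical name

def foo(a=_UNSET):
    # 'a is _UNSET' is only true when the caller omitted the argument,
    # so foo() and foo(a=None) become distinguishable.
    if a is _UNSET:
        return "omitted"
    return f"passed {a!r}"
```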

@uranusjr
Member

Oh, actually we already mention this in the documentation:

Some arguments are not mappable and must be passed to partial(), such as task_id, queue, pool, and most other arguments to BaseOperator.

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html

@ashb
Member

ashb commented Jun 1, 2022

@hammerhead
Author

In this particular example the fix is to use parameters not params for the PostgresOperator https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/operators/postgres/index.html#airflow.providers.postgres.operators.postgres.PostgresOperator

Yes, indeed. The original DAG I ran into the problem with was using different types of queries, such as DELETE FROM {table_fqn} WHERE {column} = {value};.
In that example, I can't use parameters for {table_fqn} as %(table_fqn)s will wrap the value in single quotes, resulting in an SQL syntax error. Using params gives me the plain value instead.

See also this Stack Overflow post from another user.
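The underlying issue is that driver-side parameter binding quotes values as string literals, which is wrong for identifiers such as table or column names. A minimal sketch of the safe alternative, interpolating identifiers only after checking them against an allowlist while keeping the value as a bound parameter (the helper and allowlist names here are hypothetical):

```python
ALLOWED_TABLES = {"raw_metrics"}
ALLOWED_COLUMNS = {"ts_day"}

def build_delete(table_fqn: str, column: str) -> str:
    # Identifiers cannot go through parameter binding (the driver would
    # quote them as literals), so validate them against an allowlist and
    # interpolate; the value itself stays a bound parameter.
    if table_fqn not in ALLOWED_TABLES or column not in ALLOWED_COLUMNS:
        raise ValueError(f"identifier not allowed: {table_fqn}.{column}")
    return f"DELETE FROM {table_fqn} WHERE {column} = %(value)s;"
```

With psycopg2 specifically, the `psycopg2.sql` module's `Identifier` class serves the same purpose.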

Right now, I'm using a Python task as a workaround and then passing its output directly to the sql parameter of the PostgresOperator:

from pathlib import Path

import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator


@task
def generate_sql(policy):
    return Path('include/data_retention_delete.sql') \
        .read_text(encoding="utf-8").format(table_fqn=policy[0],
                                            column=policy[1],
                                            value=policy[2],
                                           )

@dag(
    start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
def data_retention_delete():
    # get_policies() is another @task, defined elsewhere in the DAG file
    sql_statements = generate_sql.expand(policy=get_policies())

    PostgresOperator.partial(
        task_id="delete_partition",
        postgres_conn_id="cratedb_connection",
    ).expand(sql=sql_statements)

data_retention_delete_dag = data_retention_delete()

@josh-fell
Contributor

josh-fell commented Jun 11, 2022

@hammerhead See the "Anything else" section of #24388, but you should be able to still map via parameters for the DELETE DML in a Jinja-templated SQL using {{ task.mapped_kwargs.parameters[ti.map_index].<attribute>}} as a workaround.

Assuming the desired SQL is DELETE FROM {table_fqn} WHERE {column} = {value};, you could try:

sql = """
DELETE FROM {{ task.mapped_kwargs.parameters[ti.map_index].table_fqn }}
WHERE {{ task.mapped_kwargs.parameters[ti.map_index].column }} = {{ task.mapped_kwargs.parameters[ti.map_index].value }};"""

PostgresOperator.partial(
    task_id="delete_partitions",
    postgres_conn_id="cratedb_connection",
    sql=sql,
).expand(parameters=[
    {"table_fqn": "tbl1", "column": "col1", "value": "val1"},
    {"table_fqn": "tbl2", "column": "col2", "value": "val3"},
])

Does this help?

@hammerhead
Author

Thanks, @josh-fell, the example you provided does work.
However, I couldn't apply it successfully to our implementation, in which the value of parameters is provided by another task:

    @task
    def parameter_values():
        # in our original implementation, this is a database query using pg_hook
        return [
            {"table_fqn": "raw_metrics", "column": "ts_day", "value": "v1"},
        ]


    sql = """
    DELETE FROM {{ task.mapped_kwargs.parameters[ti.map_index].table_fqn }}
    WHERE {{ task.mapped_kwargs.parameters[ti.map_index].column }} = {{ task.mapped_kwargs.parameters[ti.map_index].value }};"""

    PostgresOperator.partial(
        task_id="delete_partitions",
        postgres_conn_id="cratedb_connection",
        sql=sql,
    ).expand(parameters=parameter_values())

This fails with an error related to XCom lookups:

[2022-06-22, 07:54:34 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1451, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1555, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2212, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 726, in render_template_fields
    self._do_render_template_fields(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 344, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 391, in render_template
    return render_template_to_string(template, context)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 296, in render_template_to_string
    return render_template(template, context, native=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 291, in render_template
    return "".join(nodes)
  File "<template>", line 14, in root
  File "/usr/local/lib/python3.9/site-packages/jinja2/sandbox.py", line 303, in getitem
    return obj[argument]
  File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom_arg.py", line 77, in __getitem__
    raise ValueError(f"XComArg only supports str lookup, received {type(item).__name__}")
ValueError: XComArg only supports str lookup, received int
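The failure point is the template expression parameters[ti.map_index]: when parameters comes from another task, it is still an unresolved XComArg at render time, and in this Airflow version XComArg only supports string-key lookups. A minimal sketch of that restriction (an illustration of the behavior in the traceback, not the actual Airflow source):

```python
class XComArgSketch:
    # Indexing with a non-string key fails, which is exactly what the
    # integer ti.map_index triggers inside the Jinja template.
    def __getitem__(self, item):
        if not isinstance(item, str):
            raise ValueError(
                f"XComArg only supports str lookup, received {type(item).__name__}"
            )
        return f"<lazy xcom key {item!r}>"
```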

@ashb
Member

ashb commented Jul 26, 2022

@uranusjr Could you add this to your todo list to fix please?

@ashb ashb added this to the Airflow 2.4.0 milestone Jul 26, 2022
@uranusjr uranusjr self-assigned this Jul 27, 2022
@ashb ashb modified the milestones: Airflow 2.4.0, Airflow 2.5.0 Sep 8, 2022
hammerhead added a commit to crate/cratedb-airflow-tutorial that referenced this issue Dec 23, 2022
…rams` parameter

This is a new upstream improvement that required a workaround earlier: apache/airflow#24014
hammerhead added a commit to crate/cratedb-airflow-tutorial that referenced this issue Dec 27, 2022
…rams` parameter

This is a new upstream improvement that required a workaround earlier: apache/airflow#24014