expand_kwargs.map(func) gives unhelpful error message if func returns list #25352

Closed
1 of 2 tasks
MatrixManAtYrService opened this issue Jul 27, 2022 · 0 comments · Fixed by #25355
MatrixManAtYrService commented Jul 27, 2022

Apache Airflow version

main (development)

What happened

Here's a DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="expand_list",
    doc_md="try to get kwargs from a list",
    schedule_interval=None,
    start_date=datetime(2001, 1, 1),
) as expand_list:

    @expand_list.task
    def do_this():
        # Each tuple is (bash_command, env var name, env var value).
        return [
            ("echo hello $USER", "USER", "foo"),
            ("echo hello $USER", "USER", "bar"),
        ]

    def mapper(row):
        if row[2] == "bar":
            # Deliberately wrong: expand_kwargs needs a dict, not a list.
            return [1, 2, 3]
        else:
            return {"bash_command": row[0], "env": {row[1]: row[2]}}

    BashOperator.partial(task_id="one_cmd").expand_kwargs(do_this().map(mapper))

The foo task instance succeeds as expected, and the bar task instance fails as expected. But the error message for the bar failure isn't particularly helpful to a user who doesn't know what they did wrong:

ERROR - Failed to execute task: resolve() takes 3 positional arguments but 4 were given.
Traceback (most recent call last):
  File "/home/matt/src/airflow/airflow/executors/debug_executor.py", line 78, in _run_task
    ti.run(job_id=ti.job_id, **params)
  File "/home/matt/src/airflow/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/matt/src/airflow/airflow/models/taskinstance.py", line 1782, in run
    self._run_raw_task(
  File "/home/matt/src/airflow/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/matt/src/airflow/airflow/models/taskinstance.py", line 1445, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/matt/src/airflow/airflow/models/taskinstance.py", line 1580, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/home/matt/src/airflow/airflow/models/taskinstance.py", line 2202, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/home/matt/src/airflow/airflow/models/mappedoperator.py", line 751, in render_template_fields
    unmapped_task = self.unmap(mapped_kwargs)
  File "/home/matt/src/airflow/airflow/models/mappedoperator.py", line 591, in unmap
    kwargs = self._expand_mapped_kwargs(resolve)
  File "/home/matt/src/airflow/airflow/models/mappedoperator.py", line 546, in _expand_mapped_kwargs
    return expand_input.resolve(*resolve)
TypeError: resolve() takes 3 positional arguments but 4 were given

What you think should happen instead

Whatever checks the return value for mappability should do more to point the user to their error. Perhaps something like:

UnmappableDataError: Expected a dict with keys that BashOperator accepts, got [1, 2, 3] instead
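
To make the suggestion concrete, here is a minimal sketch of the kind of check I have in mind, written as plain Python with no Airflow imports. Everything in it is hypothetical: UnmappableDataError is just the name proposed above, and check_expand_kwargs_item and checked are made-up helpers for illustration. I'm not claiming this is where or how Airflow should do the check, only what a more helpful failure could look like.

```python
from typing import Any, Callable, Mapping


class UnmappableDataError(TypeError):
    """Hypothetical exception; the name comes from the suggestion above."""


def check_expand_kwargs_item(value: Any, operator_name: str) -> Mapping[str, Any]:
    """Return value unchanged if it looks like operator kwargs, else raise."""
    if not isinstance(value, Mapping):
        raise UnmappableDataError(
            f"Expected a dict with keys that {operator_name} accepts, "
            f"got {value!r} instead"
        )
    # Keyword argument names must be strings.
    bad_keys = [key for key in value if not isinstance(key, str)]
    if bad_keys:
        raise UnmappableDataError(
            f"Expected string keys for {operator_name} kwargs, got {bad_keys!r}"
        )
    return value


def checked(
    mapper: Callable[[Any], Any], operator_name: str
) -> Callable[[Any], Mapping[str, Any]]:
    """Wrap a mapper so a bad return value fails with a descriptive message."""

    def wrapper(item: Any) -> Mapping[str, Any]:
        return check_expand_kwargs_item(mapper(item), operator_name)

    return wrapper
```

With the mapper from the DAG above, checked(mapper, "BashOperator") would pass the foo tuple through as a dict and raise the descriptive error for the bar tuple, instead of the TypeError about resolve().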

How to reproduce

Run the DAG above.

Operating System

Linux 5.10.101 #1-NixOS SMP Wed Feb 16 11:54:31 UTC 2022 x86_64 GNU/Linux

Versions of Apache Airflow Providers

n/a

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
