
Make _run_raw_task AIP-44 compatible #38992

Merged

Conversation

dstandish
Contributor

@dstandish dstandish commented Apr 13, 2024

Implement core functionality in _run_raw_task for AIP-44 / database isolation.

To be followed by other PRs split out from #37851.
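
For readers new to AIP-44: the dispatch idea behind "database isolation", reconstructed here as a minimal self-contained sketch (not Airflow's actual implementation — the real decorator lives in airflow/api_internal/internal_api_call.py and serializes arguments with BaseSerialization, as the tracebacks later in this thread show):

```python
import functools

DB_ISOLATION = False  # stand-in for the real config check

def internal_api_call(func):
    """Sketch of the AIP-44 dispatch: run locally, or forward the call over RPC."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not DB_ISOLATION:
            return func(*args, **kwargs)  # direct path: process may use the DB
        # isolation path: describe the call and ship it to the internal API
        # server (real Airflow serializes the arguments and POSTs them)
        payload = {"method": func.__qualname__, "args": args, "kwargs": kwargs}
        return fake_rpc_transport(payload)
    return wrapper

def fake_rpc_transport(payload):
    # stands in for the HTTP call; the server deserializes the payload, runs
    # the registered method with a DB session, and returns the result
    print(f"would POST {payload['method']} to the internal API server")
```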

@dstandish dstandish requested review from potiuk and vincbeck April 13, 2024 17:23
@dstandish dstandish marked this pull request as draft April 14, 2024 16:21
@potiuk
Member

potiuk commented Apr 15, 2024

Generally it's what I would expect - let's get the tests passing and merge the dependent PR, and then we can review this one in detail.

@dstandish dstandish force-pushed the make-_run_raw_task-aip-44-compatible branch 2 times, most recently from df8cb7b to 9fc852b on April 16, 2024 14:22
@dstandish
Contributor Author

checks passed @potiuk

@dstandish dstandish marked this pull request as ready for review April 16, 2024 14:58
@potiuk
Member

potiuk commented Apr 16, 2024

I am mostly out till Friday and this one needs quite a bit more review - if it can wait.

@dstandish
Contributor Author

I am mostly out till Friday and this one needs quite a bit more review - if it can wait.

no worries, enjoy

@potiuk
Member

potiuk commented Apr 18, 2024

I reviewed it quite a bit more thoroughly. I think it's the right direction, but we should - I think - complete it (i.e. turn handle_reschedule into internal_api, merge the committing changes into defer_task, and avoid keeping a duplicated _run_raw_task for DB/DB-isolation mode, unless there is a good reason to keep it for now, but I can't think of any).
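
"Turning handle_reschedule into internal_api" follows the same pattern the PR applies elsewhere: hoist the session-bound body into a module-level function and decorate it, so both the ORM TaskInstance and its Pydantic shim can call it. A sketch under that assumption (simplified signature, not the exact code):

```python
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.utils.session import NEW_SESSION, provide_session

@internal_api_call
@provide_session
def _handle_reschedule(
    ti,
    actual_start_date,
    reschedule_exception,
    test_mode: bool = False,
    session=NEW_SESSION,
):
    # persist the TaskReschedule row and set the TI back to UP_FOR_RESCHEDULE
    # using `session` -- routed over RPC when isolation is on, run directly otherwise
    ...
```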

@dstandish dstandish marked this pull request as draft April 22, 2024 18:13
@dstandish
Contributor Author

Converting to draft while I work through a few more aspects of this.

@dstandish dstandish force-pushed the make-_run_raw_task-aip-44-compatible branch 3 times, most recently from 9673464 to 2f625e7 on April 25, 2024 14:41
@dstandish dstandish marked this pull request as ready for review April 25, 2024 15:00
@dstandish dstandish force-pushed the make-_run_raw_task-aip-44-compatible branch from ce1921b to 6db95e0 on May 16, 2024 22:38
@dstandish
Contributor Author

dstandish commented May 16, 2024

OK @potiuk @vincbeck @uranusjr -- I think this is ready for another look.

This "base" PR is the first 4 commits from the mothership PR (#37851).

After this main PR there will be 5 other PRs (each of the remaining commits in #37851 will be a distinct PR) to complete the job -- by which I mean being able to run a task entirely without DB connectivity, including mapped tasks, xcom, and async.

Using the mothership branch I have done live testing to ensure that these commits work properly for the other changes that follow, which can be reviewed and merged separately.

I'll also mention one more thing -- this PR contains 4 distinct commits, each covering a different part of the work, and it may be helpful to review them one at a time.
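
For orientation, the shape of the change - visible in the tracebacks quoted later in this thread - is roughly the following delegation pattern (names simplified; a sketch, not the actual code):

```python
# One module-level implementation serves both the ORM TaskInstance and the
# TaskInstancePydantic shim used when the worker has no DB access
# (pydantic/taskinstance.py delegating into models/taskinstance.py).
def _run_raw_task(ti, mark_success=False, test_mode=False, **kwargs):
    """Works against either a TaskInstance or a TaskInstancePydantic."""
    ...

class TaskInstance:  # ORM model, simplified
    def _run_raw_task(self, **kwargs):
        return _run_raw_task(self, **kwargs)

class TaskInstancePydantic:  # serializable shim, simplified
    def _run_raw_task(self, **kwargs):
        return _run_raw_task(self, **kwargs)
```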

@potiuk
Member

potiuk commented May 17, 2024

@dstandish - I am currently at PyCon US, and I have a busy plan of attending talks and meeting people here :) -> I plan to take a very serious look at the AIP-44 related changes right after I come back (next Wednesday).

Contributor

@vincbeck vincbeck left a comment


LGTM

Contributor

@jscheffl jscheffl left a comment


While elaborating on more details for AIP-69 I stumbled over this PR, and I assume it is one of the things that need to be merged before my (naive) attempts at AIP-69 become possible.

Reading over the code, some comments - just recommendations - do not consider them a full/real review.
If it helps let me know then I could make a full test as I am failing exactly on this point maight be easy. Otherwise I assume some more pytests are missing? Wondering why I see no changes on test code?

@dstandish
Contributor Author

If it helps let me know then I could make a full test as I am failing exactly on this point maight be easy. Otherwise I assume some more pytests are missing?

can you clarify what you mean here?

@jscheffl
Contributor

can you clarify what you mean here?

Uuups, too many typos. I wanted to say:
(1) If, besides the other reviewers, it helps in any way to support testing, let me know. I just did a reading of the code w/o executing it.
(2) I saw mainly functional code changing but no pytests. I was wondering if the PR is ready to be merged (intended as a refactoring that does not affect test results) or if tests will be added before merge.

@dstandish
Contributor Author

dstandish commented May 19, 2024

Thanks, yeah - you mean help with practical testing, like trying things out. I've done a lot of it, so I feel OK about it; not necessary unless you want to. If you do, I recommend checking out the "mothership" PR branch - I think I referenced it somewhere. There you can run this with helm in full isolation mode with a dedicated RPC server.

Re unit tests: with many of these AIP-44 changes the intent is a no-behavior-change refactor, so I think that's why we haven't historically added many tests. But I can add some if you have specific areas of concern.

Re the naming suggestions you made above: I stuck with the pattern established before me, which was to keep the same name as the method. I don't personally think it is likely to create confusion, since the functions are namespaced by their module anyway. We could make this more explicit in the RPC module by importing modules rather than functions, but I'd leave that for another PR. Note also that in all cases the functions are private, which makes the choice much less consequential - they are not user facing and can change at any time. Incidentally, I feel like we should find a way to avoid having to explicitly add those imports in the RPC module; it shouldn't be necessary given that we already decorate the functions. But again, that's a different PR.
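
For readers unfamiliar with the RPC module being discussed, the registry is roughly shaped like this sketch (simplified; the real map is assembled in airflow/api_internal/endpoints/rpc_api_endpoint.py):

```python
# Every @internal_api_call function must also be imported and listed here by
# hand -- the duplication dstandish suggests removing in a later PR.
from airflow.models.taskinstance import _handle_reschedule, _update_rtif

def initialize_method_map() -> dict:
    functions = [
        _handle_reschedule,
        _update_rtif,
        # ... every other @internal_api_call function, added manually
    ]
    return {f"{f.__module__}.{f.__qualname__}": f for f in functions}
```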

@jscheffl
Contributor

Okay, I did some testing with a breeze setup: manually "massaged" the ENV to enable AIP-44, started the internal API server, took away the DB connection, and set the internal API endpoint for the worker. Using CeleryExecutor I got the following results - based on the "mothership" PR - don't know if this helps in review.
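
The "massaging" described above presumably amounts to something like the following; treat the exact names as assumptions based on the AIP-44 branches of that era:

```python
# Presumed environment for the setup described above -- the variable names are
# assumptions, not documentation.
import os

os.environ["AIRFLOW_ENABLE_AIP_44"] = "true"                     # feature gate
os.environ["AIRFLOW__CORE__DATABASE_ACCESS_ISOLATION"] = "True"  # workers must not touch the DB
os.environ["AIRFLOW__CORE__INTERNAL_API_URL"] = "http://internal-api:8080"
# then run `airflow internal-api` for the RPC server, and start Celery workers
# without a usable [database] sql_alchemy_conn.
```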

Based on example DAGs:

  1. Worker had problems parsing SubdagOperator; I was not able to copy the text from tmux, but the error stack failed in a DB select of the Pool object (screenshot attached). Missing in the internal API, or will Subdag just not be supported?
  2. DAG example_python_operator - worked / executed w/o error
  3. DAG example_python_decorator - worked / executed w/o error
  4. DAG example_branch_operator - failed in task branching - logs below - seems there is still some DB access for branching
  5. DAG example_dynamic_task_mapping - failed in task sum_it - logs below - seems the serialized values are of the wrong type?
  6. DAG tutorial - failed in task templated - logs below - seems to be an issue with templating
  7. DAG Params Trigger UI - failed in Select languages (as well as others) - logs below - seems to be problems with serialized parameters

Logs for 4

(...)
[2024-05-20, 18:03:32 CEST] {baseoperator.py:404} WARNING - BranchPythonOperator.execute cannot be called outside TaskInstance!
[2024-05-20, 18:03:32 CEST] {python.py:240} INFO - Done. Returned value was: branch_a
[2024-05-20, 18:03:32 CEST] {branch.py:36} INFO - Branch into branch_a
[2024-05-20, 18:03:32 CEST] {skipmixin.py:178} INFO - Following branch branch_a
[2024-05-20, 18:03:32 CEST] {taskinstance.py:742} ▼ Post task execution logs
[2024-05-20, 18:03:32 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 108 for task branching (DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'; 7490)
Traceback (most recent call last):
  File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task
    return _execute_task(task_instance=self, context=context, task_orig=task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/operators/python.py", line 273, in execute
    return self.do_branch(context, super().execute(context))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/operators/branch.py", line 37, in do_branch
    self.skip_all_except(context["ti"], branches_to_execute)
  File "/opt/airflow/airflow/models/skipmixin.py", line 241, in skip_all_except
    if (downstream_ti := dag_run.get_task_instance(t.task_id, map_index=ti.map_index))
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'
[2024-05-20, 18:03:32 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
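
The root cause above is a signature mismatch: the ORM DagRun.get_task_instance has its session injected by @provide_session, while the DagRunPydantic shim declares session as a required positional argument, so callers like skip_all_except that omit it blow up. A sketch of the assumed shape of a fix (hypothetical helper name, not the actual patch):

```python
from airflow.utils.session import NEW_SESSION, provide_session

def _get_task_instance(*, dag_id, run_id, task_id, map_index, session):
    # hypothetical @internal_api_call helper: fetches the TI over RPC when the
    # worker has no direct DB connection, or queries the DB when it does
    ...

class DagRunPydantic:  # simplified
    dag_id: str
    run_id: str

    @provide_session
    def get_task_instance(self, task_id, *, map_index=-1, session=NEW_SESSION):
        # same session-defaulting signature as the ORM DagRun.get_task_instance,
        # so callers may omit `session`
        return _get_task_instance(
            dag_id=self.dag_id,
            run_id=self.run_id,
            task_id=task_id,
            map_index=map_index,
            session=session,
        )
```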

Logs for 5

(...)
[2024-05-20, 17:42:51 CEST] {taskinstance.py:742} ▼ Post task execution logs
[2024-05-20, 17:42:51 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 82 for task sum_it (unsupported operand type(s) for +: 'int' and 'str'; 4346)
Traceback (most recent call last):
  File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task
    return _execute_task(task_instance=self, context=context, task_orig=task_orig)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/operators/python.py", line 238, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
    return runner.run(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/example_dags/example_dynamic_task_mapping.py", line 35, in sum_it
    total = sum(values)
            ^^^^^^^^^^^
TypeError: unsupported operand type(s) for +: 'int' and 'str'
[2024-05-20, 17:42:52 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1

Logs for 6

(...)
[2024-05-20, 17:28:53 CEST] {standard_task_runner.py:93} INFO - Job 33: Subtask templated
[2024-05-20, 17:28:53 CEST] {task_command.py:462} INFO - Running task_id='templated' dag_id='tutorial' run_id='manual__2024-05-20T15:23:39.841642+00:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 28, 53, 331391, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)) duration=0.372874 state='running' try_number=2 max_tries=1 hostname='65a1e8ad69ac' unixname='root' job_id=33 pool='default_pool' pool_slots=1 queue='default' priority_weight=1 operator='BashOperator' custom_operator_name=None queued_dttm=datetime.datetime(2024, 5, 20, 15, 28, 50, 223477, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 351237, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d5da9e9f-715a-48d6-a34c-0eeeb5f3d74f' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(BashOperator): templated> test_mode=False dag_run=DagRunPydantic(id=9, dag_id='tutorial', queued_at=datetime.datetime(2024, 5, 20, 15, 23, 39, 884965, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 23, 40, 785817, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T15:23:39.841642+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 5, 19, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 28, 53, 73827, tzinfo=TzInfo(UTC)), dag_hash='4e598e3c2b0dd862a24b86c57cc9387b', updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 79029, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='tutorial', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 28, 43, 870484, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/tutorial.py', processor_subdir='/files/dags', owners='airflow', description='A simple tutorial DAG', default_view='grid', schedule_interval=datetime.timedelta(days=1), timetable_description='', tags=[DagTagPydantic(name='example', dag_id='tutorial')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac
[2024-05-20, 17:28:53 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings:
  Expected `int` but got `str` - serialized value may not be as expected
  return self.__pydantic_serializer__.to_python(
[2024-05-20, 17:28:53 CEST] {abstractoperator.py:741} ERROR - Exception rendering Jinja template for task 'templated', field 'bash_command'. Template: '\n{% for i in range(5) %}\n    echo "{{ ds }}"\n    echo "{{ macros.ds_add(ds, 7)}}"\n{% endfor %}\n'
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields
    rendered_content = self.render_template(
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/template/templater.py", line 169, in render_template
    return self._render(template, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render
    return super()._render(template, context, dag=dag)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/template/templater.py", line 126, in _render
    return render_template_to_string(template, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string
    return render_template(template, cast(MutableMapping[str, Any], context), native=False)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template
    return "".join(nodes)
           ^^^^^^^^^^^^^^
  File "<template>", line 21, in root
  File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call
    if not __self.is_safe_callable(__obj):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable
    getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__
    return self._fail_with_undefined_error()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
[2024-05-20, 17:28:53 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 33 for task templated ('str object' has no attribute 'ds_add'; 2098)
Traceback (most recent call last):
  File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/opt/airflow/airflow/models/taskinstance.py", line 2940, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context, jinja_env=jinja_env)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 161, in render_templates
    return TaskInstance.render_templates(self=self, context=context, jinja_env=jinja_env)  # type: ignore[arg-type]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 3354, in render_templates
    original_task.render_template_fields(context, jinja_env)
  File "/opt/airflow/airflow/models/baseoperator.py", line 1375, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields
    rendered_content = self.render_template(
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/template/templater.py", line 169, in render_template
    return self._render(template, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render
    return super()._render(template, context, dag=dag)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/template/templater.py", line 126, in _render
    return render_template_to_string(template, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string
    return render_template(template, cast(MutableMapping[str, Any], context), native=False)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template
    return "".join(nodes)
           ^^^^^^^^^^^^^^
  File "<template>", line 21, in root
  File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call
    if not __self.is_safe_callable(__obj):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable
    getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__
    return self._fail_with_undefined_error()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
[2024-05-20, 17:28:53 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1

Logs for 7

(...)
[2024-05-20, 17:21:43 CEST] {task_command.py:462} INFO - Running task_id='select_languages' dag_id='example_params_trigger_ui' run_id='manual__2024-05-20T17:21:36+02:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 21, 43, 236734, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)) duration=None state='running' try_number=1 max_tries=0 hostname='65a1e8ad69ac' unixname='root' job_id=24 pool='default_pool' pool_slots=1 queue='default' priority_weight=5 operator='_BranchPythonDecoratedOperator' custom_operator_name='@task.branch' queued_dttm=datetime.datetime(2024, 5, 20, 15, 21, 39, 246806, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 21, 43, 255403, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d04a166e-5f47-44cc-884c-3178e7f0e7bd' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(_BranchPythonDecoratedOperator): select_languages> test_mode=False dag_run=DagRunPydantic(id=8, dag_id='example_params_trigger_ui', queued_at=datetime.datetime(2024, 5, 20, 15, 21, 38, 752539, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 21, 39, 180598, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T17:21:36+02:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={'names': ['Linda', 'Martha', 'Thomas'], 'english': True, 'german': True, 'french': True}, data_interval_start=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 21, 42, 720624, tzinfo=TzInfo(UTC)), dag_hash='ac1f36ca0393ba5bbc744485395bc36c', updated_at=datetime.datetime(2024, 5, 20, 15, 21, 42, 737459, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='example_params_trigger_ui', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 21, 27, 370307, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/example_params_trigger_ui.py', processor_subdir='/files/dags', owners='airflow', description='Example DAG demonstrating the usage DAG params to model a trigger UI with a user form', default_view='grid', schedule_interval=None, timetable_description='Never, external triggers only', tags=[DagTagPydantic(name='params', dag_id='example_params_trigger_ui'), DagTagPydantic(name='example', dag_id='example_params_trigger_ui')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac
[2024-05-20, 17:21:43 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings:
  Expected `int` but got `str` - serialized value may not be as expected
  return self.__pydantic_serializer__.to_python(
[2024-05-20, 17:21:43 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 24 for task select_languages (Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>; 1015)
Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator
    return BaseSerialization.serialize(x, use_pydantic_models=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize
    return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator
    return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node
    serialize_op["params"] = cls._serialize_params_dict(op.params)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict
    raise ValueError(
ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
    TaskInstance._execute_task_with_callbacks(
  File "/opt/airflow/airflow/models/taskinstance.py", line 2948, in _execute_task_with_callbacks
    _update_rtif(ti=self, rendered_fields=rendered_fields)
  File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper
    args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize
    str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize
    mod = _pydantic_model_dump(pyd_mod, var)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump
    return model_cls.model_validate(var).model_dump(mode="json")  # type: ignore[attr-defined]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator
    return BaseSerialization.serialize(x, use_pydantic_models=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize
    return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator
    return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node
    serialize_op["params"] = cls._serialize_params_dict(op.params)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict
    raise ValueError(
ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
    ret = args.func(args, dag=self.dag)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
    return _run_raw_task(args, ti)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
    return ti._run_raw_task(
           ^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
    return _run_raw_task(
           ^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/models/taskinstance.py", line 341, in _run_raw_task
    ti.handle_failure(e, test_mode, context, session=session)
  File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 333, in handle_failure
    _handle_failure(
  File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper
    args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize
    str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize
    mod = _pydantic_model_dump(pyd_mod, var)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump
    return model_cls.model_validate(var).model_dump(mode="json")  # type: ignore[attr-defined]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
[2024-05-20, 17:21:43 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
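
For context on this last failure: by the time a task is running, its params have been resolved to plain Python values, but re-serializing the in-flight operator for an internal-api call goes through _serialize_params_dict, which accepts only Param objects. A tiny illustration of the mismatch (not a fix):

```python
from airflow.models.param import Param

declared = {"names": Param(["Linda", "Martha", "Thomas"], type="array")}  # at DAG parse time
resolved = {"names": ["Linda", "Martha", "Thomas"]}                       # on the running task

assert isinstance(declared["names"], Param)
assert not isinstance(resolved["names"], Param)  # serializing this raises the ValueError
```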

@dstandish
Contributor Author

dstandish commented May 20, 2024

Thanks @jscheffl -- which branch were you on? There are more changes re mapping on the "mothership" PR (#37851) that are excluded from this PR just for easier review -- that's the branch you should be on for end-to-end testing. I cherry-pick from that PR branch to make the smaller PRs.

I've been doing little bits at a time, and this PR is bigger than I'd like, but it's still not the whole enchilada. This individual PR is still an incremental addition.

@jscheffl
Contributor

Thanks @jscheffl -- which branch were you on? There are more changes re mapping on the "mothership" PR (#37851) that are excluded from this PR just for easier review -- that's the branch you should be on for end-to-end testing. I cherry-pick from that PR branch to make the smaller PRs.

I've been doing little bits at a time, and this PR is bigger than I'd like, but it's still not the whole enchilada. This individual PR is still an incremental addition.

Fully acknowledged. As I was trying to get started on a PoC for AIP-69 I struggled with the same issues, and AIP-44 clearly solves all the basic problems that I ran into as well. This rework seems to be quite complex. Good that you have split up the "mothership" for incremental review.

As proposed by you, I tested on the "mothership" PR #37851 at GIT hash c1b78c035bf6e8250a12e1a7a0f83f78fcbeaa4e ("run-a-full-task-with-internal-api"). No rebase/merge with main before the tests.

So as you cherry-pick into smaller PRs, keeping the complexity manageable... if all is green it might still be good to merge the pieces, and I assume that to make it fully work a few other API calls will need to be reworked.

@dstandish
Contributor Author

dstandish commented May 20, 2024

missing in internal API or will Subdag just not be supported?

etc

Ah yeah, those scenarios are good to add to my testing DAG. Maybe you could share your DAG? And maybe I could add them as integration tests with xfail for the internal API (see the sketch below). Like I said, not everything is implemented in this branch (or in the other branch, for that matter), but many things are, and it's all a work in progress. Pretty soon I am going to have to roll off onto other things and will ticket out the remaining known AIP-44 issues for the next person to take over. For example, we have not done anything re the triggerer yet. So just to be extra clear: this PR is not the final AIP-44 PR. This PR is just "get it working for the simplest cases".
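
Such an xfail-based integration test might look roughly like this (hypothetical; run_task_over_internal_api is an assumed fixture, not an existing helper):

```python
# Known-broken scenarios stay in the suite and flip to passing as the
# remaining AIP-44 gaps are closed.
import pytest

@pytest.mark.xfail(reason="branching still hits the DB in isolation mode (AIP-44)")
def test_branch_operator_in_db_isolation(run_task_over_internal_api):
    result = run_task_over_internal_api(
        dag_id="example_branch_operator", task_id="branching"
    )
    assert result.state == "success"
```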

@dstandish
Contributor Author

So as you cherry-pick into smaller PRs, keeping the complexity manageable... if all is green it might still be good to merge the pieces, and I assume that to make it fully work a few other API calls will need to be reworked.

Yeah, that's sorta how I've been thinking about it. And I think that ultimately we should probably compile a DAG with the known important scenarios and set it up as an integration test.

@dstandish
Contributor Author

Also for reference @jscheffl, here are the PRs that came before this one 😅

https://github.com/apache/airflow/pulls?q=is%3Apr+project%3Aapache%2F169+is%3Aclosed

Contributor

@jscheffl jscheffl left a comment


Giving a thumbs-up here. Knowing there are glitches to be fixed, but this PR is a solid increment of progress from which to take the next steps.

@dstandish
Contributor Author

@potiuk I'll just go ahead and merge this now so I can close out all the PRs waiting on it. If you later want to take a look, I'm happy to take your suggestions and make changes afterwards. Thanks!

@dstandish dstandish merged commit bca2930 into apache:main May 24, 2024
42 checks passed
@dstandish dstandish deleted the make-_run_raw_task-aip-44-compatible branch May 24, 2024 21:03
@utkarsharma2 utkarsharma2 added this to the Airflow 2.10.0 milestone Jun 3, 2024
@utkarsharma2 utkarsharma2 added the AIP-44 Airflow Internal API label Jun 3, 2024
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Jun 4, 2024
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Jul 26, 2024
Migrate _run_raw_task to work with AIP-44. Also, _handle_reschedule, defer_task, xcom_pull and xcom_push.
Labels
AIP-44 Airflow Internal API area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)