Skip to content

Commit

Permalink
Return None if an XComArg fails to resolve in a multiple_outputs Task
Browse files Browse the repository at this point in the history
Tasks with multiple_outputs set to True returns XComs with different keys
which are not known to Airflow. Because they have multiple_outputs set, we
should return None if we can't find the XCom, just like we return None when
the key is equal to XCOM_RETURN_KEY known to Airflow.

Closes: #29199
  • Loading branch information
ephraimbuddy committed Jun 21, 2023
1 parent c508b8e commit db79d2a
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
7 changes: 7 additions & 0 deletions airflow/models/xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,13 @@ def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
return result
if self.key == XCOM_RETURN_KEY:
return None
if self.operator.multiple_outputs: # type: ignore
# If the operator is set to have multiple outputs and it was not executed,
# we should return "None" instead of showing an error. This is because when
# multiple outputs XComs are created, the XCom keys associated with them will have
# different names than the predefined "XCOM_RETURN_KEY" and won't be found.
# Therefore, it's better to return "None" like we did above where self.key==XCOM_RETURN_KEY.
return None
raise XComNotFound(ti.dag_id, task_id, self.key)


Expand Down
47 changes: 45 additions & 2 deletions tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
import sys
from collections import namedtuple
from datetime import date, timedelta
from typing import TYPE_CHECKING, Dict, Tuple
from typing import TYPE_CHECKING, Dict, Tuple, Union

import pytest

from airflow import PY38, PY311
from airflow.decorators import setup, task as task_decorator, teardown
from airflow.decorators.base import DecoratedMappedOperator
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, XComNotFound
from airflow.models import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.models.expandinput import DictOfListsExpandInput
Expand Down Expand Up @@ -804,6 +804,49 @@ def down(a, b):
assert result == "'example' None"


@pytest.mark.parametrize("multiple_outputs", [True, False])
def test_multiple_outputs_produces_none_xcom_when_task_is_skipped(dag_maker, session, multiple_outputs):
from airflow.exceptions import AirflowSkipException
from airflow.utils.trigger_rule import TriggerRule

result = None

with dag_maker(session=session) as dag:

@dag.task()
def up1() -> str:
return "example"

@dag.task(multiple_outputs=multiple_outputs)
def up2(x) -> Union[dict, None]:
if x == 2:
return {"x": "example"}
raise AirflowSkipException()

@dag.task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def down(a, b):
nonlocal result
result = f"{a!r} {b!r}"

down(up1(), up2(1)["x"])

dr = dag_maker.create_dagrun()

decision = dr.task_instance_scheduling_decisions(session=session)
assert len(decision.schedulable_tis) == 2 # "up1" and "up2"
for ti in decision.schedulable_tis:
ti.run(session=session)

decision = dr.task_instance_scheduling_decisions(session=session)
assert len(decision.schedulable_tis) == 1 # "down"
if multiple_outputs:
decision.schedulable_tis[0].run(session=session)
assert result == "'example' None"
else:
with pytest.raises(XComNotFound):
decision.schedulable_tis[0].run(session=session)


@pytest.mark.filterwarnings("error")
def test_no_warnings(reset_logging_config, caplog):
@task_decorator
Expand Down

0 comments on commit db79d2a

Please sign in to comment.