Skip to content

Commit

Permalink
Fix crash when user clicks on "Task Instance Details" caused by start…
Browse files Browse the repository at this point in the history
…_date being None (#14416)

This is to fix the following error that happens when a user clicks on 'Task Instance Details' for a TaskInstance that has previous TaskInstance not yet run. E.g.

The previous TaskInstance has not yet run because its dependencies are not yet met
The previous TaskInstance has not yet run because scheduler is busy,
the previous TaskInstance was marked success without running.
This bug was caused by #12910. It affects Airflow 2.0.0 and 2.0.1.

GitOrigin-RevId: 21f297425ae85ce89e21477d55b51d5560f47bf8
  • Loading branch information
yuqian90 authored and Cloud Composer Team committed Aug 27, 2022
1 parent 5762a63 commit 79d502e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
3 changes: 2 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,8 @@ def get_previous_start_date(
"""
self.log.debug("previous_start_date was called")
prev_ti = self.get_previous_ti(state=state, session=session)
return prev_ti and pendulum.instance(prev_ti.start_date)
# prev_ti may not exist and prev_ti.start_date may be None.
return prev_ti and prev_ti.start_date and pendulum.instance(prev_ti.start_date)

@property
def previous_start_date_success(self) -> Optional[pendulum.DateTime]:
Expand Down
33 changes: 33 additions & 0 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,39 @@ def test_previous_start_date_success(self, _, schedule_interval, catchup) -> Non
assert ti_list[3].get_previous_start_date(state=State.SUCCESS) == ti_list[1].start_date
assert ti_list[3].get_previous_start_date(state=State.SUCCESS) != ti_list[2].start_date

def test_get_previous_start_date_none(self):
"""
Test that get_previous_start_date() can handle TaskInstance with no start_date.
"""
with DAG("test_get_previous_start_date_none", start_date=DEFAULT_DATE, schedule_interval=None) as dag:
task = DummyOperator(task_id="op")

day_1 = DEFAULT_DATE
day_2 = DEFAULT_DATE + datetime.timedelta(days=1)

# Create a DagRun for day_1 and day_2. Calling ti_2.get_previous_start_date()
# should return the start_date of ti_1 (which is None because ti_1 was not run).
# It should not raise an error.
dagrun_1 = dag.create_dagrun(
execution_date=day_1,
state=State.RUNNING,
run_type=DagRunType.MANUAL,
)

dagrun_2 = dag.create_dagrun(
execution_date=day_2,
state=State.RUNNING,
run_type=DagRunType.MANUAL,
)

ti_1 = dagrun_1.get_task_instance(task.task_id)
ti_2 = dagrun_2.get_task_instance(task.task_id)
ti_1.task = task
ti_2.task = task

assert ti_2.get_previous_start_date() == ti_1.start_date
assert ti_1.start_date is None

def test_pendulum_template_dates(self):
dag = models.DAG(
dag_id='test_pendulum_template_dates',
Expand Down

0 comments on commit 79d502e

Please sign in to comment.