Prevent DagRun's start_date from reset (#30124) #30125
@houqp, brother! What do you think?
Left one minor comment, but otherwise looks good to me 👍 It would be good to add a unit test to prevent this from regressing again in the future.
Also (in addition to the unit tests mentioned by @houqp), existing tests are failing. It seems the change broke them, so it should be checked whether this is breaking an intentional behaviour.
Hm, looks like all the failed tests are testing exactly what I need.
First, I'm not sure if we really need to update this function, but if we do, we should fix something in this PR:
param dag_run_state: state to set DagRun to. If set to False, dagrun state will not be changed.
We need to update the condition to:
if dr.state in (State.SUCCESS, State.FAILED) or dag_run_state:
and set the default value of dag_run_state to False.
Alternative solution:
Why don't we just update the call of this method in BaseOperator by setting dag_run_state to False:
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index d78152d990..6b9a61fec6 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1244,7 +1244,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
qry = qry.filter(TaskInstance.task_id.in_(tasks))
results = qry.all()
count = len(results)
- clear_task_instances(results, session, dag=self.dag)
+ clear_task_instances(results, session, dag=self.dag, dag_run_state=False)
session.commit()
return count
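To make the alternative above concrete, here is a minimal sketch of how a `False` sentinel for `dag_run_state` could leave a DagRun untouched. `FakeDagRun` and `clear_sketch` are hypothetical stand-ins invented for illustration; they are not Airflow's `DagRun` model or its `clear_task_instances` function, and the reset values are an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Union


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for a DagRun row (not Airflow's model)."""

    state: str
    start_date: Optional[datetime]


def clear_sketch(dag_run: FakeDagRun, dag_run_state: Union[str, bool] = "queued") -> FakeDagRun:
    """Toy clearing step: passing False means the DagRun is left as it is."""
    if dag_run_state is False:
        # Caller opted out: keep state and start_date unchanged.
        return dag_run
    # Assumed reset behaviour for this sketch only.
    dag_run.state = dag_run_state
    dag_run.start_date = None
    return dag_run


started = datetime(2023, 3, 15, tzinfo=timezone.utc)

untouched = clear_sketch(FakeDagRun("running", started), dag_run_state=False)
reset = clear_sketch(FakeDagRun("running", started))
```

With `dag_run_state=False` the run keeps its original `start_date`; with the default it is reset, which is the behaviour the PR wants to avoid for a run that is still in progress.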
Sorry guys, got sick... but back now :) First suggestion
For example I'm going to clear finished (let's say Second suggestion
I'm not familiar with this part of code, could you please explain how changes in |
Since the maintainers are not allowed to edit this PR, can you apply this patch?
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f5b85a8af5..a973837689 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -193,8 +193,8 @@ def clear_task_instances(
:param tis: a list of task instances
:param session: current session
- :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
- be changed.
+ :param dag_run_state: state to set finished DagRuns to. If set to False,
+ DagRuns state will not be changed.
:param dag: DAG object
:param activate_dag_runs: Deprecated parameter, do not pass
"""
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 67638ab1a1..f4a8dc1a0a 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -96,7 +96,7 @@ class State:
DEFERRED = TaskInstanceState.DEFERRED
finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
- unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
+ unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
task_states: tuple[TaskInstanceState | None, ...] = (None,) + tuple(TaskInstanceState)
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 8e42aea45c..4fa7363d96 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -172,8 +172,9 @@ class TestClearTasks:
@pytest.mark.parametrize("state", [DagRunState.QUEUED, DagRunState.RUNNING])
def test_clear_task_instances_on_running_dr(self, state, dag_maker):
- """Test that DR state, start_date and last_scheduling_decision doesn't change after clear
- if it's been cleared on a unfinished RD.
+ """
+ Test that DagRun state, start_date and last_scheduling_decision
+ are not changed after clearing TI in an unfinished DagRun.
"""
with dag_maker(
"test_clear_task_instances",
@@ -217,8 +218,9 @@ class TestClearTasks:
],
)
def test_clear_task_instances_on_finished_dr(self, state, last_scheduling, dag_maker):
- """Test that DR state, start_date and last_scheduling_decision doesn't change after clear
- if it's been cleared on a unfinished RD.
+ """
+ Test that DagRun state, start_date and last_scheduling_decision
+ are changed after clearing TI in a finished DagRun.
"""
with dag_maker(
"test_clear_task_instances",
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5cb9a00801..8df2e9e0c1 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -112,7 +112,7 @@ class TestDagRun:
return dag_run
@pytest.mark.parametrize("state", [DagRunState.QUEUED, DagRunState.RUNNING])
- def test_clear_task_instances_for_backfill_running_dagrun(self, state, session):
+ def test_clear_task_instances_for_backfill_unfinished_dagrun(self, state, session):
now = timezone.utcnow()
dag_id = "test_clear_task_instances_for_backfill_dagrun"
dag = DAG(dag_id=dag_id, start_date=now)
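The patch above hinges on the finished and unfinished state sets being disjoint, and on the reset applying only to finished runs. The following is a rough, self-contained sketch of that rule. `RunState`, `FINISHED`, `UNFINISHED`, `FakeDagRun` and `clear_on_dag_run` are hypothetical names for illustration, not Airflow's `DagRunState`, `State` or `clear_task_instances`, and resetting the fields to `None` is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class RunState(str, Enum):
    """Hypothetical mirror of the four DagRun states named in the patch."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# The two sets must not overlap; the patch fixes a copy-paste error here.
FINISHED = frozenset([RunState.SUCCESS, RunState.FAILED])
UNFINISHED = frozenset([RunState.QUEUED, RunState.RUNNING])


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for a DagRun row (not Airflow's model)."""

    state: RunState
    start_date: Optional[datetime]
    last_scheduling_decision: Optional[datetime]


def clear_on_dag_run(dr: FakeDagRun, new_state: RunState = RunState.QUEUED) -> FakeDagRun:
    """Reset the run only if it has already finished (assumed reset values)."""
    if dr.state in FINISHED:
        dr.state = new_state
        dr.start_date = None
        dr.last_scheduling_decision = None
    return dr


now = datetime(2023, 3, 15, tzinfo=timezone.utc)
running = clear_on_dag_run(FakeDagRun(RunState.RUNNING, now, now))
finished = clear_on_dag_run(FakeDagRun(RunState.SUCCESS, now, now))
```

Under these assumptions the running run keeps all three fields, while the finished run is moved back to the queued state, matching what the two renamed tests describe.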
All changes applied :)
Looks good to me, just need to address @uranusjr's comments.
This PR fixes the issue described in apache#30124. We should not reset a DagRun's `state` and `start_date` when somebody clears a task of a running DagRun.
(cherry picked from commit 070ecbd)
This PR fixes the issue described in #30124. We should not reset a DagRun's `state` and `start_date` when somebody clears a task of a running DagRun.
closes: #30124