-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use DagRun.run_id
instead of execution_date
when updating state of TIs(UI & REST API)
#18724
Conversation
6ff53b8
to
9e5874f
Compare
I took the easiest way for this PR. There are a set of functions that uses the execution date underneath this PR but I limited the PR to the REST API. Let me know if those functions need to be touched too |
9e5874f
to
6da4034
Compare
6da4034
to
bfc39ea
Compare
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
bfc39ea
to
5a9ce95
Compare
6c74ac7
to
fa4ca9e
Compare
fa4ca9e
to
3f53972
Compare
c7b46d8
to
acd7db4
Compare
airflow/models/dag.py
Outdated
input_execution_date = execution_date | ||
if dag_run_id: | ||
dag_run = ( | ||
session.query(DagRun).filter(DagRun.run_id == dag_run_id, DagRun.dag_id == self.dag_id).one() | ||
) # Raises an error if not found | ||
resolve_execution_date = dag_run.execution_date |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
if execution_date is None:
dag_run = ...
resolved_execution_date = dag_run.execution_date
else:
resolved_execution_date = execution_date
This would simplify the later code to
end_date = resolve_execution_date if not future else None
start_date = resolve_execution_date if not future else None
dag_run_id=run_id, | ||
execution_date=execution_date, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This combined with the exactly_one
check in set_task_instance_state
means the request would error (with 500) if the user passes both execution_date
and run_id
.=
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have schema level validation. So it won't allow both values to be supplied
description: | | ||
The task instance's DAG run ID. Either set this or execution_date but not both. | ||
|
||
*New in version 2.2.3* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2.2.3 is already out now isn't it, So 2.2.4 or 2.3.0 (Probably 2.3, as this one doesn't feel like a bug fix so shouldn't be backported.)
airflow/models/taskinstance.py
Outdated
@@ -742,6 +742,34 @@ def error(self, session=NEW_SESSION): | |||
session.merge(self) | |||
session.commit() | |||
|
|||
@classmethod | |||
@provide_session | |||
def find( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is only used for this one test, right? If so lets not add it (I'm not sure the pattern we have for DagRun.find etc is good)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah its used one place -- but still, I don't think we need it.
216d7ea
to
71bc93e
Compare
Error when the task instance does not exist Update airflow/api_connexion/endpoints/task_instance_endpoint.py Co-authored-by: Ash Berlin-Taylor <[email protected]> remove mock use try/accept Support the use of run_id in set_task_instance_state(REST API) We can now use run_id as well as execution_date to update states of task instances deprecate execution_date improve error message add run_id to set_state function require either the run_id or the execution_date fixup! require either the run_id or the execution_date Update airflow/api_connexion/endpoints/task_instance_endpoint.py Co-authored-by: Tzu-ping Chung <[email protected]> fixup! Update airflow/api_connexion/endpoints/task_instance_endpoint.py Ensure supplying both execution_date and run_id raises for dag.set_task_instance_state fixup! Ensure supplying both execution_date and run_id raises for dag.set_task_instance_state Do not depreccate execution_date fixup! Do not depreccate execution_date Use func.count use run_id instead of execution_date fix tests and deprecate mark_tasks fixup! fix tests and deprecate mark_tasks fixup! fixup! fix tests and deprecate mark_tasks fixup! fixup! fixup! fix tests and deprecate mark_tasks fixup! fixup! fixup! fix tests and deprecate mark_tasks fixup! fixup! fixup! fixup! fix tests and deprecate mark_tasks fixup! Ensure task_instance exists before running update on its state(REST API) Use execution_date for run/clear fixup! Use execution_date for run/clear constrain trino Apply suggestions from code review Co-authored-by: Ash Berlin-Taylor <[email protected]> add execution_date remove lib fix tests fixup! fix tests apply reviews from code review Fix typing Fix tests Apply suggestions from code review Co-authored-by: Tzu-ping Chung <[email protected]> fixup! Apply suggestions from code review add find method to task instance name execution_date explicitly fix mypy accept only run_id accept only run_id fixup! accept only run_id Update airflow/models/taskinstance.py Co-authored-by: Tzu-ping Chung <[email protected]> update docstring
Co-authored-by: Ash Berlin-Taylor <[email protected]>
4f9669a
to
e5e60db
Compare
When I merged apache#18724 the jobs ran successfully but it's now failing in main. This PR fixes it
When I merged #18724 the jobs ran successfully but it's now failing in main. This PR fixes it Co-authored-by: Kaxil Naik <[email protected]> Co-authored-by: Ash Berlin-Taylor <[email protected]>
We can now use run_id as well as execution_date to update states
of task instances
Depends on #18642
Closes: #18672
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.