-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high #21413
Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high #21413
Conversation
8e31be8
to
cbbcaa2
Compare
It turned out that we just needed to flush the session at this point and not that dag.get_num_active_runs is giving a wrong value. I'm wondering if it's still necessary to run dag_run.schedule_tis for a finished dag_run? if dag_run.state in State.finished:
session.flush() # to update the dag_run state
active_runs = dag.get_num_active_runs(only_running=False, session=session)
# Work out if we should allow creating a new DagRun now?
if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
else:
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
# see #11147/commit ee90807ac for more details
dag_run.schedule_tis(schedulable_tis, session)
return callback |
@@ -609,6 +609,7 @@ def update_state( | |||
self.data_interval_end, | |||
self.dag_hash, | |||
) | |||
session.flush() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall lgtm, should this be down with the merge
though so it happens regardless of the ultimate state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was my initial thought but I don't know if there would be a regression when added there since it would apply to running dagruns
as well. It use to have a commit
down there, after merge
but was removed here: 73b9163#diff-649fbbf224bab54417f03338c27d0fdb3c3336e53a522a13dfd9806c99f63137L377
cc: @ashb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just did move this down with merge
and query count increased...If we are cool with it I will fix the tests
129ab90
to
c5a42c9
Compare
c5a42c9
to
c10c6f3
Compare
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
…val is high The finished dagrun was still being seen as running when we call dag.get_num_active_runs because the session was not flushed. This PR fixes it
c10c6f3
to
d7bf47b
Compare
…val is high (apache#21413) The finished dagrun was still being seen as running when we call dag.get_num_active_runs because the session was not flushed. This PR fixes it
The finished dagrun was still being seen as running when we call dag.get_num_active_runs
because the session was not flushed. This PR fixes it
closes: #21083
closes: #19901
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.