Fixed backfill interference with scheduler #22701
Why did you change the states to NONE? Were the tests wrong to create DAG runs in the RUNNING state? Can we also have a backfill test covering the warning?
Yeah, we think it's a bug to allow running a backfill against an active DAG run managed by the scheduler. This is allowed on the current Airflow main branch, and this PR prevents it. I will add a test for it today 👍
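To make the rule described above concrete, here is a minimal sketch in plain Python. It is not the code from this PR and does not use Airflow's API: `RunType`, `RunState`, `DagRunStub`, and `conflicting_runs` are hypothetical stand-ins, assumed only for illustration of "a backfill should not take over a run the scheduler is still executing".

```python
from dataclasses import dataclass
from enum import Enum


class RunType(Enum):
    # Hypothetical stand-in for the kinds of DAG runs discussed in this thread.
    SCHEDULED = "scheduled"
    BACKFILL = "backfill"


class RunState(Enum):
    # Hypothetical stand-in for DAG run states.
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class DagRunStub:
    # Minimal illustrative record; not Airflow's DagRun model.
    run_id: str
    run_type: RunType
    state: RunState


def conflicting_runs(existing_runs):
    """Return runs a backfill should leave alone: still running and not created by a backfill."""
    return [
        run
        for run in existing_runs
        if run.state is RunState.RUNNING and run.run_type is not RunType.BACKFILL
    ]


runs = [
    DagRunStub("scheduled_1", RunType.SCHEDULED, RunState.RUNNING),
    DagRunStub("scheduled_2", RunType.SCHEDULED, RunState.SUCCESS),
    DagRunStub("backfill_1", RunType.BACKFILL, RunState.RUNNING),
]
blocked = conflicting_runs(runs)
```

In this sketch only `scheduled_1` ends up in `blocked`: it is the one run that is both active and owned by the scheduler, which is the case the PR is said to reject.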
@potiuk test added.
Looks cool, @houqp! Just one nit about asserting the error message.
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
tests/jobs/test_backfill_job.py
Outdated
    start_date=DEFAULT_DATE,
    end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
with patch.object(job.log, 'error') as mock_error:
Don't need to mock this, you can use pytest’s caplog fixture.
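For reference, a rough sketch of what the `caplog` suggestion looks like in practice. The logger name, the `refuse_backfill` helper, and the error message are all hypothetical and are not taken from this PR; only `caplog.at_level` and `caplog.text` are pytest's own API.

```python
import logging

# Hypothetical logger name, used only for this sketch.
log = logging.getLogger("backfill_sketch")


def refuse_backfill(active_run_ids):
    """Hypothetical stand-in for the code under test.

    Logs an error and returns False when active runs are passed in,
    otherwise returns True.
    """
    if active_run_ids:
        log.error("Backfill cannot be created for active DAG runs: %s", active_run_ids)
        return False
    return True


def test_refuse_backfill_logs_error(caplog):
    # pytest injects caplog; nothing on the logger has to be patched.
    with caplog.at_level(logging.ERROR, logger="backfill_sketch"):
        assert refuse_backfill(["scheduled_1"]) is False
    # caplog.text holds the formatted log output captured during the test.
    assert "Backfill cannot be created" in caplog.text
```

Compared with `patch.object(job.log, 'error')`, this asserts on the message that was actually emitted rather than on how the logger method was called, so the test keeps working if the call moves to a different logger method at the same level.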
TIL, thanks for the tip!
A few minor comments on implementation; the general logic looks right to me.
@@ -1760,7 +1760,7 @@ def evaluate_dagrun(
    dr = dag.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=dagrun_info.logical_date,
        state=State.RUNNING,
Why did you need to change these tests?
Because these tests created a scheduled DAG run in the RUNNING state, and this fix prevents a backfill run from overriding a scheduled DAG run.
Ohh, yes, the scheduler tests that use dag.run, which actually uses BackfillJob. We should fix that at some point.
The error message included in the commit should be fairly self-explanatory :)