Ensure that tasks wait for running indirect setup #33903
Conversation
Force-pushed from 812f7f7 to 8fc410f.
It looks good overall; however, we can optimize some of the used queries. I will create a new PR for 2.7.2 once this one is merged.
```python
elif skipped:
    new_state = TaskInstanceState.SKIPPED
```
Suggested change:

```diff
-elif skipped:
-    new_state = TaskInstanceState.SKIPPED
+elif done and skipped:
+    new_state = TaskInstanceState.SKIPPED
```
With

```python
done = upstream == (success + skipped + failed + upstream_failed + removed)
```
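To make the quoted `done` formula concrete, here is a tiny illustration with made-up counts (variable names follow the formula; this is not Airflow code):

```python
# Five upstream setups; three finished so far -> not done.
upstream = 4
success, skipped, failed, upstream_failed, removed = 2, 1, 0, 0, 0
done = upstream == (success + skipped + failed + upstream_failed + removed)
assert done is False  # one upstream (4 - 3) is still running

# Once the last one fails, the counts account for every upstream.
failed = 1
done = upstream == (success + skipped + failed + upstream_failed + removed)
assert done is True
```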
do we need to wait for done if we already have one that is skipped?
yes, waiting for a failed setup: in this case we will always have an upstream_failed state if we have at least one failed setup. Without this condition we would get a skipped state as soon as a setup task is skipped, even if it is followed by a failed setup.
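A hypothetical sketch of the ordering being argued for here (the function and its signature are made up for illustration; this is not Airflow's actual implementation). The point is that the failure check comes first, and a skip only decides the state once all setups are done:

```python
from typing import Optional

def resolve_setup_state(
    upstream: int, success: int, skipped: int,
    failed: int, upstream_failed: int, removed: int,
) -> Optional[str]:
    done = upstream == (success + skipped + failed + upstream_failed + removed)
    if failed or upstream_failed:
        # A failed setup always wins: propagate upstream_failed.
        return "upstream_failed"
    if done and skipped:
        # Only skip once *all* setups have finished, so a skip seen early
        # cannot mask a failure that arrives later.
        return "skipped"
    return None  # keep waiting

# A skipped setup alone does not decide the state while others are running:
assert resolve_setup_state(3, 1, 1, 0, 0, 0) is None
# ...but a later failure turns the outcome into upstream_failed:
assert resolve_setup_state(3, 1, 1, 1, 0, 0) == "upstream_failed"
```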
```python
elif removed and success and ti.map_index > -1:
    if ti.map_index >= success:
        new_state = TaskInstanceState.REMOVED
```
I never understood why we should have at least one success upstream, and the task should not be mapped, to change its state to removed.
Also IMHO we need a condition on done, otherwise it will not be deterministic.
> Also IMHO we need a condition on done, otherwise it will not be deterministic.
yeah, the reason why i felt it is ok is that airflow is already a race when there are many-to-one. so this seems basically consistent with the existing behavior -- to fail or skip fast. i think in practice it is not likely to make much difference. but...
for others, the scenario is, if there are multiple upstream indirect setups, should we wait for all of them to complete before determining the state of the object task (and then e.g. if there is a combination of success / skip / fail, then we fail it) or should we just take the state of the first non-success?
taking the state of the first non-success is how things work currently when there's a many-to-one direct upstream, e.g. `[a, b] >> c`. in this way it "fails fast" (or skips). but indirect upstream setup is of course something a little different. we could choose to wait until all upstream setups are done before marking state. this means we do not fail fast, and the payoff is there is no race condition w.r.t. state.
The main problem here is that the skipped state is used as one of the successful states (along with success); most users do not consider it a failure, it simply means that there is no need to run the task, nor to send a notification (on_failure).
And the problem becomes much more complicated if we discuss scenarios where an external task sensor waits for this task...
I'd love to hear from others what they think.
so with your example, if the upstream setups are skipped, the task will be skipped. it's only if there's a mixed state, which is actually a pretty odd situation -- don't think it's super common generally -- but we must decide what to do.
the question is: take the first one, or wait for all and then evaluate.
if your position is "wait for all and evaluate" what would be your logic?
if at least one failure, then upstream_failed
else if at least one skip, skip
?
i think it's probably fine... add an implicit all_done for the setups and then evaluate. i don't think it makes a huge difference either way.
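The "implicit all_done for the setups, then evaluate" policy sketched above could look roughly like this (an illustrative sketch only; the function name, state strings, and shape are assumptions, not Airflow's actual code):

```python
from collections import Counter
from typing import List, Optional

def evaluate_when_all_done(setup_states: List[str]) -> Optional[str]:
    """Return the downstream task's state once every setup has finished."""
    counts = Counter(setup_states)
    finished = sum(
        counts[s] for s in ("success", "skipped", "failed", "upstream_failed")
    )
    if finished < len(setup_states):
        return None  # not all setups are done yet; no race on partial state
    if counts["failed"] or counts["upstream_failed"]:
        return "upstream_failed"   # at least one failure wins
    if counts["skipped"]:
        return "skipped"           # else at least one skip -> skip
    return None  # all succeeded: no forced state, run normally

assert evaluate_when_all_done(["success", "running", "skipped"]) is None
assert evaluate_when_all_done(["skipped", "failed", "success"]) == "upstream_failed"
assert evaluate_when_all_done(["skipped", "success", "success"]) == "skipped"
```

The cost of this policy, as noted above, is losing fail-fast behavior; the payoff is a deterministic outcome for mixed setup states.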
I lean more toward consistency here (fast fail), even though that consistency is a non-deterministic outcome.
```python
if new_state == TaskInstanceState.SKIPPED and dep_context.wait_for_past_depends_before_skipping:
    past_depends_met = ti.xcom_pull(
        task_ids=ti.task_id, key=PAST_DEPENDS_MET, session=session, default=False
    )
    if not past_depends_met:
        yield self._failing_status(
            reason="Task should be skipped but the past depends are not met"
        ), changed
        return
```
Oops, I was breaking my little feature in my PR! 👍
```python
non_successes = upstream - success
if ti.map_index > -1:
    non_successes -= removed
if non_successes > 0:
```
I think we can replace all of this by `done` after adding it:

```diff
-non_successes = upstream - success
-if ti.map_index > -1:
-    non_successes -= removed
-if non_successes > 0:
+if not done:
```
this could be true, but this logic was added to deal with a deadlock and i don't really want to mess with it here. i'm just keeping the behavior consistent with trigger rule. maybe let's do this in a followup?
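For context on why the two checks are not interchangeable, here is a small illustration (the helper and its counts are hypothetical, just mirroring the snippet above): the `non_successes` bookkeeping discounts removed upstreams only for mapped tasks, so a plain `not done` check would behave differently whenever `removed` is nonzero.

```python
def has_non_successes(upstream: int, success: int, removed: int, map_index: int) -> bool:
    # Mirrors the snippet under discussion: count upstreams that did not succeed,
    # discounting removed ones only for mapped tasks (map_index > -1).
    non_successes = upstream - success
    if map_index > -1:
        non_successes -= removed
    return non_successes > 0

# Unmapped task: 4 upstreams, 3 succeeded, 1 removed. Every upstream has
# finished, yet the counter logic still reports a non-success.
assert has_non_successes(4, 3, 1, map_index=-1) is True
# Mapped task: the removed upstream is discounted, so nothing blocks.
assert has_non_successes(4, 3, 1, map_index=0) is False
```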
Force-pushed from 72e27ed to 8b8e65d.
have to update a couple tests since now we can't test the two constraints independently. i'll do tomorrow.
I would prefer that we move this to 2.7.2 considering that it touches the heart of Airflow and we are cutting the RC tomorrow. What do you think?
I'd take the risk to be honest @ephraimbuddy. And maybe @hussein-awala might chime in, but I think this bug was undermining a rather important use case of setup/teardown, and it also changes a behaviour from 2.7.0 that someone might start relying on. So this is really a bugfix, but for some people it might break their workflows (literally). If we instead keep the old behaviour in 2.7.1 and fix it in 2.7.2, it will be a bit harder to explain that this is a bugfix. Also, 2.7.0 had quite a few teething problems and people went back to 2.6.3 and are eagerly waiting for 2.7.1 (I hope), so likely not many people had a chance to start relying on the 2.7.0 behaviour, so taking a bit of risk now might be worth it.
I understand that. I think the risk level is very low here, especially after all the tests I ran with @dstandish.
* move internal functions to methods -- no behavior change
* add setup constraint logic
* comments
* simplify
* simplify
* fix
* fix
* update tests
* static checks
* add constraint that setup tasks followed by ALL_SUCCESS rule
* add todo
* docs
* docs
* add test
* fix static check

(cherry picked from commit e75ceca)
Cherrypicked!
Ensuring that cleared tasks wait for indirect setup tasks was fixed in apache#33903. That fix entailed applying logic similar to ALL_SUCCESS logic for just the indirect upstream tasks before moving on to the evaluation of direct upstreams. This had the benefit of simplicity and isolation -- it would only be called for tasks that had indirect upstreams, and it would be layered on top of the existing trigger rule dep logic, not mixed in. But it resulted in new round trips to the db when there is an indirect setup. What this PR does is try to get all the upstream TI info in one query.
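The "one query" idea can be sketched with conditional aggregation. This is an illustrative stand-in using a stdlib SQLite table in place of Airflow's `task_instance` table (table shape, names, and data are assumptions, not the PR's actual query):

```python
import sqlite3

# An in-memory table standing in for the upstream task instances.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("setup_0", "success"), ("setup_1", "skipped"), ("setup_2", "failed")],
)

# One round trip: per-state counts via GROUP BY instead of one query per state.
counts = dict(
    conn.execute("SELECT state, COUNT(*) FROM task_instance GROUP BY state")
)
assert counts == {"failed": 1, "skipped": 1, "success": 1}
```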
Note: this doesn't change the existing trigger rule but does extract the inner functions into methods. For ease of review I have done this part in the first commit, separate from the introduction of the setup constraint.
Alternative to #33570
This is yet another approach to solving this one. But I feel it's the best one thus far because it does not make any changes to existing trigger rule logic, apart from pulling out some internal functions into methods.
The indirect setup constraint logic is processed prior to trigger rules and is completely separate.
This should play nicely with all mapping scenarios but more testing is required to verify.
There are a couple choices to make here.
One is in the "setup constraint" logic: should we apply it only to indirect setups or to all relevant setups? I think the only time it makes a difference is when there is a mixture of skipped and failed upstream setups, in how we determine the state of the object task.
Currently I only look at indirect setups.
Another thing is, for calculation of state, do we just go based on the state of the first non-success? Or should we wait until everything is done and calculate at that point? Currently it just takes the first non-success and fails (or skips) fast. And I think this is consistent with airflow's many-to-one behavior more generally.
Note: for easier review, the first commit is a no-behavior-change refactor that just pulls the inner functions out into methods. The second commit is where the setup constraint logic is added.
Here is an example dag that can be used to test the behavior:
(Attachment: setup_teardown_bug.txt)
The way to use that dag is to first change all the "scenarios" to `["success", "success", "success"]`, and then you'll get a clean run. Then you change the scenario, e.g. to `["skip", "fail", "success"]`. Then, you clear the task with the indirect upstream setup and see what happens. The first mapped setup will skip, 5 seconds later the next fails, then 5 seconds later there's a success.