Raise error upon DAG import instead of DAG trigger for incorrect pool_slots value in mapped tasks using partial() #39724

Merged
merged 9 commits into from
Oct 2, 2024

Conversation

karenbraganz
Contributor

@karenbraganz karenbraganz commented May 21, 2024

Check pool_slots value in mapped tasks using partial()


closes: #39639
Issue #39639 was opened because a DAG import error would show up for regular tasks with an incorrect pool_slots value but not for mapped tasks using partial() and expand().

For regular tasks, this error is raised by a conditional statement that runs when the task is instantiated (in the BaseOperator __init__() method):

if self.pool_slots < 1:
    dag_str = f" in dag {dag.dag_id}" if dag else ""
    raise ValueError(f"pool slots for {self.task_id}{dag_str} cannot be less than 1")

This code isn't executed for mapped tasks using partial(). I am not sure whether that is because they are instantiated differently. However, I was able to raise the error by adding the following check to partial() in baseoperator.py:

if partial_kwargs["pool_slots"] < 1:
    dag_str = f" in dag {partial_kwargs['dag'].dag_id}" if partial_kwargs["dag"] else ""
    raise ValueError(f"pool slots for {partial_kwargs['task_id']}{dag_str} cannot be less than 1")
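
To illustrate why the placement of the check matters, here is a minimal, self-contained sketch that does not use Airflow itself. `ToyOperator`, `toy_partial`, and `raises_value_error` are hypothetical stand-ins, not Airflow code. They model only one assumption: a mapped task's operator is not constructed at parse time, so a check that lives solely in `__init__()` is skipped until the operator is finally built.

```python
from typing import Any, Callable


class ToyOperator:
    """Hypothetical stand-in for BaseOperator; not Airflow code."""

    def __init__(self, task_id: str, pool_slots: int = 1) -> None:
        # Mirrors the check quoted from BaseOperator.__init__() above.
        if pool_slots < 1:
            raise ValueError(f"pool slots for {task_id} cannot be less than 1")
        self.task_id = task_id
        self.pool_slots = pool_slots


def toy_partial(validate: bool, **partial_kwargs: Any) -> Callable[[], ToyOperator]:
    """Return a factory that builds the operator later (at "trigger" time)."""
    # With validate=True the check runs now, i.e. at "import" time.
    if validate and partial_kwargs.get("pool_slots", 1) < 1:
        raise ValueError(
            f"pool slots for {partial_kwargs['task_id']} cannot be less than 1"
        )
    return lambda: ToyOperator(**partial_kwargs)


def raises_value_error(func: Callable[[], Any]) -> bool:
    """Report whether calling func raises ValueError."""
    try:
        func()
    except ValueError:
        return True
    return False


# Without the eager check, building the factory succeeds silently ...
deferred = toy_partial(validate=False, task_id="t1", pool_slots=-1)
# ... and the error only surfaces when the operator is finally constructed.
fails_late = raises_value_error(deferred)
# With the eager check, the same mistake is caught immediately.
fails_early = raises_value_error(
    lambda: toy_partial(validate=True, task_id="t1", pool_slots=-1)
)
```

In this toy model, the `validate=True` path plays the role of the check added to partial(): the invalid value is rejected when the DAG file is parsed rather than when the task runs.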

I am working on investigating whether this is the correct approach or whether this code should be executed when the task is instantiated like it is for a regular task. I'm not sure if mapped tasks using partial() are purposely designed to be instantiated differently or if they should pass through some of the same checks as a regular task upon instantiation.

Additionally, if modifying def partial() is the way to go, I might have to modify the expand() method in the _TaskDecorator class as well.


@RNHTTR
Contributor

RNHTTR commented May 22, 2024

This approach seems reasonable IMO. Can you add a test for it?

Also, do you have pre-commit configured? That probably woulda fixed the failing test you're seeing.

@karenbraganz
Contributor Author

Great, I will configure pre-commit and add a test before submitting the pull request!

@karenbraganz karenbraganz marked this pull request as ready for review May 29, 2024 15:32
@karenbraganz karenbraganz requested a review from uranusjr as a code owner May 29, 2024 15:32
@karenbraganz
Contributor Author

I have added a unit test and configured my pre-commit hooks; both pass. I also updated _expand() in decorators/base.py to mirror the changes in partial().

Contributor

@RNHTTR RNHTTR left a comment


My only critique here is that this code is repeated. Maybe it makes sense to add into a util somewhere, but I'll defer to @uranusjr

@eladkal eladkal requested a review from uranusjr August 1, 2024 12:53
@eladkal eladkal added this to the Airflow 2.10.0 milestone Aug 1, 2024
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Aug 1, 2024
Member

@uranusjr uranusjr left a comment


I don’t like that this duplicates the check in so many places (there’s also already one in BaseOperator). This solves the problem for pool_slots specifically, but not the underlying problem that partial() does not do the same checks as BaseOperator. Some refactoring should be done to unify the call sites so they always do the same checks instead.

airflow/decorators/base.py Outdated Show resolved Hide resolved
@karenbraganz
Contributor Author

Currently, for regular tasks these checks happen in the __init__() method of the BaseOperator class in baseoperator.py, whereas for mapped tasks the checks happen in partial() and _expand(). I can unify the checks by creating a function that runs them and then calling that function in __init__(), partial(), and _expand(). That way, we don't have to repeat the code in each function. I will work on a new commit for this.

Please let me know your thoughts on this approach.
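
The unification described above could be sketched roughly as follows. This is an illustration only: `validate_pool_slots`, `ToyBaseOperator`, `toy_partial`, `toy_expand`, and `error_message` are hypothetical names chosen for this sketch, not the helper or call sites that exist in Airflow.

```python
from typing import Any, Callable, Dict, Optional


def validate_pool_slots(pool_slots: int, task_id: str, dag_id: Optional[str] = None) -> None:
    """Single home for the check (hypothetical helper, not an Airflow API)."""
    if pool_slots < 1:
        dag_str = f" in dag {dag_id}" if dag_id else ""
        raise ValueError(f"pool slots for {task_id}{dag_str} cannot be less than 1")


class ToyBaseOperator:
    """Stand-in for a regular task: validates on construction."""

    def __init__(self, task_id: str, pool_slots: int = 1, dag_id: Optional[str] = None) -> None:
        validate_pool_slots(pool_slots, task_id, dag_id)
        self.task_id = task_id
        self.pool_slots = pool_slots


def toy_partial(**partial_kwargs: Any) -> Dict[str, Any]:
    """Stand-in for partial(): validates before storing the kwargs."""
    validate_pool_slots(
        partial_kwargs.get("pool_slots", 1),
        partial_kwargs["task_id"],
        partial_kwargs.get("dag_id"),
    )
    return partial_kwargs


def toy_expand(partial_kwargs: Dict[str, Any], **mapped_kwargs: Any) -> Dict[str, Any]:
    """Stand-in for _expand(): runs the same shared check."""
    validate_pool_slots(
        partial_kwargs.get("pool_slots", 1),
        partial_kwargs["task_id"],
        partial_kwargs.get("dag_id"),
    )
    return {**partial_kwargs, "mapped": mapped_kwargs}


def error_message(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Optional[str]:
    """Return the ValueError message raised by func, or None if it succeeds."""
    try:
        func(*args, **kwargs)
    except ValueError as exc:
        return str(exc)
    return None


# All three call sites reject the same bad value with the same message.
messages = [
    error_message(ToyBaseOperator, task_id="t", pool_slots=0, dag_id="d"),
    error_message(toy_partial, task_id="t", pool_slots=0, dag_id="d"),
    error_message(toy_expand, {"task_id": "t", "pool_slots": 0, "dag_id": "d"}, x=[1, 2]),
]
```

Because each entry point delegates to one function, adding another check later would mean changing a single place instead of three.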

@karenbraganz
Contributor Author

karenbraganz commented Aug 9, 2024

@uranusjr @RNHTTR Here are three ways I could implement this PR. I think the second approach is best for keeping the scope of each PR clear, but let me know what you think.

1. Since this PR is focused on making the pool_slots check available to partial() and _expand() (to fix issue #39639), I could create the separate function and only include the pool_slots check in it for now. I will also open a separate PR to modify the other checks and add them to that function in the way we discussed. This would maintain the scope of this PR on fixing the issue. The downside to this approach is that I would be partially implementing the change of creating a separate function to unify operator checks between two different PRs.

2. Alternatively, I could just add the pool_slots check to both partial() and _expand() functions only for this PR instead of using the separate function, and open a separate PR to create the function that unifies the repeated code in one place. This would maintain the scope of this PR and also have a separate PR with the scope of modifying the way we run operator checks (instead of having this scope scattered between two PRs).

3. The last option would be to add all fields including pool_slots to the separate function. This would fix the issue and also change the way we run operator checks in a single PR, but I think this would make the scope of the PR too broad.

Please let me know your thoughts and which approach you think is best.

@uranusjr
Member

Either approach sounds fine to me, the difference is just procedural and we don’t need to be strict here.

@karenbraganz
Contributor Author

I will go with the second approach. I have made the changes you requested to my code regarding the usage of the dag variable from the local scope and splitting the if statement in base.py.

I will start working on a separate PR to create a method that unifies the DAG checks instead of having them separately in the BaseOperator __init__(), partial(), and _expand() methods.

Please let me know if any other changes are required for this PR.

@uranusjr uranusjr dismissed their stale review October 2, 2024 06:19

Fine now

@uranusjr uranusjr merged commit 39d207b into apache:main Oct 2, 2024
52 checks passed
@uranusjr
Member

uranusjr commented Oct 2, 2024

@karenbraganz Can you work on the backport PR please?

karenbraganz added a commit to karenbraganz/airflow that referenced this pull request Oct 3, 2024
@karenbraganz
Contributor Author

Backport PR: #42693

joaopamaral pushed a commit to joaopamaral/airflow that referenced this pull request Oct 21, 2024
ellisms pushed a commit to ellisms/airflow that referenced this pull request Nov 13, 2024
eladkal pushed a commit that referenced this pull request Nov 27, 2024
utkarsharma2 added a commit that referenced this pull request Dec 4, 2024
utkarsharma2 added a commit that referenced this pull request Dec 9, 2024
Successfully merging this pull request may close these issues.

Dynamic task mapping - partial args are not check and only fail at trigger
6 participants