-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support in AWS Batch Operator for multinode jobs #28321
Add support in AWS Batch Operator for multinode jobs #28321
Conversation
…obs; update client to collect log info from multinode job descriptions
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
|
…batch-operator-for-multinode-jobs
self.container_overrides = overrides | ||
self.node_overrides = node_overrides |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question. Did you check are this arguments mutually exclusive?
AWS API doesn't mention it however everything might possible because even new eksPropertiesOverride not marked as exclusive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes they are, specifying both returns an input validation error.
BTW @Taragolis , could you approve running the workflow to help get this PR ready-to-review ? 🙏
Ok I got a handle on all tests finally :) ready for review now. |
job_id, | ||
) | ||
log_configuration = ( | ||
job_node_range_properties[0].get("container", {}).get("logConfiguration", {}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to have zero element in the array ? i.e. should we add a check on len == 0
and a user-friendly error message ?
self.log.warning( | ||
"AWS Batch job (%s) is neither a container nor multinode job. Log info not found." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this could be an error log, considering the user-provided input is invalid for this kind of request ?
The other warning logs in this method are mostly informative (there are several node groups, which is important info for the user to know, but doesn't require any action), this one I think requires user action, and thus more attention.
}, | ||
} | ||
}, | ||
} | ||
], | ||
}, | ||
} | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
beautiful 😄
@@ -108,12 +110,13 @@ class BatchOperator(BaseOperator): | |||
"job_queue", | |||
"overrides", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL the names here need to match not the parameter name in the ctor but the name of the attribute in the class, so:
"overrides", | |
"container_overrides", |
Thinking about it, I wonder if this is a breaking change... I don't know too well how this works 😬
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@camilleanne @vandonr-amz I recommend deprecate old parameter but keep it for a while, so users would have a time for change their code.
It is much easier achieve in subclass of BaseOperator
(this PR case), because all arguments are keyword.
Some examples
Use old attribute value only if new attribute not set
airflow/airflow/providers/slack/operators/slack_webhook.py
Lines 95 to 104 in 11f30a8
http_conn_id = kwargs.pop("http_conn_id", None) | |
if http_conn_id: | |
warnings.warn( | |
"Parameter `http_conn_id` is deprecated. Please use `slack_webhook_conn_id` instead.", | |
DeprecationWarning, | |
stacklevel=2, | |
) | |
if slack_webhook_conn_id: | |
raise AirflowException("You cannot provide both `slack_webhook_conn_id` and `http_conn_id`.") | |
slack_webhook_conn_id = http_conn_id |
Use old attribute value only if it equal new attribute value or not new attribute not set
airflow/airflow/providers/amazon/aws/operators/athena.py
Lines 91 to 101 in 11f30a8
if max_tries: | |
warnings.warn( | |
f"Parameter `{self.__class__.__name__}.max_tries` is deprecated and will be removed " | |
"in a future release. Please use method `max_polling_attempts` instead.", | |
DeprecationWarning, | |
stacklevel=2, | |
) | |
if max_polling_attempts and max_polling_attempts != max_tries: | |
raise Exception("max_polling_attempts must be the same value as max_tries") | |
else: | |
self.max_polling_attempts = max_tries |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given the fact that it was just a rename for readability, I wonder if it wouldn't be simpler to just keep the old name ?
but then it could be confusing to users...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it fine if rename this argument because one day we might also add eks_properties_overrides
, so override
might confuse users more rather than change attribute name.
And deprecation warning give time to change arguments in end users DAG code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL the names here need to match not the parameter name in the ctor but the name of the attribute in the class
To translate that into ELI5: if you have self.foo = bar
in your operator, the template field would be named "foo" to match "self.foo", not "bar". :P
Hey @camilleanne, Any plans to pick this one up again? |
…batch-operator-for-multinode-jobs
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions. |
@vandonr-amz is taking on this work on in #29522 Closing this PR with that context. |
picking up #28321 after it's been somewhat abandoned by the original author. Addressed my own comment about empty array, and it should be good to go I think. Initial description from @camilleanne: Adds support for AWS Batch multinode jobs by allowing a node_overrides json object to be passed through to the boto3 submit_job method. Adds support for multinode jobs by properly parsing the output of describe_jobs (which is different for container vs multinode) to extract the log stream name. closes: #25522
node_overrides
json object to be passed through to the boto3 submit_job method.describe_jobs
(which is different for container vs multinode) to extract the log stream name.closes: #25522
I had a hard time running tests locally, so I'm opening as a draft PR initially although I don't anticipate any changes beyond syncing with main, but I'd like to confirm test success before making available for review.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.