Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for deferrable operators in AMPP #30032

Merged

Conversation

syedahsn
Copy link
Contributor

@syedahsn syedahsn commented Mar 10, 2023

The purpose of this PR is to add the foundations required to convert AMPP operators to deferrable operators.
The aiobotocore library is being used to make all async calls to the boto3 API.

Rather than create separate Async hooks for every service, we create an async_conn property in base_aws.py which behaves similar to the existing conn property. This property allows us to get access to a client that can make async boto3 API calls. This also allows us to make use of the supporting code that exists for the conn property. The get_client_type function is extended to handle returning a ClientCreatorContext object which can be used to get a client that supports asynchronous calls.

Another addition this PR makes is extending get_waiter in base_aws.py to work with custom async waiters. Currently, the get_waiter function allows us to create a custom waiter using a JSON config file. This PR allows passing a client parameter which can be used to return an async waiter generated by the aiobotocore library. This feature will save a lot of code duplication by standardizing the polling portion of Triggers. A README.md file is included that describes how Triggers can be written which make use of these features. Although not all operators and sensor will be able to make use of these features, there are many in the AMPP that can be written in a standardized format following the methods described.

The RedshiftCreateClusterOperator is chosen to demonstrate how an operator would be modified to become a deferrable operator. We use the built-in cluster_available waiter in the Trigger to asynchronously poll the boto3 API to wait for the cluster to become available.
@Taragolis, @uranusjr , @potiuk , @pankajastro I'd love to hear your thoughts on this approach, and see what you think.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@syedahsn syedahsn force-pushed the syedahsn/deferrable-redshift-create-cluster branch 5 times, most recently from 7717291 to 8c93e1a Compare March 25, 2023 00:02
@potiuk
Copy link
Member

potiuk commented Mar 27, 2023

You need to skip your aiobotocore tests conditionally - see other deferrable tests in aws

@syedahsn syedahsn force-pushed the syedahsn/deferrable-redshift-create-cluster branch 7 times, most recently from b6b680d to 075f201 Compare March 29, 2023 18:29
@syedahsn syedahsn force-pushed the syedahsn/deferrable-redshift-create-cluster branch 4 times, most recently from 9633b59 to f023f32 Compare April 6, 2023 17:07
@syedahsn syedahsn force-pushed the syedahsn/deferrable-redshift-create-cluster branch 2 times, most recently from 571789d to b164050 Compare April 11, 2023 16:52
airflow/providers/amazon/aws/triggers/README.md Outdated Show resolved Hide resolved
airflow/providers/amazon/aws/triggers/README.md Outdated Show resolved Hide resolved
airflow/providers/amazon/aws/triggers/README.md Outdated Show resolved Hide resolved
airflow/providers/amazon/aws/triggers/README.md Outdated Show resolved Hide resolved
airflow/providers/amazon/aws/triggers/README.md Outdated Show resolved Hide resolved
airflow/providers/amazon/aws/triggers/README.md Outdated Show resolved Hide resolved
Add unit test for deferrable in test_redshift_cluster.py operator
Add docstring for deferrable param
Comment on lines +132 to +133
with pytest.raises(TaskDeferred):
redshift_operator.execute(None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I was expecting to check that self.defer have been called, but this one works as well :)

@vincbeck
Copy link
Contributor

There are some static check failures

@potiuk potiuk merged commit 62ea0ff into apache:main Apr 24, 2023
potiuk added a commit to potiuk/airflow that referenced this pull request Apr 26, 2023
The apache#30032 has been merged with aiobotore as required dependency.
The aiobotocore package adds specific requirements for botocore
and boto and it conflicts with older versions of Airflow, so
we have to bring it back as optional dependency of the
amazon provider.
potiuk added a commit that referenced this pull request Apr 26, 2023
The #30032 has been merged with aiobotore as required dependency.
The aiobotocore package adds specific requirements for botocore
and boto and it conflicts with older versions of Airflow, so
we have to bring it back as optional dependency of the
amazon provider.
@vandonr-amz vandonr-amz deleted the syedahsn/deferrable-redshift-create-cluster branch May 24, 2023 20:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers provider:amazon-aws AWS/Amazon - related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants