Extend hooks arguments into AwsBaseWaiterTrigger #34884

Merged on Oct 12, 2023 (4 commits)
Changes from 2 commits
24 changes: 21 additions & 3 deletions airflow/providers/amazon/aws/triggers/base.py
@@ -22,6 +22,7 @@

 from airflow.providers.amazon.aws.utils.waiter_with_logging import async_wait
 from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.utils.helpers import prune_dict

 if TYPE_CHECKING:
     from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
@@ -55,6 +56,9 @@ class AwsBaseWaiterTrigger(BaseTrigger):
     :param waiter_max_attempts: The maximum number of attempts to be made.
     :param aws_conn_id: The Airflow connection used for AWS credentials. To be used to build the hook.
     :param region_name: The AWS region where the resources to watch are. To be used to build the hook.
+    :param verify: Whether or not to verify SSL certificates. To be used to build the hook.
+    :param botocore_config: Configuration dictionary (key-values) for botocore client.
+        To be used to build the hook.
     """

     def __init__(
@@ -72,6 +76,8 @@ def __init__(
         waiter_max_attempts: int,
         aws_conn_id: str | None,
         region_name: str | None = None,
+        verify: bool | str | None = None,
+        botocore_config: dict | None = None,
     ):
         # parameters that should be hardcoded in the child's implem
         self.serialized_fields = serialized_fields
@@ -90,6 +96,8 @@ def __init__(
         self.attempts = waiter_max_attempts
         self.aws_conn_id = aws_conn_id
         self.region_name = region_name
+        self.verify = verify
+        self.botocore_config = botocore_config

     def serialize(self) -> tuple[str, dict[str, Any]]:
         # here we put together the "common" params,
@@ -102,9 +110,19 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             },
             **self.serialized_fields,
         )
-        if self.region_name:
-            # if we serialize the None value from this, it breaks subclasses that don't have it in their ctor.
-            params["region_name"] = self.region_name
+
+        # if we serialize the None value from this, it breaks subclasses that don't have it in their ctor.
+        params.update(
+            prune_dict(
+                {
+                    # Keep previous behaviour when empty string in region_name evaluated as `None`
+                    "region_name": self.region_name or None,
+                    "verify": self.verify,
+                    "botocore_config": self.botocore_config,
+                }
+            )
+        )
+
         return (
             # remember that self is an instance of the subclass here, not of this class.
             self.__class__.__module__ + "." + self.__class__.__qualname__,
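
The pruning step in `serialize()` can be illustrated without Airflow installed. The following is a minimal sketch, not the provider's code: `drop_none` is a hypothetical, non-recursive stand-in for `airflow.utils.helpers.prune_dict`, written under the assumption that only `None` values are dropped, and `hook_params` is an illustrative helper name that does not exist in the PR.

```python
from __future__ import annotations

from typing import Any


def drop_none(values: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical, non-recursive stand-in for prune_dict: drop keys whose value is None."""
    return {key: value for key, value in values.items() if value is not None}


def hook_params(region_name=None, verify=None, botocore_config=None) -> dict[str, Any]:
    """Return only the optional hook arguments that were actually set."""
    return drop_none(
        {
            # An empty string is coerced to None, mirroring the old `if self.region_name:` check.
            "region_name": region_name or None,
            "verify": verify,
            "botocore_config": botocore_config,
        }
    )


print(hook_params())
# {}
print(hook_params(region_name="", verify=False, botocore_config={}))
# {'verify': False, 'botocore_config': {}}
print(hook_params(region_name="eu-west-1", verify="/foo/bar.pem"))
# {'region_name': 'eu-west-1', 'verify': '/foo/bar.pem'}
```

Note that `verify=False` and an empty `botocore_config` survive the pruning because they are not `None`; only `region_name` gets the extra empty-string handling.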
36 changes: 35 additions & 1 deletion tests/providers/amazon/aws/triggers/test_base.py
@@ -63,7 +63,41 @@ def test_region_serialized(self):
         assert "region_name" in args
         assert args["region_name"] == "my_region"

-    def test_region_not_serialized_if_omitted(self):
+    @pytest.mark.parametrize("verify", [True, False, pytest.param("/foo/bar.pem", id="path")])
+    def test_verify_serialized(self, verify):
+        self.trigger.verify = verify
+        _, args = self.trigger.serialize()
+
+        assert "verify" in args
+        assert args["verify"] == verify
+
+    @pytest.mark.parametrize(
+        "botocore_config",
+        [
+            pytest.param({"read_timeout": 10, "connect_timeout": 42, "keepalive": True}, id="non-empty-dict"),
Contributor

Do we have an example like this in the docs?

Contributor Author

Not yet. Let's add it with the first migrated operators.
Lambda is a good candidate for the first migration:

  • Just a couple of Operators/Sensors
  • Has deferrable mode

Contributor Author

The idea is to create an RST file similar to https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers-amazon/_partials/prerequisite_tasks.rst
It would describe the generic hook parameters, with links to the boto3 docs if required, and be included in the docs of the migrated operators.

Contributor

I think it's best to create a doc for the base class rather than add the example to all or selected operators.

Contributor Author

Yeah, we could create a page for the base boto3-based operators.
However, I think we also need this kind of documentation on the operator-specific pages, because users go from the auto-generated API pages to the operator-specific page.

Contributor Author

I plan to create a draft PR during the day, so we can discuss the documentation there.

Contributor

Cool, so let's merge?

+            pytest.param({}, id="empty-dict"),
+        ],
+    )
+    def test_botocore_config_serialized(self, botocore_config):
+        self.trigger.botocore_config = botocore_config
+        _, args = self.trigger.serialize()
+
+        assert "botocore_config" in args
+        assert args["botocore_config"] == botocore_config
+
+    @pytest.mark.parametrize("param_name", ["region_name", "verify", "botocore_config"])
+    def test_hooks_args_not_serialized_if_omitted(self, param_name):
+        _, args = self.trigger.serialize()
+
+        assert param_name not in args
+
+    def test_region_name_not_serialized_if_empty_string(self):
+        """
+        Compatibility with previous behaviour when empty string region name not serialised.
+
+        It would evaluate as None, however empty string it is not valid region name in boto3.
+        """
+        self.trigger.region_name = ""
         _, args = self.trigger.serialize()

         assert "region_name" not in args
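
The tests above check that unset hook arguments are left out of the serialized kwargs. A rough sketch of why that matters follows, assuming the serialized kwargs are later passed back to the trigger's constructor. `LegacyTrigger` and `rebuild` are hypothetical names used only for illustration; they stand in for a subclass whose constructor predates `verify` and `botocore_config`, and for whatever code re-creates the trigger.

```python
from __future__ import annotations

from typing import Any


class LegacyTrigger:
    """Hypothetical subclass whose constructor does not accept `verify` or `botocore_config`."""

    def __init__(self, waiter_delay: int, aws_conn_id: str | None):
        self.waiter_delay = waiter_delay
        self.aws_conn_id = aws_conn_id


def rebuild(trigger_cls: type, kwargs: dict[str, Any]):
    """Re-create a trigger from its serialized kwargs (illustrative helper)."""
    return trigger_cls(**kwargs)


common = {"waiter_delay": 30, "aws_conn_id": "aws_default"}

# Unset hook arguments are omitted, so the older constructor still works.
trigger = rebuild(LegacyTrigger, common)

# If None were serialized for an unset argument, the constructor would
# reject the unknown keyword with a TypeError.
try:
    rebuild(LegacyTrigger, {**common, "verify": None})
    failed = False
except TypeError:
    failed = True

print(trigger.waiter_delay, failed)
```

Leaving the key out entirely keeps existing subclasses working without requiring each of them to add the new parameters at once.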