Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Amazon EventBridge PutRule hook and operator #32869

Merged
merged 7 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion airflow/providers/amazon/aws/hooks/eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,75 @@
# under the License.
from __future__ import annotations

import json

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils import trim_none_values


def validate_json(pattern: str) -> None:
mahammi marked this conversation as resolved.
Show resolved Hide resolved
try:
json.loads(pattern)
except ValueError:
raise ValueError("`event_pattern` must be a valid JSON string.")


class EventBridgeHook(AwsBaseHook):
"""Amazon EventBridge Hook."""

def __init__(self, *args, **kwargs):
"""Creating object."""
super().__init__(client_type="events", *args, **kwargs)

def put_rule(
self,
name: str,
description: str | None = None,
event_bus_name: str | None = None,
event_pattern: str | None = None,
role_arn: str | None = None,
schedule_expression: str | None = None,
state: str | None = None,
tags: list[dict] | None = None,
**kwargs,
):
"""
Create or update an EventBridge rule.

:param name: name of the rule to create or update (required)
:param description: description of the rule
:param event_bus_name: name or ARN of the event bus to associate with this rule
:param event_pattern: pattern of events to be matched to this rule
:param role_arn: the Amazon Resource Name of the IAM role associated with the rule
:param schedule_expression: the scheduling expression (for example, a cron or rate expression)
:param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
:param tags: list of key-value pairs to associate with the rule

"""
if not (event_pattern or schedule_expression):
raise ValueError(
"One of `event_pattern` or `schedule_expression` are required in order to "
"put or update your rule."
)

if state and state not in ["ENABLED", "DISABLED"]:
raise ValueError("`state` must be specified as ENABLED or DISABLED.")

if event_pattern:
validate_json(event_pattern)

put_rule_kwargs: dict[str, str | list] = {
**trim_none_values(
{
"Name": name,
"Description": description,
"EventBusName": event_bus_name,
"EventPattern": event_pattern,
"RoleArn": role_arn,
"ScheduleExpression": schedule_expression,
"State": state,
"Tags": tags,
}
)
}

return self.conn.put_rule(**put_rule_kwargs)
85 changes: 85 additions & 0 deletions airflow/providers/amazon/aws/operators/eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class EventBridgePutEventsOperator(BaseOperator):
"""
Put Events onto Amazon EventBridge.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:EventBridgePutEventsOperator`

:param entries: the list of events to be put onto EventBridge, each event is a dict (required)
:param endpoint_id: the URL subdomain of the endpoint
:param aws_conn_id: the AWS connection to use
Expand Down Expand Up @@ -85,3 +89,84 @@ def execute(self, context: Context):

if self.do_xcom_push:
return [e["EventId"] for e in response["Entries"]]


class EventBridgePutRuleOperator(BaseOperator):
"""
Create or update a specified EventBridge rule.

mahammi marked this conversation as resolved.
Show resolved Hide resolved
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:EventBridgePutRuleOperator`

:param name: name of the rule to create or update (required)
:param description: description of the rule
:param event_bus_name: name or ARN of the event bus to associate with this rule
:param event_pattern: pattern of events to be matched to this rule
:param role_arn: the Amazon Resource Name of the IAM role associated with the rule
:param schedule_expression: the scheduling expression (for example, a cron or rate expression)
:param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
:param tags: list of key-value pairs to associate with the rule
:param region: the region where rule is to be created or updated

"""

template_fields: Sequence[str] = (
"aws_conn_id",
"name",
"description",
"event_bus_name",
"event_pattern",
"role_arn",
"schedule_expression",
"state",
"tags",
"region_name",
)

def __init__(
self,
*,
name: str,
description: str | None = None,
event_bus_name: str | None = None,
event_pattern: str | None = None,
role_arn: str | None = None,
schedule_expression: str | None = None,
state: str | None = None,
tags: list | None = None,
region_name: str | None = None,
aws_conn_id: str = "aws_default",
**kwargs,
):
super().__init__(**kwargs)
self.name = name
self.description = description
self.event_bus_name = event_bus_name
self.event_pattern = event_pattern
self.role_arn = role_arn
self.region_name = region_name
self.schedule_expression = schedule_expression
self.state = state
self.tags = tags
self.aws_conn_id = aws_conn_id

@cached_property
def hook(self) -> EventBridgeHook:
"""Create and return an EventBridgeHook."""
return EventBridgeHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)

def execute(self, context: Context):

self.log.info('Sending rule "%s" to EventBridge.', self.name)

return self.hook.put_rule(
name=self.name,
description=self.description,
event_bus_name=self.event_bus_name,
event_pattern=self.event_pattern,
role_arn=self.role_arn,
schedule_expression=self.schedule_expression,
state=self.state,
tags=self.tags,
)
24 changes: 21 additions & 3 deletions docs/apache-airflow-providers-amazon/operators/eventbridge.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
specific language governing permissions and limitations
under the License.

========================================
==================
Amazon EventBridge
========================================
==================

`Amazon Eventbridge <https://docs.aws.amazon.com/eventbridge/>`__ is a serverless event bus service that makes it easy
to connect your applications with data from a variety of sources. EventBridge delivers a stream of real-time data from
Expand All @@ -34,8 +34,11 @@ Prerequisite Tasks
Operators
---------

.. _howto/operator:EventBridgePutEventsOperator:


Send events to Amazon EventBridge
==========================================
=================================

To send custom events to Amazon EventBridge, use
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.
Expand All @@ -46,6 +49,21 @@ To send custom events to Amazon EventBridge, use
:start-after: [START howto_operator_eventbridge_put_events]
:end-before: [END howto_operator_eventbridge_put_events]

.. _howto/operator:EventBridgePutRuleOperator:


Create or update a rule on Amazon EventBridge
mahammi marked this conversation as resolved.
Show resolved Hide resolved
======================================================

To create or update a rule on EventBridge, use
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutRuleOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eventbridge.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eventbridge_put_rule]
:end-before: [END howto_operator_eventbridge_put_rule]


Reference
---------
Expand Down
11 changes: 11 additions & 0 deletions tests/providers/amazon/aws/hooks/test_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import pytest
from moto import mock_events

from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
Expand All @@ -26,3 +27,13 @@ class TestEventBridgeHook:
def test_conn_returns_a_boto3_connection(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
assert hook.conn is not None

def test_put_rule(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
response = hook.put_rule(name="test", event_pattern='{"source": ["aws.s3"]}', state="ENABLED")
assert "RuleArn" in response

def test_put_rule_with_bad_json_fails(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
with pytest.raises(ValueError):
hook.put_rule(name="test", event_pattern="invalid json", state="ENABLED")
44 changes: 38 additions & 6 deletions tests/providers/amazon/aws/operators/test_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@

from airflow import AirflowException
from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
from airflow.providers.amazon.aws.operators.eventbridge import EventBridgePutEventsOperator
from airflow.providers.amazon.aws.operators.eventbridge import (
EventBridgePutEventsOperator,
EventBridgePutRuleOperator,
)

TASK_ID = "events_putevents_job"
ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType": "test-detail-type"}]
FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode": "test_code"}]
EVENT_PATTERN = '{"source": ["aws.s3"]}'


class TestEventBridgePutEventsOperator:
def test_init(self):
operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
task_id="put_events_job",
entries=ENTRIES,
)

Expand All @@ -46,7 +49,7 @@ def test_execute(self, mock_conn: MagicMock):
mock_conn.put_events.return_value = hook_response

operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
task_id="put_events_job",
entries=ENTRIES,
)

Expand All @@ -56,7 +59,6 @@ def test_execute(self, mock_conn: MagicMock):

@mock.patch.object(EventBridgeHook, "conn")
def test_failed_to_send(self, mock_conn: MagicMock):

hook_response = {
"FailedEntryCount": 1,
"Entries": FAILED_ENTRIES_RESPONSE,
Expand All @@ -65,9 +67,39 @@ def test_failed_to_send(self, mock_conn: MagicMock):
mock_conn.put_events.return_value = hook_response

operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
task_id="failed_put_events_job",
entries=ENTRIES,
)

with pytest.raises(AirflowException):
operator.execute(None)


class TestEventBridgePutRuleOperator:
def test_init(self):
operator = EventBridgePutRuleOperator(
task_id="events_put_rule_job", name="match_s3_events", event_pattern=EVENT_PATTERN
)

assert operator.event_pattern == EVENT_PATTERN

@mock.patch.object(EventBridgeHook, "conn")
def test_execute(self, mock_conn: MagicMock):
hook_response = {"RuleArn": "arn:aws:events:us-east-1:123456789012:rule/test"}
mock_conn.put_rule.return_value = hook_response

operator = EventBridgePutRuleOperator(
task_id="events_put_rule_job", name="match_s3_events", event_pattern=EVENT_PATTERN
)

result = operator.execute(None)

assert result == hook_response

def test_put_rule_with_bad_json_fails(self):
operator = EventBridgePutRuleOperator(
task_id="failed_put_rule_job", name="match_s3_events", event_pattern="invalid json"
)

with pytest.raises(ValueError):
operator.execute(None)
25 changes: 22 additions & 3 deletions tests/system/providers/amazon/aws/example_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.eventbridge import EventBridgePutEventsOperator
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.eventbridge import (
EventBridgePutEventsOperator,
EventBridgePutRuleOperator,
)
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

DAG_ID = "example_eventbridge"
ENTRIES = [
Expand All @@ -31,20 +36,34 @@
}
]

sys_test_context_task = SystemTestContextBuilder().build()

with DAG(
dag_id=DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
tags=["example"],
catchup=False,
) as dag:
test_context = sys_test_context_task()

# [START howto_operator_eventbridge_put_events]
env_id = test_context[ENV_ID_KEY]

# [START howto_operator_eventbridge_put_events]
put_events = EventBridgePutEventsOperator(task_id="put_events_task", entries=ENTRIES)

# [END howto_operator_eventbridge_put_events]

# [START howto_operator_eventbridge_put_rule]
put_rule = EventBridgePutRuleOperator(
task_id="put_rule_task",
name="Example Rule",
event_pattern='{"source": ["example.myapp"]}',
description="This rule matches events from example.myapp.",
)
# [END howto_operator_eventbridge_put_rule]

chain(test_context, put_events, put_rule)


mahammi marked this conversation as resolved.
Show resolved Hide resolved
from tests.system.utils import get_test_run # noqa: E402

Expand Down