Skip to content

Commit

Permalink
Events: Support CrossAccount events (#5866)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers authored Jan 23, 2023
1 parent 931bb6d commit b0ee64f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 12 deletions.
9 changes: 9 additions & 0 deletions docs/docs/services/events.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ events
- [X] list_tags_for_resource
- [X] list_targets_by_rule
- [X] put_events

The following targets are supported at the moment:

- CloudWatch Log Group
- EventBridge Archive
- SQS Queue + FIFO Queue
- Cross-region/account EventBus


- [ ] put_partner_events
- [X] put_permission
- [X] put_rule
Expand Down
52 changes: 41 additions & 11 deletions moto/events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@


class Rule(CloudFormationModel):
Arn = namedtuple("Arn", ["service", "resource_type", "resource_id"])
Arn = namedtuple(
"Arn", ["account", "region", "service", "resource_type", "resource_id"]
)

def __init__(
self,
Expand Down Expand Up @@ -122,6 +124,7 @@ def send_to_targets(self, event_bus_name, event):
# - CloudWatch Log Group
# - EventBridge Archive
# - SQS Queue + FIFO Queue
# - Cross-region/account EventBus
for target in self.targets:
arn = self._parse_arn(target["Arn"])

Expand All @@ -135,17 +138,25 @@ def send_to_targets(self, event_bus_name, event):
elif arn.service == "sqs":
group_id = target.get("SqsParameters", {}).get("MessageGroupId")
self._send_to_sqs_queue(arn.resource_id, event, group_id)
elif arn.service == "events" and arn.resource_type == "event-bus":
cross_account_backend: EventsBackend = events_backends[arn.account][
arn.region
]
new_event = {
"Source": event["source"],
"DetailType": event["detail-type"],
"Detail": json.dumps(event["detail"]),
"EventBusName": arn.resource_id,
}
cross_account_backend.put_events([new_event])
else:
raise NotImplementedError(f"Expr not defined for {type(self)}")

def _parse_arn(self, arn):
def _parse_arn(self, arn: str) -> Arn:
# http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
# this method needs probably some more fine tuning,
# when also other targets are supported
elements = arn.split(":", 5)

service = elements[2]
resource = elements[5]
_, _, service, region, account, resource = arn.split(":", 5)

if ":" in resource and "/" in resource:
if resource.index(":") < resource.index("/"):
Expand All @@ -161,7 +172,11 @@ def _parse_arn(self, arn):
resource_id = resource

return self.Arn(
service=service, resource_type=resource_type, resource_id=resource_id
account=account,
region=region,
service=service,
resource_type=resource_type,
resource_id=resource_id,
)

def _send_to_cw_log_group(self, name, event):
Expand Down Expand Up @@ -925,11 +940,18 @@ def parse(self):

class EventsBackend(BaseBackend):
"""
When a event occurs, the appropriate targets are triggered for a subset of usecases.
Some Moto services are configured to generate events and send them to EventBridge. See the AWS documentation here:
https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-service-event.html
Events that currently supported
- S3:CreateBucket
Supported events: S3:CreateBucket
Targets that are currently supported
Supported targets: AWSLambda functions
- AWSLambda functions
Please let us know if you want support for an event/target that is not yet listed here.
"""

ACCOUNT_ID = re.compile(r"^(\d{1,12}|\*)$")
Expand Down Expand Up @@ -1069,7 +1091,7 @@ def delete_rule(self, name: str) -> None:
self.tagger.delete_all_tags_for_resource(arn)
self.rules.pop(name)

def describe_rule(self, name):
def describe_rule(self, name: str) -> Rule:
rule = self.rules.get(name)
if not rule:
raise ResourceNotFoundException(f"Rule {name} does not exist.")
Expand Down Expand Up @@ -1174,6 +1196,14 @@ def put_targets(self, name, event_bus_name, targets):
rule.put_targets(targets)

def put_events(self, events):
"""
The following targets are supported at the moment:
- CloudWatch Log Group
- EventBridge Archive
- SQS Queue + FIFO Queue
- Cross-region/account EventBus
"""
num_events = len(events)

if num_events > 10:
Expand Down
98 changes: 97 additions & 1 deletion tests/test_events/test_events_integration.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
from datetime import datetime
from unittest import SkipTest, mock

import boto3
import os

import sure # noqa # pylint: disable=unused-import

from moto import mock_events, mock_sqs, mock_logs
from moto import mock_events, mock_sqs, mock_logs, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.core.utils import iso_8601_datetime_without_milliseconds

Expand Down Expand Up @@ -316,3 +319,96 @@ def test_moto_matches_none_value_with_exists_filter():
{"foo": None, "bar": "123"},
],
)


@mock_events
@mock_sqs
def test_put_events_event_bus_forwarding_rules():
if settings.TEST_SERVER_MODE:
raise SkipTest("Cross-account test - easiest to just test in DecoratorMode")

# EventBus1 --> EventBus2 --> SQS
account1 = ACCOUNT_ID
account2 = "222222222222"
event_bus_name1 = "asdf"
event_bus_name2 = "erty"
events_client = boto3.client("events", "eu-central-1")
sqs_client = boto3.client("sqs", region_name="eu-central-1")

pattern = {
"source": ["source1"],
"detail-type": ["test-detail-type"],
"detail": {
"test": [{"exists": True}],
},
}

with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}):
# Setup SQS rule in account 2
queue_url = sqs_client.create_queue(QueueName="test-queue")["QueueUrl"]
queue_arn = sqs_client.get_queue_attributes(
QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

event_bus_arn2 = events_client.create_event_bus(Name=event_bus_name2)[
"EventBusArn"
]

events_client.put_rule(
Name="event_bus_2_rule",
EventPattern=json.dumps(pattern),
State="ENABLED",
EventBusName=event_bus_name2,
)

events_client.put_targets(
Rule="event_bus_2_rule",
EventBusName=event_bus_name2,
Targets=[{"Id": "sqs-dedup-fifo", "Arn": queue_arn}],
)

# Setup EventBus1
events_client.create_event_bus(Name=event_bus_name1)["EventBusArn"]

events_client.put_rule(
Name="event_bus_1_rule",
RoleArn=f"arn:aws:iam::{account1}:role/Administrator",
EventPattern=json.dumps(pattern),
State="ENABLED",
EventBusName=event_bus_name1,
)

events_client.put_targets(
Rule="event_bus_1_rule",
EventBusName=event_bus_name1,
Targets=[
{
"Id": "event_bus_2",
"Arn": event_bus_arn2,
"RoleArn": "arn:aws:iam::123456789012:role/Administrator",
},
],
)

test_events = [
{
"Source": "source1",
"DetailType": "test-detail-type",
"Detail": json.dumps({"test": "true"}),
"EventBusName": event_bus_name1,
}
]

events_client.put_events(Entries=test_events)

with mock.patch.dict(os.environ, {"MOTO_ACCOUNT_ID": account2}):
# Verify SQS messages were received in account 2

response = sqs_client.receive_message(QueueUrl=queue_url)

response["Messages"].should.have.length_of(1)

message = json.loads(response["Messages"][0]["Body"])
message["source"].should.equal("source1")
message["detail-type"].should.equal("test-detail-type")
message["detail"].should.equal({"test": "true"})

0 comments on commit b0ee64f

Please sign in to comment.