Skip to content

Commit

Permalink
feat(s3): add EventBridge bucket notifications (#18150)
Browse files Browse the repository at this point in the history
### **Description**

Adds EventBridge bucket notification configuration. 

See https://aws.amazon.com/blogs/aws/new-use-amazon-s3-event-notifications-with-amazon-eventbridge/


### **Implementation**

- Added new Bucket property to enable this feature (`eventBridgeEnabled: true`)
- Added EventBridge config to `S3BucketNotifications` Custom Resource
- Added unit tests
- Added integration test (currently fails, see below for more info) 
- Fixed dependent integration tests

Closes #18076

### **FAQ**

1. **Why not simply expose EventBridge Cfn property via S3 BucketProps?**

 Currently CDK manages `NotificationConfigurations `via CustomResource. If we were to expose that way, then e.g. SNS configuration would override EventBridge config.

2. **Why not create new `IBucketNotificationDestination` class for EventBridge?**

 We can, but there is no need. Usually we create a subclass to `IBucketNotificationDestination` in order to adjust resource permissions, however in this case there is no need to adjust permissions: [default EventBridge does not require any additional permissions](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-permissions.html) unlike SQS/SNS/Lambda destinations. Additionally, enabling this feature via bucket props is much cleaner/simpler API than creating new dummy object of type `IBucketNotificationDestination` for customers.
 
 However, if you still think that we need to create new `IBucketNotificationDestination` subclass for EventBridge for consistency, let me know and I will refactor.

----

**BLOCKED ON LAMBDA RUNTIME SDK UPDATE TO BOTOCORE >= v1.23.16 (Integration test currently fails as current version (v1.21.55) does not contain EventBridge configuration)**

Check latest version here: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
yerzhan7 authored Jan 10, 2022
1 parent 883c1a3 commit 912aeda
Show file tree
Hide file tree
Showing 14 changed files with 400 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@
"Properties": {
"Description": "AWS CloudFormation handler for \"Custom::S3BucketNotifications\" resources (@aws-cdk/aws-s3)",
"Code": {
"ZipFile": "import boto3 # type: ignore\nimport json\nimport logging\nimport urllib.request\n\ns3 = boto3.client(\"s3\")\n\nCONFIGURATION_TYPES = [\"TopicConfigurations\", \"QueueConfigurations\", \"LambdaFunctionConfigurations\"]\n\ndef handler(event: dict, context):\n response_status = \"SUCCESS\"\n error_message = \"\"\n try:\n props = event[\"ResourceProperties\"]\n bucket = props[\"BucketName\"]\n notification_configuration = props[\"NotificationConfiguration\"]\n request_type = event[\"RequestType\"]\n managed = props.get('Managed', 'true').lower() == 'true'\n stack_id = event['StackId']\n\n if managed:\n config = handle_managed(request_type, notification_configuration)\n else:\n config = handle_unmanaged(bucket, stack_id, request_type, notification_configuration)\n\n put_bucket_notification_configuration(bucket, config)\n except Exception as e:\n logging.exception(\"Failed to put bucket notification configuration\")\n response_status = \"FAILED\"\n error_message = f\"Error: {str(e)}. \"\n finally:\n submit_response(event, context, response_status, error_message)\n\n\ndef handle_managed(request_type, notification_configuration):\n if request_type == 'Delete':\n return {}\n return notification_configuration\n\n\ndef handle_unmanaged(bucket, stack_id, request_type, notification_configuration):\n\n # find external notifications\n external_notifications = find_external_notifications(bucket, stack_id)\n\n # if delete, that's all we need\n if request_type == 'Delete':\n return external_notifications\n\n def with_id(notification):\n notification['Id'] = f\"{stack_id}-{hash(json.dumps(notification, sort_keys=True))}\"\n return notification\n\n # otherwise, merge external with incoming config and augment with id\n notifications = {}\n for t in CONFIGURATION_TYPES:\n external = external_notifications.get(t, [])\n incoming = [with_id(n) for n in notification_configuration.get(t, [])]\n notifications[t] = external + incoming\n return notifications\n\n\ndef find_external_notifications(bucket, stack_id):\n existing_notifications = get_bucket_notification_configuration(bucket)\n external_notifications = {}\n for t in CONFIGURATION_TYPES:\n # if the notification was created by us, we know what id to expect\n # so we can filter by it.\n external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f\"{stack_id}-\")]\n\n return external_notifications\n\n\ndef get_bucket_notification_configuration(bucket):\n return s3.get_bucket_notification_configuration(Bucket=bucket)\n\n\ndef put_bucket_notification_configuration(bucket, notification_configuration):\n s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=notification_configuration)\n\n\ndef submit_response(event: dict, context, response_status: str, error_message: str):\n response_body = json.dumps(\n {\n \"Status\": response_status,\n \"Reason\": f\"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}\",\n \"PhysicalResourceId\": event.get(\"PhysicalResourceId\") or event[\"LogicalResourceId\"],\n \"StackId\": event[\"StackId\"],\n \"RequestId\": event[\"RequestId\"],\n \"LogicalResourceId\": event[\"LogicalResourceId\"],\n \"NoEcho\": False,\n }\n ).encode(\"utf-8\")\n headers = {\"content-type\": \"\", \"content-length\": str(len(response_body))}\n try:\n req = urllib.request.Request(url=event[\"ResponseURL\"], headers=headers, data=response_body, method=\"PUT\")\n with urllib.request.urlopen(req) as response:\n print(response.read().decode(\"utf-8\"))\n print(\"Status code: \" + response.reason)\n except Exception as e:\n print(\"send(..) failed executing request.urlopen(..): \" + str(e))\n"
"ZipFile": "import boto3 # type: ignore\nimport json\nimport logging\nimport urllib.request\n\ns3 = boto3.client(\"s3\")\n\nEVENTBRIDGE_CONFIGURATION = 'EventBridgeConfiguration'\n\nCONFIGURATION_TYPES = [\"TopicConfigurations\", \"QueueConfigurations\", \"LambdaFunctionConfigurations\"]\n\ndef handler(event: dict, context):\n response_status = \"SUCCESS\"\n error_message = \"\"\n try:\n props = event[\"ResourceProperties\"]\n bucket = props[\"BucketName\"]\n notification_configuration = props[\"NotificationConfiguration\"]\n request_type = event[\"RequestType\"]\n managed = props.get('Managed', 'true').lower() == 'true'\n stack_id = event['StackId']\n\n if managed:\n config = handle_managed(request_type, notification_configuration)\n else:\n config = handle_unmanaged(bucket, stack_id, request_type, notification_configuration)\n\n put_bucket_notification_configuration(bucket, config)\n except Exception as e:\n logging.exception(\"Failed to put bucket notification configuration\")\n response_status = \"FAILED\"\n error_message = f\"Error: {str(e)}. \"\n finally:\n submit_response(event, context, response_status, error_message)\n\n\ndef handle_managed(request_type, notification_configuration):\n if request_type == 'Delete':\n return {}\n return notification_configuration\n\n\ndef handle_unmanaged(bucket, stack_id, request_type, notification_configuration):\n\n # find external notifications\n external_notifications = find_external_notifications(bucket, stack_id)\n\n # if delete, that's all we need\n if request_type == 'Delete':\n return external_notifications\n\n def with_id(notification):\n notification['Id'] = f\"{stack_id}-{hash(json.dumps(notification, sort_keys=True))}\"\n return notification\n\n # otherwise, merge external with incoming config and augment with id\n notifications = {}\n for t in CONFIGURATION_TYPES:\n external = external_notifications.get(t, [])\n incoming = [with_id(n) for n in notification_configuration.get(t, [])]\n notifications[t] = external + incoming\n\n # EventBridge configuration is a special case because it's just an empty object if it exists\n if EVENTBRIDGE_CONFIGURATION in notification_configuration:\n notifications[EVENTBRIDGE_CONFIGURATION] = notification_configuration[EVENTBRIDGE_CONFIGURATION]\n elif EVENTBRIDGE_CONFIGURATION in external_notifications:\n notifications[EVENTBRIDGE_CONFIGURATION] = external_notifications[EVENTBRIDGE_CONFIGURATION]\n\n return notifications\n\n\ndef find_external_notifications(bucket, stack_id):\n existing_notifications = get_bucket_notification_configuration(bucket)\n external_notifications = {}\n for t in CONFIGURATION_TYPES:\n # if the notification was created by us, we know what id to expect\n # so we can filter by it.\n external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f\"{stack_id}-\")]\n\n # always treat EventBridge configuration as an external config if it already exists\n # as there is no way to determine whether it's managed by us or not\n if EVENTBRIDGE_CONFIGURATION in existing_notifications:\n external_notifications[EVENTBRIDGE_CONFIGURATION] = existing_notifications[EVENTBRIDGE_CONFIGURATION]\n\n return external_notifications\n\n\ndef get_bucket_notification_configuration(bucket):\n return s3.get_bucket_notification_configuration(Bucket=bucket)\n\n\ndef put_bucket_notification_configuration(bucket, notification_configuration):\n s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=notification_configuration)\n\n\ndef submit_response(event: dict, context, response_status: str, error_message: str):\n response_body = json.dumps(\n {\n \"Status\": response_status,\n \"Reason\": f\"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}\",\n \"PhysicalResourceId\": event.get(\"PhysicalResourceId\") or event[\"LogicalResourceId\"],\n \"StackId\": event[\"StackId\"],\n \"RequestId\": event[\"RequestId\"],\n \"LogicalResourceId\": event[\"LogicalResourceId\"],\n \"NoEcho\": False,\n }\n ).encode(\"utf-8\")\n headers = {\"content-type\": \"\", \"content-length\": str(len(response_body))}\n try:\n req = urllib.request.Request(url=event[\"ResponseURL\"], headers=headers, data=response_body, method=\"PUT\")\n with urllib.request.urlopen(req) as response:\n print(response.read().decode(\"utf-8\"))\n print(\"Status code: \" + response.reason)\n except Exception as e:\n print(\"send(..) failed executing request.urlopen(..): \" + str(e))\n"
},
"Handler": "index.handler",
"Role": {
Expand All @@ -195,4 +195,4 @@
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@
"Properties": {
"Description": "AWS CloudFormation handler for \"Custom::S3BucketNotifications\" resources (@aws-cdk/aws-s3)",
"Code": {
"ZipFile": "import boto3 # type: ignore\nimport json\nimport logging\nimport urllib.request\n\ns3 = boto3.client(\"s3\")\n\nCONFIGURATION_TYPES = [\"TopicConfigurations\", \"QueueConfigurations\", \"LambdaFunctionConfigurations\"]\n\ndef handler(event: dict, context):\n response_status = \"SUCCESS\"\n error_message = \"\"\n try:\n props = event[\"ResourceProperties\"]\n bucket = props[\"BucketName\"]\n notification_configuration = props[\"NotificationConfiguration\"]\n request_type = event[\"RequestType\"]\n managed = props.get('Managed', 'true').lower() == 'true'\n stack_id = event['StackId']\n\n if managed:\n config = handle_managed(request_type, notification_configuration)\n else:\n config = handle_unmanaged(bucket, stack_id, request_type, notification_configuration)\n\n put_bucket_notification_configuration(bucket, config)\n except Exception as e:\n logging.exception(\"Failed to put bucket notification configuration\")\n response_status = \"FAILED\"\n error_message = f\"Error: {str(e)}. \"\n finally:\n submit_response(event, context, response_status, error_message)\n\n\ndef handle_managed(request_type, notification_configuration):\n if request_type == 'Delete':\n return {}\n return notification_configuration\n\n\ndef handle_unmanaged(bucket, stack_id, request_type, notification_configuration):\n\n # find external notifications\n external_notifications = find_external_notifications(bucket, stack_id)\n\n # if delete, that's all we need\n if request_type == 'Delete':\n return external_notifications\n\n def with_id(notification):\n notification['Id'] = f\"{stack_id}-{hash(json.dumps(notification, sort_keys=True))}\"\n return notification\n\n # otherwise, merge external with incoming config and augment with id\n notifications = {}\n for t in CONFIGURATION_TYPES:\n external = external_notifications.get(t, [])\n incoming = [with_id(n) for n in notification_configuration.get(t, [])]\n notifications[t] = external + incoming\n return notifications\n\n\ndef find_external_notifications(bucket, stack_id):\n existing_notifications = get_bucket_notification_configuration(bucket)\n external_notifications = {}\n for t in CONFIGURATION_TYPES:\n # if the notification was created by us, we know what id to expect\n # so we can filter by it.\n external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f\"{stack_id}-\")]\n\n return external_notifications\n\n\ndef get_bucket_notification_configuration(bucket):\n return s3.get_bucket_notification_configuration(Bucket=bucket)\n\n\ndef put_bucket_notification_configuration(bucket, notification_configuration):\n s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=notification_configuration)\n\n\ndef submit_response(event: dict, context, response_status: str, error_message: str):\n response_body = json.dumps(\n {\n \"Status\": response_status,\n \"Reason\": f\"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}\",\n \"PhysicalResourceId\": event.get(\"PhysicalResourceId\") or event[\"LogicalResourceId\"],\n \"StackId\": event[\"StackId\"],\n \"RequestId\": event[\"RequestId\"],\n \"LogicalResourceId\": event[\"LogicalResourceId\"],\n \"NoEcho\": False,\n }\n ).encode(\"utf-8\")\n headers = {\"content-type\": \"\", \"content-length\": str(len(response_body))}\n try:\n req = urllib.request.Request(url=event[\"ResponseURL\"], headers=headers, data=response_body, method=\"PUT\")\n with urllib.request.urlopen(req) as response:\n print(response.read().decode(\"utf-8\"))\n print(\"Status code: \" + response.reason)\n except Exception as e:\n print(\"send(..) failed executing request.urlopen(..): \" + str(e))\n"
"ZipFile": "import boto3 # type: ignore\nimport json\nimport logging\nimport urllib.request\n\ns3 = boto3.client(\"s3\")\n\nEVENTBRIDGE_CONFIGURATION = 'EventBridgeConfiguration'\n\nCONFIGURATION_TYPES = [\"TopicConfigurations\", \"QueueConfigurations\", \"LambdaFunctionConfigurations\"]\n\ndef handler(event: dict, context):\n response_status = \"SUCCESS\"\n error_message = \"\"\n try:\n props = event[\"ResourceProperties\"]\n bucket = props[\"BucketName\"]\n notification_configuration = props[\"NotificationConfiguration\"]\n request_type = event[\"RequestType\"]\n managed = props.get('Managed', 'true').lower() == 'true'\n stack_id = event['StackId']\n\n if managed:\n config = handle_managed(request_type, notification_configuration)\n else:\n config = handle_unmanaged(bucket, stack_id, request_type, notification_configuration)\n\n put_bucket_notification_configuration(bucket, config)\n except Exception as e:\n logging.exception(\"Failed to put bucket notification configuration\")\n response_status = \"FAILED\"\n error_message = f\"Error: {str(e)}. \"\n finally:\n submit_response(event, context, response_status, error_message)\n\n\ndef handle_managed(request_type, notification_configuration):\n if request_type == 'Delete':\n return {}\n return notification_configuration\n\n\ndef handle_unmanaged(bucket, stack_id, request_type, notification_configuration):\n\n # find external notifications\n external_notifications = find_external_notifications(bucket, stack_id)\n\n # if delete, that's all we need\n if request_type == 'Delete':\n return external_notifications\n\n def with_id(notification):\n notification['Id'] = f\"{stack_id}-{hash(json.dumps(notification, sort_keys=True))}\"\n return notification\n\n # otherwise, merge external with incoming config and augment with id\n notifications = {}\n for t in CONFIGURATION_TYPES:\n external = external_notifications.get(t, [])\n incoming = [with_id(n) for n in notification_configuration.get(t, [])]\n notifications[t] = external + incoming\n\n # EventBridge configuration is a special case because it's just an empty object if it exists\n if EVENTBRIDGE_CONFIGURATION in notification_configuration:\n notifications[EVENTBRIDGE_CONFIGURATION] = notification_configuration[EVENTBRIDGE_CONFIGURATION]\n elif EVENTBRIDGE_CONFIGURATION in external_notifications:\n notifications[EVENTBRIDGE_CONFIGURATION] = external_notifications[EVENTBRIDGE_CONFIGURATION]\n\n return notifications\n\n\ndef find_external_notifications(bucket, stack_id):\n existing_notifications = get_bucket_notification_configuration(bucket)\n external_notifications = {}\n for t in CONFIGURATION_TYPES:\n # if the notification was created by us, we know what id to expect\n # so we can filter by it.\n external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f\"{stack_id}-\")]\n\n # always treat EventBridge configuration as an external config if it already exists\n # as there is no way to determine whether it's managed by us or not\n if EVENTBRIDGE_CONFIGURATION in existing_notifications:\n external_notifications[EVENTBRIDGE_CONFIGURATION] = existing_notifications[EVENTBRIDGE_CONFIGURATION]\n\n return external_notifications\n\n\ndef get_bucket_notification_configuration(bucket):\n return s3.get_bucket_notification_configuration(Bucket=bucket)\n\n\ndef put_bucket_notification_configuration(bucket, notification_configuration):\n s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=notification_configuration)\n\n\ndef submit_response(event: dict, context, response_status: str, error_message: str):\n response_body = json.dumps(\n {\n \"Status\": response_status,\n \"Reason\": f\"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}\",\n \"PhysicalResourceId\": event.get(\"PhysicalResourceId\") or event[\"LogicalResourceId\"],\n \"StackId\": event[\"StackId\"],\n \"RequestId\": event[\"RequestId\"],\n \"LogicalResourceId\": event[\"LogicalResourceId\"],\n \"NoEcho\": False,\n }\n ).encode(\"utf-8\")\n headers = {\"content-type\": \"\", \"content-length\": str(len(response_body))}\n try:\n req = urllib.request.Request(url=event[\"ResponseURL\"], headers=headers, data=response_body, method=\"PUT\")\n with urllib.request.urlopen(req) as response:\n print(response.read().decode(\"utf-8\"))\n print(\"Status code: \" + response.reason)\n except Exception as e:\n print(\"send(..) failed executing request.urlopen(..): \" + str(e))\n"
},
"Handler": "index.handler",
"Role": {
Expand Down
Loading

0 comments on commit 912aeda

Please sign in to comment.