diff --git a/samples/sample-expunge-vpc/build-lambda.sh b/samples/sample-expunge-vpc/build-lambda.sh new file mode 100755 index 000000000..88363074f --- /dev/null +++ b/samples/sample-expunge-vpc/build-lambda.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +set -e + +cd src/lambda_vpc + +pip install crhelper -t . + +cd - diff --git a/samples/sample-expunge-vpc/buildspec.yml b/samples/sample-expunge-vpc/buildspec.yml index df8e74fb6..e9bc25423 100644 --- a/samples/sample-expunge-vpc/buildspec.yml +++ b/samples/sample-expunge-vpc/buildspec.yml @@ -7,6 +7,7 @@ phases: - python adf-build/generate_params.py build: commands: + - ./build-lambda.sh - bash adf-build/helpers/package_transform.sh artifacts: - files: "**/*" \ No newline at end of file + files: "**/*" diff --git a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/__init__.py b/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/__init__.py deleted file mode 100755 index 7ac4e3f41..000000000 --- a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from crhelper.resource_helper import CfnResource, SUCCESS, FAILED diff --git a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/log_helper.py b/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/log_helper.py deleted file mode 100755 index 9339771cf..000000000 --- a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/log_helper.py +++ /dev/null @@ -1,83 +0,0 @@ -from __future__ import print_function -import json -import logging - - -def _json_formatter(obj): - """Formatter for unserialisable values.""" - return str(obj) - - -class JsonFormatter(logging.Formatter): - """AWS Lambda Logging formatter. - - Formats the log message as a JSON encoded string. If the message is a - dict it will be used directly. If the message can be parsed as JSON, then - the parse d value is used in the output record. - """ - - def __init__(self, **kwargs): - super(JsonFormatter, self).__init__() - self.format_dict = { - 'timestamp': '%(asctime)s', - 'level': '%(levelname)s', - 'location': '%(name)s.%(funcName)s:%(lineno)d', - } - self.format_dict.update(kwargs) - self.default_json_formatter = kwargs.pop( - 'json_default', _json_formatter) - - def format(self, record): - record_dict = record.__dict__.copy() - record_dict['asctime'] = self.formatTime(record) - - log_dict = { - k: v % record_dict - for k, v in self.format_dict.items() - if v - } - - if isinstance(record_dict['msg'], dict): - log_dict['message'] = record_dict['msg'] - else: - log_dict['message'] = record.getMessage() - - # Attempt to decode the message as JSON, if so, merge it with the - # overall message for clarity. - try: - log_dict['message'] = json.loads(log_dict['message']) - except (TypeError, ValueError): - pass - - if record.exc_info: - # Cache the traceback text to avoid converting it multiple times - # (it's constant anyway) - # from logging.Formatter:format - if not record.exc_text: - record.exc_text = self.formatException(record.exc_info) - - if record.exc_text: - log_dict['exception'] = record.exc_text - - json_record = json.dumps(log_dict, default=self.default_json_formatter) - - if hasattr(json_record, 'decode'): # pragma: no cover - json_record = json_record.decode('utf-8') - - return json_record - - -def setup(level='DEBUG', formatter_cls=JsonFormatter, boto_level=None, **kwargs): - if formatter_cls: - for handler in logging.root.handlers: - handler.setFormatter(formatter_cls(**kwargs)) - - logging.root.setLevel(level) - - if not boto_level: - boto_level = level - - logging.getLogger('boto').setLevel(boto_level) - logging.getLogger('boto3').setLevel(boto_level) - logging.getLogger('botocore').setLevel(boto_level) - logging.getLogger('urllib3').setLevel(boto_level) diff --git a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/resource_helper.py b/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/resource_helper.py deleted file mode 100755 index 6d484797d..000000000 --- a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/resource_helper.py +++ /dev/null @@ -1,319 +0,0 @@ -# -*- coding: utf-8 -*- -""" -TODO: -* Async mode – take a wait condition handle as an input, increases max timeout to 12 hours -* Idempotency – If a duplicate request comes in (say there was a network error in signaling back to cfn) the subsequent - request should return the already created response, will need a persistent store of some kind... -* Functional tests -""" - -from __future__ import print_function -import threading -from crhelper.utils import _send_response -from crhelper import log_helper -import logging -import random -import boto3 -import string -import json -import os -from time import sleep - -logger = logging.getLogger(__name__) - -SUCCESS = 'SUCCESS' -FAILED = 'FAILED' - - -class CfnResource(object): - - def __init__(self, json_logging=False, log_level='DEBUG', boto_level='ERROR', polling_interval=2): - self._create_func = None - self._update_func = None - self._delete_func = None - self._poll_create_func = None - self._poll_update_func = None - self._poll_delete_func = None - self._timer = None - self._init_failed = None - self._json_logging = json_logging - self._log_level = log_level - self._boto_level = boto_level - self._send_response = False - self._polling_interval = polling_interval - self.Status = "" - self.Reason = "" - self.PhysicalResourceId = "" - self.StackId = "" - self.RequestId = "" - self.LogicalResourceId = "" - self.Data = {} - self._event = {} - self._context = None - self._response_url = "" - self._sam_local = os.getenv('AWS_SAM_LOCAL') - self._region = os.getenv('AWS_REGION') - try: - if not self._sam_local: - self._lambda_client = boto3.client('lambda', region_name=self._region) - self._events_client = boto3.client('events', region_name=self._region) - self._logs_client = boto3.client('logs', region_name=self._region) - if json_logging: - log_helper.setup(log_level, boto_level=boto_level, RequestType='ContainerInit') - else: - log_helper.setup(log_level, formatter_cls=None, boto_level=boto_level) - except Exception as e: - logger.error(e, exc_info=True) - self.init_failure(e) - - def __call__(self, event, context): - try: - self._log_setup(event, context) - logger.debug(event) - self._crhelper_init(event, context) - # Check for polling functions - if self._poll_enabled() and self._sam_local: - logger.info("Skipping poller functionality, as this is a local invocation") - elif self._poll_enabled(): - self._polling_init(event) - # If polling is not enabled, then we should respond - else: - logger.debug("enabling send_response") - self._send_response = True - logger.debug("_send_response: %s" % self._send_response) - if self._send_response: - if self.RequestType == 'Delete': - self._wait_for_cwlogs() - self._cfn_response(event) - except Exception as e: - logger.error(e, exc_info=True) - self._send(FAILED, str(e)) - finally: - if self._timer: - self._timer.cancel() - - def _wait_for_cwlogs(self, sleep=sleep): - sleep_time = int(self._context.get_remaining_time_in_millis() / 1000) - 15 - if sleep_time > 120: - sleep_time = 120 - if sleep_time > 1: - sleep(sleep_time) - - def _log_setup(self, event, context): - if self._json_logging: - log_helper.setup(self._log_level, boto_level=self._boto_level, RequestType=event['RequestType'], - StackId=event['StackId'], RequestId=event['RequestId'], - LogicalResourceId=event['LogicalResourceId'], aws_request_id=context.aws_request_id) - else: - log_helper.setup(self._log_level, boto_level=self._boto_level, formatter_cls=None) - - def _crhelper_init(self, event, context): - self._send_response = False - self.Status = SUCCESS - self.Reason = "" - self.PhysicalResourceId = "" - self.StackId = event["StackId"] - self.RequestId = event["RequestId"] - self.LogicalResourceId = event["LogicalResourceId"] - self.Data = {} - if "CrHelperData" in event.keys(): - self.Data = event["CrHelperData"] - self.RequestType = event["RequestType"] - self._event = event - self._context = context - self._response_url = event['ResponseURL'] - if self._timer: - self._timer.cancel() - if self._init_failed: - return self._send(FAILED, str(self._init_failed)) - self._set_timeout() - self._wrap_function(self._get_func()) - - def _polling_init(self, event): - # Setup polling on initial request - logger.debug("pid1: %s" % self.PhysicalResourceId) - if 'CrHelperPoll' not in event.keys() and self.Status != FAILED: - logger.info("Setting up polling") - self.Data["PhysicalResourceId"] = self.PhysicalResourceId - self._setup_polling() - self.PhysicalResourceId = None - logger.debug("pid2: %s" % self.PhysicalResourceId) - # if physical id is set, or there was a failure then we're done - logger.debug("pid3: %s" % self.PhysicalResourceId) - if self.PhysicalResourceId or self.Status == FAILED: - logger.info("Polling complete, removing cwe schedule") - self._remove_polling() - self._send_response = True - - def _cfn_response(self, event): - # Use existing PhysicalResourceId if it's in the event and no ID was set - if not self.PhysicalResourceId and "PhysicalResourceId" in event.keys(): - logger.info("PhysicalResourceId present in event, Using that for response") - self.PhysicalResourceId = event['PhysicalResourceId'] - # Generate a physical id if none is provided - elif not self.PhysicalResourceId or self.PhysicalResourceId is True: - if "PhysicalResourceId" in event.keys(): - logger.info("PhysicalResourceId present in event, Using that for response") - logger.info("No physical resource id returned, generating one...") - self.PhysicalResourceId = event['StackId'].split('/')[1] + '_' + event[ - 'LogicalResourceId'] + '_' + self._rand_string(8) - self._send() - - def _poll_enabled(self): - return getattr(self, "_poll_{}_func".format(self._event['RequestType'].lower())) - - def create(self, func): - self._create_func = func - return func - - def update(self, func): - self._update_func = func - return func - - def delete(self, func): - self._delete_func = func - return func - - def poll_create(self, func): - self._poll_create_func = func - return func - - def poll_update(self, func): - self._poll_update_func = func - return func - - def poll_delete(self, func): - self._poll_delete_func = func - return func - - def _wrap_function(self, func): - try: - self.PhysicalResourceId = func(self._event, self._context) if func else '' - except Exception as e: - logger.error(str(e), exc_info=True) - self.Reason = str(e) - self.Status = FAILED - - def _timeout(self): - logger.error("Execution is about to time out, sending failure message") - self._send(FAILED, "Execution timed out") - - def _set_timeout(self): - self._timer = threading.Timer((self._context.get_remaining_time_in_millis() / 1000.00) - 0.5, - self._timeout) - self._timer.start() - - def _get_func(self): - request_type = "_{}_func" - if "CrHelperPoll" in self._event.keys(): - request_type = "_poll" + request_type - return getattr(self, request_type.format(self._event['RequestType'].lower())) - - def _send(self, status=None, reason="", send_response=_send_response): - if len(str(str(self.Reason))) > 256: - self.Reason = "ERROR: (truncated) " + str(self.Reason)[len(str(self.Reason)) - 240:] - if len(str(reason)) > 256: - reason = "ERROR: (truncated) " + str(reason)[len(str(reason)) - 240:] - response_body = { - 'Status': self.Status, - 'PhysicalResourceId': str(self.PhysicalResourceId), - 'StackId': self.StackId, - 'RequestId': self.RequestId, - 'LogicalResourceId': self.LogicalResourceId, - 'Reason': str(self.Reason), - 'Data': self.Data, - } - if status: - response_body.update({'Status': status, 'Reason': reason}) - send_response(self._response_url, response_body) - - def init_failure(self, error): - self._init_failed = error - logger.error(str(error), exc_info=True) - - def _cleanup_response(self): - for k in ["CrHelperPoll", "CrHelperPermission", "CrHelperRule"]: - if k in self.Data.keys(): - del self.Data[k] - - @staticmethod - def _rand_string(l): - return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(l)) - - def _add_permission(self, rule_arn): - sid = self._event['LogicalResourceId'] + self._rand_string(8) - self._lambda_client.add_permission( - FunctionName=self._context.function_name, - StatementId=sid, - Action='lambda:InvokeFunction', - Principal='events.amazonaws.com', - SourceArn=rule_arn - ) - return sid - - def _put_rule(self): - response = self._events_client.put_rule( - Name=self._event['LogicalResourceId'] + self._rand_string(8), - ScheduleExpression='rate({} minutes)'.format(self._polling_interval), - State='ENABLED', - ) - return response["RuleArn"] - - def _put_targets(self, func_name): - region = self._event['CrHelperRule'].split(":")[3] - account_id = self._event['CrHelperRule'].split(":")[4] - partition = self._event['CrHelperRule'].split(":")[1] - rule_name = self._event['CrHelperRule'].split("/")[1] - logger.debug(self._event) - self._events_client.put_targets( - Rule=rule_name, - Targets=[ - { - 'Id': '1', - 'Arn': 'arn:%s:lambda:%s:%s:function:%s' % (partition, region, account_id, func_name), - 'Input': json.dumps(self._event) - } - ] - ) - - def _remove_targets(self, rule_arn): - self._events_client.remove_targets( - Rule=rule_arn.split("/")[1], - Ids=['1'] - ) - - def _remove_permission(self, sid): - self._lambda_client.remove_permission( - FunctionName=self._context.function_name, - StatementId=sid - ) - - def _delete_rule(self, rule_arn): - self._events_client.delete_rule( - Name=rule_arn.split("/")[1] - ) - - def _setup_polling(self): - self._event['CrHelperData'] = self.Data - self._event['CrHelperPoll'] = True - self._event['CrHelperRule'] = self._put_rule() - self._event['CrHelperPermission'] = self._add_permission(self._event['CrHelperRule']) - self._put_targets(self._context.function_name) - - def _remove_polling(self): - if 'CrHelperData' in self._event.keys(): - self._event.pop('CrHelperData') - if "PhysicalResourceId" in self.Data.keys(): - self.Data.pop("PhysicalResourceId") - if 'CrHelperRule' in self._event.keys(): - self._remove_targets(self._event['CrHelperRule']) - else: - logger.error("Cannot remove CloudWatch events rule, Rule arn not available in event") - if 'CrHelperPermission' in self._event.keys(): - self._remove_permission(self._event['CrHelperPermission']) - else: - logger.error("Cannot remove lambda events permission, permission id not available in event") - if 'CrHelperRule' in self._event.keys(): - self._delete_rule(self._event['CrHelperRule']) - else: - logger.error("Cannot remove CloudWatch events target, Rule arn not available in event") diff --git a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/utils.py b/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/utils.py deleted file mode 100755 index 486c2f0f3..000000000 --- a/samples/sample-expunge-vpc/src/lambda_vpc/crhelper/utils.py +++ /dev/null @@ -1,28 +0,0 @@ -from __future__ import print_function -from botocore.vendored import requests -import json -import logging as logging -import time - -logger = logging.getLogger(__name__) - - -def _send_response(response_url, response_body, put=requests.put): - try: - json_response_body = json.dumps(response_body) - except Exception as e: - msg = "Failed to convert response to json: {}".format(str(e)) - logger.error(msg, exc_info=True) - response_body = {'Status': 'FAILED', 'Data': {}, 'Reason': msg} - json_response_body = json.dumps(response_body) - logger.debug("CFN response URL: {}".format(response_url)) - logger.debug(json_response_body) - headers = {'content-type': '', 'content-length': str(len(json_response_body))} - while True: - try: - response = put(response_url, data=json_response_body, headers=headers) - logger.info("CloudFormation returned status code: {}".format(response.reason)) - break - except Exception as e: - logger.error("Unexpected failure sending response to CloudFormation {}".format(e), exc_info=True) - time.sleep(5)