diff --git a/samcli/commands/local/lib/sam_base_provider.py b/samcli/commands/local/lib/sam_base_provider.py index 861e1fd47a..6d03e5e6f9 100644 --- a/samcli/commands/local/lib/sam_base_provider.py +++ b/samcli/commands/local/lib/sam_base_provider.py @@ -21,7 +21,7 @@ class SamBaseProvider(object): # There is not much benefit in infering real values for these parameters in local development context. These values # are usually representative of an AWS environment and stack, but in local development scenario they don't make # sense. If customers choose to, they can always override this value through the CLI interface. - _DEFAULT_PSEUDO_PARAM_VALUES = { + DEFAULT_PSEUDO_PARAM_VALUES = { "AWS::AccountId": "123456789012", "AWS::Partition": "aws", @@ -116,7 +116,7 @@ def _get_parameter_values(template_dict, parameter_overrides): # NOTE: Ordering of following statements is important. It makes sure that any user-supplied values # override the defaults parameter_values = {} - parameter_values.update(SamBaseProvider._DEFAULT_PSEUDO_PARAM_VALUES) + parameter_values.update(SamBaseProvider.DEFAULT_PSEUDO_PARAM_VALUES) parameter_values.update(default_values) parameter_values.update(parameter_overrides or {}) diff --git a/samcli/lib/intrinsic_resolver/__init__.py b/samcli/lib/intrinsic_resolver/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/samcli/lib/intrinsic_resolver/intrinsic_property_resolver.py b/samcli/lib/intrinsic_resolver/intrinsic_property_resolver.py new file mode 100644 index 0000000000..fea3985db5 --- /dev/null +++ b/samcli/lib/intrinsic_resolver/intrinsic_property_resolver.py @@ -0,0 +1,993 @@ +""" +Process and simplifies CloudFormation intrinsic properties such as FN::* and Ref +""" +import copy +import logging + +import base64 +import re + +from six import string_types + +from samcli.lib.intrinsic_resolver.invalid_intrinsic_validation import ( + verify_intrinsic_type_list, + verify_non_null, + verify_intrinsic_type_int, + verify_in_bounds, + verify_number_arguments, + verify_intrinsic_type_str, + verify_intrinsic_type_dict, + verify_intrinsic_type_bool, + verify_all_list_intrinsic_type, +) +from samcli.lib.intrinsic_resolver.invalid_intrinsic_exception import ( + InvalidIntrinsicException, + InvalidSymbolException +) + +LOG = logging.getLogger(__name__) + + +class IntrinsicResolver(object): + AWS_INCLUDE = "AWS::Include" + SUPPORTED_MACRO_TRANSFORMATIONS = [AWS_INCLUDE] + _PSEUDO_REGEX = r"AWS::.*?" + _ATTRIBUTE_REGEX = r"[a-zA-Z0-9]*?\.?[a-zA-Z0-9]*?" + _REGEX_SUB_FUNCTION = r"\$\{(" + _PSEUDO_REGEX + "||" + _ATTRIBUTE_REGEX + r")\}" + + FN_JOIN = "Fn::Join" + FN_SPLIT = "Fn::Split" + FN_SUB = "Fn::Sub" + FN_SELECT = "Fn::Select" + FN_BASE64 = "Fn::Base64" + FN_FIND_IN_MAP = "Fn::FindInMap" + FN_TRANSFORM = "Fn::Transform" + FN_GET_AZS = "Fn::GetAZs" + REF = "Ref" + FN_GET_ATT = "Fn::GetAtt" + FN_IMPORT_VALUE = "Fn::ImportValue" + + SUPPORTED_INTRINSIC_FUNCTIONS = [ + FN_JOIN, + FN_SPLIT, + FN_SUB, + FN_SELECT, + FN_BASE64, + FN_FIND_IN_MAP, + FN_TRANSFORM, + FN_GET_AZS, + REF, + FN_GET_ATT, + FN_IMPORT_VALUE, + ] + + FN_AND = "Fn::And" + FN_OR = "Fn::Or" + FN_IF = "Fn::If" + FN_EQUALS = "Fn::Equals" + FN_NOT = "Fn::Not" + + CONDITIONAL_FUNCTIONS = [FN_AND, FN_OR, FN_IF, FN_EQUALS, FN_NOT] + + def __init__(self, template, symbol_resolver): + """ + Initializes the Intrinsic Property class with the default intrinsic_key_function_map and + conditional_key_function_map. + + In the future, for items like Fn::ImportValue multiple templates can be provided + into the function. + """ + self._template = None + self._resources = None + self._mapping = None + self._parameters = None + self._conditions = None + self.init_template(template) + + self._symbol_resolver = symbol_resolver + + self.intrinsic_key_function_map = self.default_intrinsic_function_map() + self.conditional_key_function_map = self.default_conditional_key_map() + + def init_template(self, template): + self._template = copy.deepcopy(template or {}) + self._resources = self._template.get("Resources", {}) + self._mapping = self._template.get("Mappings", {}) + self._parameters = self._template.get("Parameters", {}) + self._conditions = self._template.get("Conditions", {}) + + def default_intrinsic_function_map(self): + """ + Returns a dictionary containing the mapping from + Intrinsic Function Key -> Intrinsic Resolver. + The intrinsic_resolver function has the format lambda intrinsic: some_retun_value + + Return + ------- + A dictionary containing the mapping from Intrinsic Function Key -> Intrinsic Resolver + """ + return { + IntrinsicResolver.FN_JOIN: self.handle_fn_join, + IntrinsicResolver.FN_SPLIT: self.handle_fn_split, + IntrinsicResolver.FN_SUB: self.handle_fn_sub, + IntrinsicResolver.FN_SELECT: self.handle_fn_select, + IntrinsicResolver.FN_BASE64: self.handle_fn_base64, + IntrinsicResolver.FN_FIND_IN_MAP: self.handle_find_in_map, + IntrinsicResolver.FN_TRANSFORM: self.handle_fn_transform, + IntrinsicResolver.FN_GET_AZS: self.handle_fn_get_azs, + IntrinsicResolver.REF: self.handle_fn_ref, + IntrinsicResolver.FN_GET_ATT: self.handle_fn_getatt, + IntrinsicResolver.FN_IMPORT_VALUE: self.handle_fn_import_value, + } + + def default_conditional_key_map(self): + """ + Returns a dictionary containing the mapping from Conditional + Conditional Intrinsic Function Key -> Conditional Intrinsic Resolver. + The intrinsic_resolver function has the format lambda intrinsic: some_retun_value + + The code was split between conditionals and other intrinsic keys for readability purposes. + Return + ------- + A dictionary containing the mapping from Intrinsic Function Key -> Intrinsic Resolver + """ + return { + IntrinsicResolver.FN_AND: self.handle_fn_and, + IntrinsicResolver.FN_OR: self.handle_fn_or, + IntrinsicResolver.FN_IF: self.handle_fn_if, + IntrinsicResolver.FN_EQUALS: self.handle_fn_equals, + IntrinsicResolver.FN_NOT: self.handle_fn_not, + } + + def set_intrinsic_key_function_map(self, function_map): + """ + Sets the mapping from + Conditional Intrinsic Function Key -> Conditional Intrinsic Resolver. + The intrinsic_resolver function has the format lambda intrinsic: some_retun_value + + A user of this function can set the function map directly or can get the default_conditional_key_map directly. + + + """ + self.intrinsic_key_function_map = function_map + + def set_conditional_function_map(self, function_map): + """ + Sets the mapping from + Conditional Intrinsic Function Key -> Conditional Intrinsic Resolver. + The intrinsic_resolver function has the format lambda intrinsic: some_retun_value + + A user of this function can set the function map directly or can get the default_intrinsic_function_map directly + + The code was split between conditionals and other intrinsic keys for readability purposes. + + """ + self.conditional_key_function_map = function_map + + def intrinsic_property_resolver(self, intrinsic, parent_function="template"): + """ + This resolves the intrinsic of the format + { + intrinsic: dict + } by calling the function with the relevant intrinsic function resolver. + + This also supports returning a string, list, boolean, int since they may be intermediate steps in the recursion + process. No transformations are done on these. + + By default this will just return the item if non of the types match. This is because of the function + resolve_all_attributes which will recreate the resources by processing every aspect of resource. + + This code resolves in a top down depth first fashion in order to create a functional style recursion that + doesn't mutate any of the properties. + + Parameters + ---------- + intrinsic: dict, str, list, bool, int + This is an intrinsic property or an intermediate step + parent_function: str + In case there is a missing property, this is used to figure out where the property resolved is missing. + Return + --------- + The simplified version of the intrinsic function. This could be a list,str,dict depending on the format required + """ + if intrinsic is None: + raise InvalidIntrinsicException( + "Missing Intrinsic property in {}".format(parent_function) + ) + + if ( + any( + isinstance(intrinsic, object_type) + for object_type in [string_types, list, bool, int] + ) or intrinsic == {} + ): + return intrinsic + + keys = list(intrinsic.keys()) + key = keys[0] + + if key in self.intrinsic_key_function_map: + intrinsic_value = intrinsic.get(key) + return self.intrinsic_key_function_map.get(key)(intrinsic_value) + elif key in self.conditional_key_function_map: + intrinsic_value = intrinsic.get(key) + return self.conditional_key_function_map.get(key)(intrinsic_value) + + # In this case, it is a dictionary that doesn't directly contain an intrinsic resolver, we must recursively + # resolve each of it's sub properties. + sanitized_dict = {} + for key, val in intrinsic.items(): + sanitized_key = self.intrinsic_property_resolver( + key, parent_function=parent_function + ) + sanitized_val = self.intrinsic_property_resolver( + val, parent_function=parent_function + ) + verify_intrinsic_type_str( + sanitized_key, + message="The keys of the dictionary {} in {} must all resolve to a string".format( + sanitized_key, parent_function + ), + ) + sanitized_dict[sanitized_key] = sanitized_val + return sanitized_dict + + def resolve_template(self, ignore_errors=False): + """ + This will parse through every entry in a CloudFormation template and resolve them based on the symbol_resolver. + Customers can optionally ignore resource errors and default to whatever the resource provides. + + Parameters + ----------- + ignore_errors: bool + An option to ignore errors that are InvalidIntrinsicException and InvalidSymbolException + Return + ------- + A resolved template with all references possible simplified + """ + processed_template = {} + for key, val in self._resources.items(): + processed_key = self._symbol_resolver.get_translation(key) or key + try: + processed_resource = self.intrinsic_property_resolver(val) + processed_template[processed_key] = processed_resource + except (InvalidIntrinsicException, InvalidSymbolException) as e: + resource_type = val.get("Type", "") + if ignore_errors: + LOG.error( + "Unable to process properties of %s.%s", key, resource_type + ) + processed_template[key] = val + else: + raise InvalidIntrinsicException( + "Exception with property of {}.{}".format(key, resource_type) + ": " + str(e.args) + ) + return processed_template + + def handle_fn_join(self, intrinsic_value): + """ + { "Fn::Join" : [ "delimiter", [ comma-delimited list of values ] ] } + This function will join the items in the list together based on the string using the python join. + + This intrinsic function will resolve all the objects within the function's value and check their type. + + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_JOIN + ) + + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_JOIN) + + delimiter = arguments[0] + + verify_intrinsic_type_str( + delimiter, IntrinsicResolver.FN_JOIN, position_in_list="first" + ) + + value_list = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_JOIN + ) + + verify_intrinsic_type_list( + value_list, + IntrinsicResolver.FN_JOIN, + message="The list of values in {} after the " + "delimiter must be a list".format(IntrinsicResolver.FN_JOIN), + ) + + sanitized_value_list = [ + self.intrinsic_property_resolver( + item, parent_function=IntrinsicResolver.FN_JOIN + ) + for item in value_list + ] + verify_all_list_intrinsic_type( + sanitized_value_list, + verification_func=verify_intrinsic_type_str, + property_type=IntrinsicResolver.FN_JOIN, + ) + + return delimiter.join(sanitized_value_list) + + def handle_fn_split(self, intrinsic_value): + """ + { "Fn::Split" : [ "delimiter", "source string" ] } + This function will then split the source_string based on the delimiter + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Split intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_SPLIT + ) + + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_SPLIT) + + delimiter = arguments[0] + + verify_intrinsic_type_str( + delimiter, IntrinsicResolver.FN_SPLIT, position_in_list="first" + ) + + source_string = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_SPLIT + ) + + verify_intrinsic_type_str( + source_string, IntrinsicResolver.FN_SPLIT, position_in_list="second" + ) + + return source_string.split(delimiter) + + def handle_fn_base64(self, intrinsic_value): + """ + { "Fn::Base64" : valueToEncode } + This intrinsic function will then base64 encode the string using python's base64. + + This function will resolve all the intrinsic properties in valueToEncode + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Base64 intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + data = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_BASE64 + ) + + verify_intrinsic_type_str(data, IntrinsicResolver.FN_BASE64) + # Encoding then decoding is required to return a string of the data + return base64.b64encode(data.encode()).decode() + + def handle_fn_select(self, intrinsic_value): + """ + { "Fn::Select" : [ index, listOfObjects ] } + It will select the item in the listOfObjects using python's base64. + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Select intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_SELECT + ) + + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_SELECT) + + index = self.intrinsic_property_resolver( + arguments[0], parent_function=IntrinsicResolver.FN_SELECT + ) + + verify_intrinsic_type_int(index, IntrinsicResolver.FN_SELECT) + + list_of_objects = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_SELECT + ) + verify_intrinsic_type_list(list_of_objects, IntrinsicResolver.FN_SELECT) + + sanitized_objects = [ + self.intrinsic_property_resolver( + item, parent_function=IntrinsicResolver.FN_SELECT + ) + for item in list_of_objects + ] + + verify_in_bounds( + index=index, + objects=sanitized_objects, + property_type=IntrinsicResolver.FN_SELECT, + ) + + return sanitized_objects[index] + + def handle_find_in_map(self, intrinsic_value): + """ + { "Fn::FindInMap" : [ "MapName", "TopLevelKey", "SecondLevelKey"] } This function will then lookup the + specified dictionary in the Mappings dictionary as mappings[map_name][top_level_key][second_level_key]. + + This intrinsic function will resolve all the objects within the function's value and check their type. + + The format of the Mappings dictionary is: + "Mappings": { + "map_name": { + "top_level_key": { + "second_level_key": "value" + } + } + } + } + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::FindInMap intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_FIND_IN_MAP + ) + + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_FIND_IN_MAP) + + verify_number_arguments( + arguments, num=3, property_type=IntrinsicResolver.FN_FIND_IN_MAP + ) + + map_name = self.intrinsic_property_resolver( + arguments[0], parent_function=IntrinsicResolver.FN_FIND_IN_MAP + ) + top_level_key = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_FIND_IN_MAP + ) + second_level_key = self.intrinsic_property_resolver( + arguments[2], parent_function=IntrinsicResolver.FN_FIND_IN_MAP + ) + + verify_intrinsic_type_str( + map_name, IntrinsicResolver.FN_FIND_IN_MAP, position_in_list="first" + ) + verify_intrinsic_type_str( + top_level_key, IntrinsicResolver.FN_FIND_IN_MAP, position_in_list="second" + ) + verify_intrinsic_type_str( + second_level_key, IntrinsicResolver.FN_FIND_IN_MAP, position_in_list="third" + ) + + map_value = self._mapping.get(map_name) + verify_intrinsic_type_dict( + map_value, + IntrinsicResolver.FN_FIND_IN_MAP, + position_in_list="first", + message="The MapName is missing in the Mappings dictionary in Fn::FindInMap for {}".format( + map_name + ), + ) + + top_level_value = map_value.get(top_level_key) + verify_intrinsic_type_dict( + top_level_value, + IntrinsicResolver.FN_FIND_IN_MAP, + message="The TopLevelKey is missing in the Mappings dictionary in Fn::FindInMap " + "for {}".format(top_level_key), + ) + + second_level_value = top_level_value.get(second_level_key) + verify_intrinsic_type_str( + second_level_value, + IntrinsicResolver.FN_FIND_IN_MAP, + message="The SecondLevelKey is missing in the Mappings dictionary in Fn::FindInMap " + "for {}".format(second_level_key), + ) + + return second_level_value + + def handle_fn_get_azs(self, intrinsic_value): + """ + { "Fn::GetAZs" : "" } + { "Fn::GetAZs" : { "Ref" : "AWS::Region" } } + { "Fn::GetAZs" : "us-east-1" } + This intrinsic function will get the availability zones specified for the specified region. This is usually used + with {"Ref": "AWS::Region"}. If it is an empty string, it will get the default region. + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::GetAZs intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + intrinsic_value = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_GET_AZS + ) + verify_intrinsic_type_str(intrinsic_value, IntrinsicResolver.FN_GET_AZS) + + if intrinsic_value == "": + intrinsic_value = self._symbol_resolver.DEFAULT_REGION + + if intrinsic_value not in self._symbol_resolver.REGIONS: + raise InvalidIntrinsicException( + "Invalid region string passed in to {}".format( + IntrinsicResolver.FN_GET_AZS + ) + ) + + return self._symbol_resolver.REGIONS.get(intrinsic_value) + + def handle_fn_transform(self, intrinsic_value): + """ + { "Fn::Transform" : { "Name" : macro name, "Parameters" : {key : value, ... } } } + This intrinsic function will transform the data with the body provided + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Transform intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + macro_name = intrinsic_value.get("Name") + name = self.intrinsic_property_resolver( + macro_name, parent_function=IntrinsicResolver.FN_TRANSFORM + ) + + if name not in IntrinsicResolver.SUPPORTED_MACRO_TRANSFORMATIONS: + raise InvalidIntrinsicException( + "The type {} is not currently supported in {}".format( + name, IntrinsicResolver.FN_TRANSFORM + ) + ) + + parameters = intrinsic_value.get("Parameters") + verify_intrinsic_type_dict( + parameters, + IntrinsicResolver.FN_TRANSFORM, + message=" Fn::Transform requires parameters section", + ) + + location = self.intrinsic_property_resolver(parameters.get("Location")) + return location + + def handle_fn_import_value(self, intrinsic_value): + """ + { "Fn::ImportValue" : sharedValueToImport } + This intrinsic function requires handling multiple stacks, which is not currently supported by SAM-CLI. + Thus, it will thrown an exception. + + Return + ------- + An InvalidIntrinsicException + """ + raise InvalidIntrinsicException( + "Fn::ImportValue is currently not supported by IntrinsicResolver" + ) + + def handle_fn_getatt(self, intrinsic_value): + """ + { "Fn::GetAtt" : [ "logicalNameOfResource", "attributeName" ] } + This intrinsic function gets the attribute for logical_resource specified. Each attribute might have a different + functionality depending on the type. + + This intrinsic function will resolve all the objects within the function's value and check their type. + This calls the symbol resolver in order to resolve the relevant attribute. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::GetAtt intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_GET_ATT + ) + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_GET_ATT) + verify_number_arguments(arguments, IntrinsicResolver.FN_GET_ATT, num=2) + + logical_id = self.intrinsic_property_resolver( + arguments[0], parent_function=IntrinsicResolver.FN_GET_ATT + ) + resource_type = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_GET_ATT + ) + + verify_intrinsic_type_str(logical_id, IntrinsicResolver.FN_GET_ATT) + verify_intrinsic_type_str(resource_type, IntrinsicResolver.FN_GET_ATT) + + return self._symbol_resolver.resolve_symbols(logical_id, resource_type) + + def handle_fn_ref(self, intrinsic_value): + """ + {"Ref": "Logical ID"} + This intrinsic function gets the reference to a certain attribute. Some Ref's have different functionality with + different resource types. + + This intrinsic function will resolve all the objects within the function's value and check their type. + This calls the symbol resolver in order to resolve the relevant attribute. + Parameter + ---------- + intrinsic_value: str + This is the value of the object inside the Ref intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.REF + ) + verify_intrinsic_type_str(arguments, IntrinsicResolver.REF) + + return self._symbol_resolver.resolve_symbols(arguments, IntrinsicResolver.REF) + + def handle_fn_sub(self, intrinsic_value): + """ + { "Fn::Sub" : [ String, { Var1Name: Var1Value, Var2Name: Var2Value } ] } or { "Fn::Sub" : String } + This intrinsic function will substitute the variables specified in the list into the string provided. The string + will also parse out pseudo properties and anything of the form ${}. + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + A string with the resolved attributes + """ + + def resolve_sub_attribute(intrinsic_item, symbol_resolver): + if "." in intrinsic_item: + (logical_id, attribute_type) = intrinsic_item.rsplit(".", 1) + else: + (logical_id, attribute_type) = intrinsic_item, IntrinsicResolver.REF + return symbol_resolver.resolve_symbols( + logical_id, attribute_type, ignore_errors=True + ) + + if isinstance(intrinsic_value, string_types): + intrinsic_value = [intrinsic_value, {}] + + verify_intrinsic_type_list( + intrinsic_value, + IntrinsicResolver.FN_SUB, + message="The arguments to a Fn::Sub must be a list or a string", + ) + + verify_number_arguments(intrinsic_value, IntrinsicResolver.FN_SUB, num=2) + + sub_str = self.intrinsic_property_resolver( + intrinsic_value[0], parent_function=IntrinsicResolver.FN_SUB + ) + verify_intrinsic_type_str( + sub_str, IntrinsicResolver.FN_SUB, position_in_list="first" + ) + + variables = intrinsic_value[1] + verify_intrinsic_type_dict( + variables, IntrinsicResolver.FN_SUB, position_in_list="second" + ) + + sanitized_variables = self.intrinsic_property_resolver( + variables, parent_function=IntrinsicResolver.FN_SUB + ) + + subable_props = re.findall( + string=sub_str, pattern=IntrinsicResolver._REGEX_SUB_FUNCTION + ) + for sub_item in subable_props: + sanitized_item = ( + sanitized_variables[sub_item] + if sub_item in sanitized_variables + else sub_item + ) + result = resolve_sub_attribute(sanitized_item, self._symbol_resolver) + sub_str = re.sub( + pattern=r"\$\{" + sub_item + r"\}", string=sub_str, repl=result + ) + return sub_str + + def handle_fn_if(self, intrinsic_value): + """ + {"Fn::If": [condition_name, value_if_true, value_if_false]} + This intrinsic function will evaluate the condition from the Conditions dictionary and then return value_if_true + or value_if_false depending on the value. + + The Conditions dictionary will have the following format: + { + "Conditions": { + "condition_name": True/False or "{Intrinsic Function}" + } + } + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + This will return value_if_true and value_if_false depending on how the condition is evaluated + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_IF + ) + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_IF) + verify_number_arguments(arguments, IntrinsicResolver.FN_IF, num=3) + + condition_name = self.intrinsic_property_resolver( + arguments[0], parent_function=IntrinsicResolver.FN_IF + ) + verify_intrinsic_type_str(condition_name, IntrinsicResolver.FN_IF) + + value_if_true = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_IF + ) + value_if_false = self.intrinsic_property_resolver( + arguments[2], parent_function=IntrinsicResolver.FN_IF + ) + + condition = self._conditions.get(condition_name) + verify_intrinsic_type_dict( + condition, + IntrinsicResolver.FN_IF, + message="The condition is missing in the Conditions dictionary for {}".format( + IntrinsicResolver.FN_IF + ), + ) + + condition_evaluated = self.intrinsic_property_resolver( + condition, parent_function=IntrinsicResolver.FN_IF + ) + verify_intrinsic_type_bool( + condition_evaluated, + IntrinsicResolver.FN_IF, + message="The result of {} must evaluate to bool".format( + IntrinsicResolver.FN_IF + ), + ) + + return value_if_true if condition_evaluated else value_if_false + + def handle_fn_equals(self, intrinsic_value): + """ + {"Fn::Equals" : ["value_1", "value_2"]} + This intrinsic function will verify that both items in the intrinsic function are equal after resolving them. + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + A boolean depending on if both arguments is equal + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_EQUALS + ) + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_EQUALS) + verify_number_arguments(arguments, IntrinsicResolver.FN_EQUALS, num=2) + + value_1 = self.intrinsic_property_resolver( + arguments[0], parent_function=IntrinsicResolver.FN_EQUALS + ) + value_2 = self.intrinsic_property_resolver( + arguments[1], parent_function=IntrinsicResolver.FN_EQUALS + ) + return value_1 == value_2 + + def handle_fn_not(self, intrinsic_value): + """ + {"Fn::Not": [{condition}]} + This intrinsic function will negate the evaluation of the condition specified. + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + A boolean that is the opposite of the condition evaluated + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_NOT + ) + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_NOT) + verify_number_arguments(arguments, IntrinsicResolver.FN_NOT, num=1) + argument_sanitised = self.intrinsic_property_resolver( + arguments[0], parent_function=IntrinsicResolver.FN_NOT + ) + if isinstance(argument_sanitised, dict) and "Condition" in arguments[0]: + condition_name = argument_sanitised.get("Condition") + verify_intrinsic_type_str(condition_name, IntrinsicResolver.FN_NOT) + + condition = self._conditions.get(condition_name) + verify_non_null( + condition, IntrinsicResolver.FN_NOT, position_in_list="first" + ) + + argument_sanitised = self.intrinsic_property_resolver( + condition, parent_function=IntrinsicResolver.FN_NOT + ) + + verify_intrinsic_type_bool( + argument_sanitised, + IntrinsicResolver.FN_NOT, + message="The result of {} must evaluate to bool".format( + IntrinsicResolver.FN_NOT + ), + ) + return not argument_sanitised + + @staticmethod + def get_prefix_position_in_list(i): + """ + Gets the prefix for the string "ith element of the list", handling first, second, and third. + :param i: + :return: + """ + prefix = "{} th ".format(str(i)) + if i == 1: + prefix = "first " + elif i == 2: + prefix = "second " + elif i == 3: + prefix = "third " + return prefix + + def handle_fn_and(self, intrinsic_value): + """ + {"Fn::And": [{condition}, {...}]} + This intrinsic checks that every item in the list evaluates to a boolean. The items in the list can either + be of the format {Condition: condition_name} which finds and evaluates the Conditions dictionary of another + intrinsic function. + + The Conditions dictionary will have the following format: + { + "Conditions": { + "condition_name": True/False or "{Intrinsic Function}" + } + } + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + A boolean depending on if all of the properties in Fn::And evaluate to True + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_AND + ) + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_AND) + + for i, argument in enumerate(arguments): + if isinstance(argument, dict) and "Condition" in argument: + condition_name = argument.get("Condition") + verify_intrinsic_type_str(condition_name, IntrinsicResolver.FN_AND) + + condition = self._conditions.get(condition_name) + verify_non_null( + condition, + IntrinsicResolver.FN_AND, + position_in_list=self.get_prefix_position_in_list(i), + ) + + condition_evaluated = self.intrinsic_property_resolver( + condition, parent_function=IntrinsicResolver.FN_AND + ) + verify_intrinsic_type_bool( + condition_evaluated, IntrinsicResolver.FN_AND + ) + + if not condition_evaluated: + return False + else: + condition = self.intrinsic_property_resolver( + argument, parent_function=IntrinsicResolver.FN_AND + ) + verify_intrinsic_type_bool(condition, IntrinsicResolver.FN_AND) + + if not condition: + return False + + return True + + def handle_fn_or(self, intrinsic_value): + """ + {"Fn::Or": [{condition}, {...}]} + This intrinsic checks that a single item in the list evaluates to a boolean. The items in the list can either + be of the format {Condition: condition_name} which finds and evaluates the Conditions dictionary of another + intrinsic function. + + The Conditions dictionary will have the following format: + { + "Conditions": { + "condition_name": True/False or "{Intrinsic Function}" + } + } + + This intrinsic function will resolve all the objects within the function's value and check their type. + Parameter + ---------- + intrinsic_value: list, dict + This is the value of the object inside the Fn::Join intrinsic function property + + Return + ------- + A boolean depending on if any of the properties in Fn::And evaluate to True + """ + arguments = self.intrinsic_property_resolver( + intrinsic_value, parent_function=IntrinsicResolver.FN_OR + ) + verify_intrinsic_type_list(arguments, IntrinsicResolver.FN_OR) + + for i, argument in enumerate(arguments): + if isinstance(argument, dict) and "Condition" in argument: + condition_name = argument.get("Condition") + verify_intrinsic_type_str(condition_name, IntrinsicResolver.FN_OR) + + condition = self._conditions.get(condition_name) + verify_non_null( + condition, + IntrinsicResolver.FN_OR, + position_in_list=self.get_prefix_position_in_list(i), + ) + + condition_evaluated = self.intrinsic_property_resolver( + condition, parent_function=IntrinsicResolver.FN_OR + ) + verify_intrinsic_type_bool(condition_evaluated, IntrinsicResolver.FN_OR) + + if condition_evaluated: + return True + else: + condition = self.intrinsic_property_resolver( + argument, parent_function=IntrinsicResolver.FN_OR + ) + verify_intrinsic_type_bool(condition, IntrinsicResolver.FN_OR) + + if condition: + return True + + return False diff --git a/samcli/lib/intrinsic_resolver/intrinsics_symbol_table.py b/samcli/lib/intrinsic_resolver/intrinsics_symbol_table.py new file mode 100644 index 0000000000..155955964b --- /dev/null +++ b/samcli/lib/intrinsic_resolver/intrinsics_symbol_table.py @@ -0,0 +1,417 @@ +""" +The symbol table that is used in IntrinsicResolver in order to resolve runtime attributes +""" +import os + +from six import string_types + +from samcli.commands.local.lib.sam_base_provider import SamBaseProvider +from samcli.lib.intrinsic_resolver.intrinsic_property_resolver import IntrinsicResolver +from samcli.lib.intrinsic_resolver.invalid_intrinsic_exception import ( + InvalidSymbolException, +) + + +class IntrinsicsSymbolTable(object): + AWS_ACCOUNT_ID = "AWS::AccountId" + AWS_NOTIFICATION_ARN = "AWS::NotificationArn" + AWS_PARTITION = "AWS::Partition" + AWS_REGION = "AWS::Region" + AWS_STACK_ID = "AWS::StackId" + AWS_STACK_NAME = "AWS::StackName" + AWS_URL_PREFIX = "AWS::URLSuffix" + AWS_NOVALUE = "AWS::NoValue" + SUPPORTED_PSEUDO_TYPES = [ + AWS_ACCOUNT_ID, + AWS_NOTIFICATION_ARN, + AWS_PARTITION, + AWS_REGION, + AWS_STACK_ID, + AWS_STACK_NAME, + AWS_URL_PREFIX, + AWS_NOVALUE, + ] + + DEFAULT_REGION = "us-east-1" + REGIONS = { + "us-east-1": [ + "us-east-1a", + "us-east-1b", + "us-east-1c", + "us-east-1d", + "us-east-1e", + "us-east-1f", + ], + "us-west-1": ["us-west-1b", "us-west-1c"], + "eu-north-1": ["eu-north-1a", "eu-north-1b", "eu-north-1c"], + "ap-northeast-3": ["ap-northeast-3a"], + "ap-northeast-2": ["ap-northeast-2a", "ap-northeast-2b", "ap-northeast-2c"], + "ap-northeast-1": ["ap-northeast-1a", "ap-northeast-1c", "ap-northeast-1d"], + "sa-east-1": ["sa-east-1a", "sa-east-1c"], + "ap-southeast-1": ["ap-southeast-1a", "ap-southeast-1b", "ap-southeast-1c"], + "ca-central-1": ["ca-central-1a", "ca-central-1b"], + "ap-southeast-2": ["ap-southeast-2a", "ap-southeast-2b", "ap-southeast-2c"], + "us-west-2": ["us-west-2a", "us-west-2b", "us-west-2c", "us-west-2d"], + "us-east-2": ["us-east-2a", "us-east-2b", "us-east-2c"], + "ap-south-1": ["ap-south-1a", "ap-south-1b", "ap-south-1c"], + "eu-central-1": ["eu-central-1a", "eu-central-1b", "eu-central-1c"], + "eu-west-1": ["eu-west-1a", "eu-west-1b", "eu-west-1c"], + "eu-west-2": ["eu-west-2a", "eu-west-2b", "eu-west-2c"], + "eu-west-3": ["eu-west-3a", "eu-west-3b", "eu-west-3c"], + "cn-north-1": [], + "us-gov-west-1": [], + } + + DEFAULT_PARTITION = "aws" + GOV_PARTITION = "aws-us-gov" + CHINA_PARTITION = "aws-cn" + CHINA_PREFIX = "cn" + GOV_PREFIX = "gov" + + CHINA_URL_PREFIX = "amazonaws.com.cn" + DEFAULT_URL_PREFIX = "amazonaws.com" + + AWS_NOTIFICATION_SERVICE_NAME = "sns" + ARN_SUFFIX = ".Arn" + + CFN_RESOURCE_TYPE = "Type" + + def __init__( + self, + template=None, + logical_id_translator=None, + default_type_resolver=None, + common_attribute_resolver=None, + ): + """ + Initializes the Intrinsic Symbol Table so that runtime attributes can be resolved. + + The code is defaulted in the following order logical_id_translator => parameters => default_type_resolver => + common_attribute_resolver + + If the item is a pseudo type, it will run through the logical_id_translator and if it doesn't exist there + it will generate a default one and save it in the logical_id_translator as a cache for future computation. + Parameters + ------------ + logical_id_translator: dict + This will act as the default symbol table resolver. The resolver will first check if the attribute is + explicitly defined in this dictionary and do the relevant translation. + + All Logical Ids and Pseudo types can be included here. + { + "RestApi.Test": { # this could be used with RestApi.Deployment => NewRestApi + "Ref": "NewRestApi" + }, + "LambdaFunction": { + "Ref": "LambdaFunction", + "Arn": "MyArn" + } + "AWS::Region": "us-east-1" + } + default_type_resolver: dict + This can be used provide common attributes that are true across all objects of a certain type. + This can be in the format of + { + "AWS::ApiGateway::RestApi": { + "RootResourceId": "/" + } + } + or can also be a function that takes in (logical_id, attribute_type) => string + { + "AWS::ApiGateway::RestApi": { + "RootResourceId": (lambda l, a, p, r: p.get("ResourceId")) + } + } + common_attribute_resolver: dict + This is a clean way of specifying common attributes across all types. + The value can either be a function of the form string or (logical_id) => string + { + "Ref": lambda p,r: "", + "Arn:": arn_resolver + } + """ + self.logical_id_translator = logical_id_translator or {} + + self._template = template or {} + self._parameters = self._template.get("Parameters", {}) + self._resources = self._template.get("Resources", {}) + + self.default_type_resolver = ( + default_type_resolver or self.get_default_type_resolver() + ) + self.common_attribute_resolver = ( + common_attribute_resolver or self.get_default_attribute_resolver() + ) + self.default_pseudo_resolver = self.get_default_pseudo_resolver() + + def get_default_pseudo_resolver(self): + return { + IntrinsicsSymbolTable.AWS_ACCOUNT_ID: self.handle_pseudo_account_id, + IntrinsicsSymbolTable.AWS_PARTITION: self.handle_pseudo_partition, + IntrinsicsSymbolTable.AWS_REGION: self.handle_pseudo_region, + IntrinsicsSymbolTable.AWS_STACK_ID: self.handle_pseudo_stack_id, + IntrinsicsSymbolTable.AWS_STACK_NAME: self.handle_pseudo_stack_name, + IntrinsicsSymbolTable.AWS_NOVALUE: self.handle_pseudo_no_value, + IntrinsicsSymbolTable.AWS_URL_PREFIX: self.handle_pseudo_url_prefix, + } + + def get_default_attribute_resolver(self): + return {"Ref": lambda logical_id: logical_id, "Arn": self.arn_resolver} + + @staticmethod + def get_default_type_resolver(): + return { + "AWS::ApiGateway::RestApi": { + "RootResourceId": "/" # It usually used as a reference to the parent id of the RestApi, + } + } + + def resolve_symbols(self, logical_id, resource_attribute, ignore_errors=False): + """ + This function resolves all the symbols given a logical id and a resource_attribute for Fn::GetAtt and Ref. + This boils Ref into a type of Fn:GetAtt to simplify the implementation. + For example: + {"Ref": "AWS::REGION"} => resolve_symbols("AWS::REGION", "REF") + {"Fn::GetAtt": ["logical_id", "attribute_type"] => resolve_symbols(logical_id, attribute_type) + + + First pseudo types are checked. If item is present in the logical_id_translator it is returned. + Otherwise, it falls back to the default_pseudo_resolver + + Then the default_type_resolver is checked, which has common attributes and functions for each types. + Then the common_attribute_resolver is run, which has functions that are common for each attribute. + Parameters + ----------- + logical_id: str + The logical id of the resource in question or a pseudo type. + resource_attribute: str + The resource attribute of the resource in question or Ref for psuedo types. + ignore_errors: bool + An optional flags to not return errors. This used in sub + + Return + ------- + This resolves the attribute + """ + # pylint: disable-msg=too-many-return-statements + translated = self.get_translation(logical_id, resource_attribute) + if translated: + return translated + + if logical_id in self.SUPPORTED_PSEUDO_TYPES: + translated = self.default_pseudo_resolver.get(logical_id)() + self.logical_id_translator[logical_id] = translated + return translated + + # Handle Default Parameters + translated = self._parameters.get(logical_id, {}).get("Default") + if translated: + return translated + + # Handle Default Property Type Resolution + resource_type = self._resources.get(logical_id, {}).get( + IntrinsicsSymbolTable.CFN_RESOURCE_TYPE + ) + resolver = ( + self.default_type_resolver.get(resource_type, {}).get(resource_attribute) + if resource_type + else {} + ) + if resolver: + if callable(resolver): + return resolver(logical_id, resource_attribute) + return resolver + + # Handle Attribute Type Resolution + attribute_resolver = self.common_attribute_resolver.get(resource_attribute, {}) + if attribute_resolver: + if callable(attribute_resolver): + return attribute_resolver(logical_id) + return attribute_resolver + + if ignore_errors: + return "${}".format(logical_id + "." + resource_attribute) + raise InvalidSymbolException( + "The {} is not supported in the logical_id_translator, default_type_resolver, or the attribute_resolver." + " It is also not a supported pseudo function".format( + logical_id + "." + resource_attribute + ) + ) + + def arn_resolver(self, logical_id, service_name="lambda"): + """ + This function resolves Arn in the format + arn:{partition_name}:{service_name}:{aws_region}:{account_id}:{function_name} + + Parameters + ----------- + logical_id: str + This the reference to the function name used + service_name: str + This is the service name used such as lambda or sns + + Return + ------- + The resolved Arn + """ + aws_region = self.handle_pseudo_region() + account_id = ( + self.logical_id_translator.get(IntrinsicsSymbolTable.AWS_ACCOUNT_ID) or self.handle_pseudo_account_id() + ) + partition_name = self.handle_pseudo_partition() + resource_name = logical_id + resource_name = self.logical_id_translator.get(resource_name) or resource_name + str_format = "arn:{partition_name}:{service_name}:{aws_region}:{account_id}:{resource_name}" + if service_name == "lambda": + str_format = "arn:{partition_name}:{service_name}:{aws_region}:{account_id}:function:{resource_name}" + + return str_format.format( + partition_name=partition_name, + service_name=service_name, + aws_region=aws_region, + account_id=account_id, + resource_name=resource_name, + ) + + def get_translation(self, logical_id, resource_attributes=IntrinsicResolver.REF): + """ + This gets the logical_id_translation of the logical id and resource_attributes. + + Parameters + ---------- + logical_id: str + This is the logical id of the resource in question + resource_attributes: str + This is the attribute required. By default, it is a REF type + + Returns + -------- + This returns the translated item if it already exists + + """ + logical_id_item = self.logical_id_translator.get(logical_id, {}) + if isinstance(logical_id_item, string_types): + if ( + resource_attributes != IntrinsicResolver.REF and resource_attributes != "" + ): + return None + return logical_id_item + return logical_id_item.get(resource_attributes) + + @staticmethod + def get_availability_zone(region): + """ + This gets the availability zone from the the specified region + + Parameters + ----------- + region: str + The specified region from the SymbolTable region + + Return + ------- + The list of availability zones for the specified region + """ + return IntrinsicsSymbolTable.REGIONS.get(region) + + @staticmethod + def handle_pseudo_account_id(): + """ + This gets a default account id from SamBaseProvider. + Return + ------- + A pseudo account id + """ + return SamBaseProvider.DEFAULT_PSEUDO_PARAM_VALUES.get( + IntrinsicsSymbolTable.AWS_ACCOUNT_ID + ) + + def handle_pseudo_region(self): + """ + Gets the region from the environment and defaults to a the default region from the global variables. + + This is only run if it is not specified by the logical_id_translator as a default. + + Return + ------- + The region from the environment or a default one + """ + return ( + self.logical_id_translator.get(IntrinsicsSymbolTable.AWS_REGION) or os.getenv("AWS_REGION") or + SamBaseProvider.DEFAULT_PSEUDO_PARAM_VALUES.get( + IntrinsicsSymbolTable.AWS_REGION + ) + ) + + def handle_pseudo_url_prefix(self): + """ + This gets the AWS::UrlSuffix for the intrinsic with the china and regular prefix. + + This is only run if it is not specified by the logical_id_translator as a default. + Return + ------- + The url prefix of amazonaws.com or amazonaws.com.cn + """ + aws_region = self.handle_pseudo_region() + if self.CHINA_PREFIX in aws_region: + return self.CHINA_URL_PREFIX + return self.DEFAULT_URL_PREFIX + + def handle_pseudo_partition(self): + """ + This resolves AWS::Partition so that the correct partition is returned depending on the region. + + This is only run if it is not specified by the logical_id_translator as a default. + + Return + ------- + A pseudo partition like aws-cn or aws or aws-gov + """ + aws_region = self.handle_pseudo_region() + if self.CHINA_PREFIX in aws_region: + return self.CHINA_PARTITION + if self.GOV_PREFIX in aws_region: + return self.GOV_PARTITION + return self.DEFAULT_PARTITION + + @staticmethod + def handle_pseudo_stack_id(): + """ + This resolves AWS::StackId by using the SamBaseProvider as the default value. + + This is only run if it is not specified by the logical_id_translator as a default. + + Return + ------- + A randomized string + """ + return SamBaseProvider.DEFAULT_PSEUDO_PARAM_VALUES.get( + IntrinsicsSymbolTable.AWS_STACK_ID + ) + + @staticmethod + def handle_pseudo_stack_name(): + """ + This resolves AWS::StackName by using the SamBaseProvider as the default value. + + This is only run if it is not specified by the logical_id_translator as a default. + + Return + ------- + A randomized string + """ + return SamBaseProvider.DEFAULT_PSEUDO_PARAM_VALUES.get( + IntrinsicsSymbolTable.AWS_STACK_NAME + ) + + @staticmethod + def handle_pseudo_no_value(): + """ + This resolves AWS::NoValue so that it returns the python None + + Returns + -------- + None + :return: + """ + return None diff --git a/samcli/lib/intrinsic_resolver/invalid_intrinsic_exception.py b/samcli/lib/intrinsic_resolver/invalid_intrinsic_exception.py new file mode 100644 index 0000000000..23bad5899d --- /dev/null +++ b/samcli/lib/intrinsic_resolver/invalid_intrinsic_exception.py @@ -0,0 +1,11 @@ +""" +A custom Exception to display Invalid Intrinsics and Symbol Table format. +""" + + +class InvalidIntrinsicException(Exception): + pass + + +class InvalidSymbolException(Exception): + pass diff --git a/samcli/lib/intrinsic_resolver/invalid_intrinsic_validation.py b/samcli/lib/intrinsic_resolver/invalid_intrinsic_validation.py new file mode 100644 index 0000000000..9c82ced267 --- /dev/null +++ b/samcli/lib/intrinsic_resolver/invalid_intrinsic_validation.py @@ -0,0 +1,103 @@ +""" +A list of helper functions that cleanup the processing in IntrinsicResolver and IntrinsicSymbolTable +""" +from six import string_types + +from samcli.lib.intrinsic_resolver.invalid_intrinsic_exception import InvalidIntrinsicException + + +def verify_intrinsic_type_bool( + argument, property_type="", message="", position_in_list="" +): + verify_intrinsic_type( + argument, property_type, message, position_in_list, primitive_type=bool + ) + + +def verify_intrinsic_type_list( + argument, property_type="", message="", position_in_list="" +): + verify_intrinsic_type( + argument, property_type, message, position_in_list, primitive_type=list + ) + + +def verify_intrinsic_type_dict( + argument, property_type="", message="", position_in_list="" +): + verify_intrinsic_type( + argument, property_type, message, position_in_list, primitive_type=dict + ) + + +def verify_intrinsic_type_int( + argument, property_type="", message="", position_in_list="" +): + # Special case since bool is a subclass of int in python + if isinstance(argument, bool): + raise InvalidIntrinsicException( + message or "The {} argument to {} must resolve to a {} type".format( + position_in_list, property_type, int + ) + ) + verify_intrinsic_type( + argument, property_type, message, position_in_list, primitive_type=int + ) + + +def verify_intrinsic_type_str( + argument, property_type="", message="", position_in_list="" +): + verify_intrinsic_type( + argument, property_type, message, position_in_list, primitive_type=string_types + ) + + +def verify_non_null(argument, property_type="", message="", position_in_list=""): + if argument is None: + raise InvalidIntrinsicException( + message or "The {} argument to {} is missing from the intrinsic function".format( + position_in_list, property_type + ) + ) + + +def verify_intrinsic_type( + argument, + property_type="", + message="", + position_in_list="", + primitive_type=string_types, +): + verify_non_null(argument, property_type, message, position_in_list) + if not isinstance(argument, primitive_type): + raise InvalidIntrinsicException( + message or "The {} argument to {} must resolve to a {} type".format( + position_in_list, property_type, primitive_type + ) + ) + + +def verify_in_bounds(objects, index, property_type=""): + if index < 0 or index >= len(objects): + raise InvalidIntrinsicException( + "The index of {} resolved properties must be within the range".format( + property_type + ) + ) + + +def verify_number_arguments(arguments, property_type="", num=0): + if not len(arguments) == num: + raise InvalidIntrinsicException( + "The arguments to {} must have {} arguments instead of {} arguments".format( + property_type, num, len(arguments) + ) + ) + + +def verify_all_list_intrinsic_type( + arguments, verification_func, property_type="", message="", position_in_list="" +): + for argument in arguments: + verification_func(argument, property_type, message, position_in_list) diff --git a/tests/unit/lib/intrinsic_resolver/__init__.py b/tests/unit/lib/intrinsic_resolver/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/lib/intrinsic_resolver/test_data/test_intrinsic_template_resolution.json b/tests/unit/lib/intrinsic_resolver/test_data/test_intrinsic_template_resolution.json new file mode 100644 index 0000000000..654484874b --- /dev/null +++ b/tests/unit/lib/intrinsic_resolver/test_data/test_intrinsic_template_resolution.json @@ -0,0 +1,148 @@ +{ + "Mappings": { + "TopLevel": { + "SecondLevelKey": { + "key": "https://s3location/" + } + } + }, + "Conditions": { + "ComplexCondition": { + "Fn::And": [ + { + "Fn::Equals": [ + { + "Fn::Or": [ + { + "Condition": "NotTestCondition" + }, + { + "Condition": "TestCondition" + } + ] + }, + false + ] + }, + true, + { + "Fn::If": [ + "TestCondition", + true, + false + ] + } + ] + }, + "TestCondition": { + "Fn::Equals": [ + { + "Ref": "EnvironmentType" + }, + "prod" + ] + }, + "NotTestCondition": { + "Fn::Not": [ + { + "Condition": "TestCondition" + } + ] + }, + "InvalidCondition": [ + "random items" + ] + }, + "Resources": { + "RestApi.Deployment": { + "Type": "AWS::ApiGateway::RestApi", + "Properties": { + "Body": { + "Fn::Base64": { + "Fn::Join": [ + ";", + { + "Fn::Split": [ + ",", + { + "Fn::Join": [ + ",", + [ + "a", + "e", + "f", + "d" + ] + ] + } + ] + } + ] + } + }, + "BodyS3Location": { + "Fn::FindInMap": [ + "TopLevel", + "SecondLevelKey", + "key" + ] + } + } + }, + "RestApiResource": { + "Properties": { + "parentId": { + "Fn::GetAtt": [ + "RestApi.Deployment", + "RootResourceId" + ] + }, + "PathPart": "{proxy+}", + "RestApiId": { + "Ref": "RestApi.Deployment" + } + } + }, + "HelloHandler2E4FBA4D": { + "Type": "AWS::Lambda::Function", + "Properties": { + "handler": "main.handle" + } + }, + "LambdaFunction": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Uri": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":apigateway:", + { + "Fn::Select": [ + 0, + { + "Fn::GetAZs": { + "Ref": "AWS::Region" + } + } + ] + }, + ":lambda:path/2015-03-31/functions/", + { + "Fn::GetAtt": [ + "HelloHandler2E4FBA4D", + "Arn" + ] + }, + "/invocations" + ] + ] + } + } + } + } +} \ No newline at end of file diff --git a/tests/unit/lib/intrinsic_resolver/test_intrinsic_resolver.py b/tests/unit/lib/intrinsic_resolver/test_intrinsic_resolver.py new file mode 100644 index 0000000000..7f280b31e3 --- /dev/null +++ b/tests/unit/lib/intrinsic_resolver/test_intrinsic_resolver.py @@ -0,0 +1,1350 @@ +import json +from copy import deepcopy +try: + from pathlib import Path +except ImportError: + from pathlib2 import Path +from unittest import TestCase + +from parameterized import parameterized + +from samcli.lib.intrinsic_resolver.intrinsic_property_resolver import IntrinsicResolver +from samcli.lib.intrinsic_resolver.intrinsics_symbol_table import IntrinsicsSymbolTable +from samcli.lib.intrinsic_resolver.invalid_intrinsic_exception import ( + InvalidIntrinsicException, +) + + +class TestIntrinsicFnJoinResolver(TestCase): + def setUp(self): + self.resolver = IntrinsicResolver(template={}, symbol_resolver=IntrinsicsSymbolTable()) + + def test_basic_fn_join(self): + intrinsic = {"Fn::Join": [",", ["a", "b", "c", "d"]]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "a,b,c,d") + + def test_nested_fn_join(self): + intrinsic_base_1 = {"Fn::Join": [",", ["a", "b", "c", "d"]]} + intrinsic_base_2 = {"Fn::Join": [";", ["g", "h", "i", intrinsic_base_1]]} + intrinsic = {"Fn::Join": [":", [intrinsic_base_1, "e", "f", intrinsic_base_2]]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "a,b,c,d:e:f:g;h;i;a,b,c,d") + + @parameterized.expand( + [ + ( + "Fn::Join should fail for values that are not lists: {}".format(item), + item, + ) + for item in [True, False, "Test", {}, 42, None] + ] + ) + def test_fn_join_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Join": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::Join should fail if the first argument does not resolve to a string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None] + ] + ) + def test_fn_join_delimiter_invalid_type(self, name, delimiter): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Join": [delimiter, []]}) + + @parameterized.expand( + [ + ( + "Fn::Join should fail if the list_of_objects is not a valid list: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, "t", None] + ] + ) + def test_fn_list_of_objects_invalid_type(self, name, list_of_objects): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::Join": ["", list_of_objects]} + ) + + @parameterized.expand( + [ + ( + "Fn::Join should require that all items in the list_of_objects resolve to string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None] + ] + ) + def test_fn_join_items_all_str(self, name, single_obj): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::Join": ["", ["test", single_obj, "abcd"]]} + ) + + +class TestIntrinsicFnSplitResolver(TestCase): + def setUp(self): + self.resolver = IntrinsicResolver(template={}, symbol_resolver=IntrinsicsSymbolTable()) + + def test_basic_fn_split(self): + intrinsic = {"Fn::Split": ["|", "a|b|c"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, ["a", "b", "c"]) + + def test_nested_fn_split(self): + intrinsic_base_1 = {"Fn::Split": [";", {"Fn::Join": [";", ["a", "b", "c"]]}]} + + intrinsic_base_2 = {"Fn::Join": [",", intrinsic_base_1]} + intrinsic = { + "Fn::Split": [ + ",", + {"Fn::Join": [",", [intrinsic_base_2, ",e", ",f,", intrinsic_base_2]]}, + ] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, ["a", "b", "c", "", "e", "", "f", "", "a", "b", "c"]) + + @parameterized.expand( + [ + ( + "Fn::Split should fail for values that are not lists: {}".format(item), + item, + ) + for item in [True, False, "Test", {}, 42] + ] + ) + def test_fn_split_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Split": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::Split should fail if the first argument does not resolve to a string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42] + ] + ) + def test_fn_split_delimiter_invalid_type(self, name, delimiter): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Split": [delimiter, []]}) + + @parameterized.expand( + [ + ( + "Fn::Split should fail if the second argument does not resolve to a string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42] + ] + ) + def test_fn_split_source_string_invalid_type(self, name, source_string): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::Split": ["", source_string]} + ) + + +class TestIntrinsicFnBase64Resolver(TestCase): + def setUp(self): + self.resolver = IntrinsicResolver(template={}, symbol_resolver=IntrinsicsSymbolTable()) + + def test_basic_fn_split(self): + intrinsic = {"Fn::Base64": "AWS CloudFormation"} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "QVdTIENsb3VkRm9ybWF0aW9u") + + def test_nested_fn_base64(self): + intrinsic_base_1 = {"Fn::Base64": "AWS CloudFormation"} + + intrinsic_base_2 = {"Fn::Base64": intrinsic_base_1} + intrinsic = { + "Fn::Base64": { + "Fn::Join": [",", [intrinsic_base_2, ",e", ",f,", intrinsic_base_2]] + } + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual( + result, + "VVZaa1ZFbEZUbk5pTTFaclVtMDVlV0pYUmpCaFZ6bDEsLGUsLGYsLFVWWmtWRWxGVG5OaU0xWnJ" + "VbTA1ZVdKWFJqQmhWemwx", + ) + + @parameterized.expand( + [ + ( + "Fn::Base64 must have a value that resolves to a string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None] + ] + ) + def test_fn_base64_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Base64": intrinsic}) + + +class TestIntrinsicFnSelectResolver(TestCase): + def setUp(self): + self.resolver = IntrinsicResolver(template={}, symbol_resolver=IntrinsicsSymbolTable()) + + def test_basic_fn_select(self): + intrinsic = {"Fn::Select": [2, ["a", "b", "c", "d"]]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "c") + + def test_nested_fn_select(self): + intrinsic_base_1 = {"Fn::Select": [0, ["a", "b", "c", "d"]]} + intrinsic_base_2 = {"Fn::Join": [";", ["g", "h", "i", intrinsic_base_1]]} + intrinsic = {"Fn::Select": [3, [intrinsic_base_2, "e", "f", intrinsic_base_2]]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "g;h;i;a") + + @parameterized.expand( + [ + ( + "Fn::Select should fail for values that are not lists: {}".format(item), + item, + ) + for item in [True, False, "Test", {}, 42, None] + ] + ) + def test_fn_select_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Select": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::Select should fail if the first argument does not resolve to a int: {}".format( + item + ), + item, + ) + for item in [True, False, {}, "3", None] + ] + ) + def test_fn_select_index_invalid_index_type(self, name, index): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Select": [index, [0]]}) + + @parameterized.expand( + [ + ( + "Fn::Select should fail if the index is out of bounds: {}".format( + number + ), + number, + ) + for number in [-2, 7] + ] + ) + def test_fn_select_out_of_bounds(self, name, index): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Select": [index, []]}) + + @parameterized.expand( + [ + ( + "Fn::Select should fail if the second argument does not resolve to a list: {}".format( + item + ), + item, + ) + for item in [True, False, {}, "3", 33, None] + ] + ) + def test_fn_select_second_argument_invalid_type(self, name, argument): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Select": [0, argument]}) + + +class TestIntrinsicFnFindInMapResolver(TestCase): + def setUp(self): + template = { + "Mappings": { + "Basic": {"Test": {"key": "value"}}, + "value": {"anotherkey": {"key": "result"}}, + "result": {"value": {"key": "final"}}, + } + } + self.resolver = IntrinsicResolver( + symbol_resolver=IntrinsicsSymbolTable(), template=template + ) + + def test_basic_find_in_map(self): + intrinsic = {"Fn::FindInMap": ["Basic", "Test", "key"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "value") + + def test_nested_find_in_map(self): + intrinsic_base_1 = {"Fn::FindInMap": ["Basic", "Test", "key"]} + intrinsic_base_2 = {"Fn::FindInMap": [intrinsic_base_1, "anotherkey", "key"]} + intrinsic = {"Fn::FindInMap": [intrinsic_base_2, intrinsic_base_1, "key"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "final") + + @parameterized.expand( + [ + ( + "Fn::FindInMap should fail if the list does not resolve to a string: {}".format( + item + ), + item, + ) + for item in [True, False, "Test", {}, 42, None] + ] + ) + def test_fn_find_in_map_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::FindInMap": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::FindInMap should fail if there isn't 3 arguments in the list: {}".format( + item + ), + item, + ) + for item in [[""] * i for i in [0, 1, 2, 4, 5, 6, 7, 8, 9, 10]] + ] + ) + def test_fn_find_in_map_invalid_number_arguments(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::FindInMap": intrinsic}) + + @parameterized.expand( + [ + ( + "The arguments in Fn::FindInMap must fail if the arguments are not in the mappings".format( + item + ), + item, + ) + for item in [ + ["", "Test", "key"], + ["Basic", "", "key"], + ["Basic", "Test", ""], + ] + ] + ) + def test_fn_find_in_map_invalid_key_entries(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::FindInMap": intrinsic}) + + +class TestIntrinsicFnAzsResolver(TestCase): + def setUp(self): + logical_id_translator = {"AWS::Region": "us-east-1"} + self.resolver = IntrinsicResolver( + template={}, + symbol_resolver=IntrinsicsSymbolTable( + logical_id_translator=logical_id_translator + ) + ) + + def test_basic_azs(self): + intrinsic = {"Ref": "AWS::Region"} + result = self.resolver.intrinsic_property_resolver({"Fn::GetAZs": intrinsic}) + self.assertEqual( + result, + [ + "us-east-1a", + "us-east-1b", + "us-east-1c", + "us-east-1d", + "us-east-1e", + "us-east-1f", + ], + ) + + def test_default_get_azs(self): + result = self.resolver.intrinsic_property_resolver({"Fn::GetAZs": ""}) + self.assertEqual( + result, + [ + "us-east-1a", + "us-east-1b", + "us-east-1c", + "us-east-1d", + "us-east-1e", + "us-east-1f", + ], + ) + + @parameterized.expand( + [ + ("Fn::GetAZs should fail if it not given a string type".format(item), item) + for item in [True, False, {}, 42, None] + ] + ) + def test_fn_azs_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::GetAZs": intrinsic}) + + def test_fn_azs_invalid_region(self): + intrinsic = "UNKOWN REGION" + with self.assertRaises(InvalidIntrinsicException, msg="FN::GetAzs should fail for unknown region"): + self.resolver.intrinsic_property_resolver({"Fn::GetAZs": intrinsic}) + + +class TestFnTransform(TestCase): + def setUp(self): + logical_id_translator = {"AWS::Region": "us-east-1"} + self.resolver = IntrinsicResolver( + template={}, + symbol_resolver=IntrinsicsSymbolTable( + logical_id_translator=logical_id_translator + ) + ) + + def test_basic_fn_transform(self): + intrinsic = {"Fn::Transform": {"Name": "AWS::Include", "Parameters": {"Location": "test"}}} + self.resolver.intrinsic_property_resolver(intrinsic) + + def test_fn_transform_unsupported_macro(self): + intrinsic = {"Fn::Transform": {"Name": "UNKNOWN", "Parameters": {"Location": "test"}}} + with self.assertRaises(InvalidIntrinsicException, msg="FN::Transform should fail for unknown region"): + self.resolver.intrinsic_property_resolver(intrinsic) + + +class TestIntrinsicFnRefResolver(TestCase): + def setUp(self): + logical_id_translator = { + "RestApi": {"Ref": "NewRestApi"}, + "AWS::StackId": "12301230123", + } + resources = {"RestApi": {"Type": "AWS::ApiGateway::RestApi", "Properties": {}}} + template = {"Resources": resources} + self.resolver = IntrinsicResolver( + symbol_resolver=IntrinsicsSymbolTable( + logical_id_translator=logical_id_translator, template=template + ), template=template + ) + + def test_basic_ref_translation(self): + intrinsic = {"Ref": "RestApi"} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "NewRestApi") + + def test_default_ref_translation(self): + intrinsic = {"Ref": "UnknownApi"} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "UnknownApi") + + @parameterized.expand( + [ + ("Ref must have arguments that resolve to a string: {}".format(item), item) + for item in [True, False, {}, 42, None, []] + ] + ) + def test_ref_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Ref": intrinsic}) + + +class TestIntrinsicFnGetAttResolver(TestCase): + def setUp(self): + logical_id_translator = { + "RestApi": {"Ref": "NewRestApi"}, + "LambdaFunction": { + "Arn": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east" + "-1:123456789012:LambdaFunction/invocations" + }, + "AWS::StackId": "12301230123", + "AWS::Region": "us-east-1", + "AWS::AccountId": "406033500479", + } + resources = { + "RestApi": {"Type": "AWS::ApiGateway::RestApi", "Properties": {}}, + "HelloHandler2E4FBA4D": { + "Type": "AWS::Lambda::Function", + "Properties": {"handler": "main.handle"}, + }, + "LambdaFunction": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Uri": { + "Fn::Join": [ + "", + [ + "arn:", + {"Ref": "AWS::Partition"}, + ":apigateway:", + {"Ref": "AWS::Region"}, + ":lambda:path/2015-03-31/functions/", + {"Fn::GetAtt": ["HelloHandler2E4FBA4D", "Arn"]}, + "/invocations", + ], + ] + } + }, + }, + } + template = {"Resources": resources} + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=logical_id_translator + ) + self.resources = resources + self.resolver = IntrinsicResolver( + template=template, symbol_resolver=symbol_resolver + ) + + def test_fn_getatt_basic_translation(self): + intrinsic = {"Fn::GetAtt": ["RestApi", "RootResourceId"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual(result, "/") + + def test_fn_getatt_logical_id_translated(self): + intrinsic = {"Fn::GetAtt": ["LambdaFunction", "Arn"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual( + result, + "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east" + "-1:123456789012:LambdaFunction/invocations", + ) + + def test_fn_getatt_with_fn_join(self): + intrinsic = self.resources.get("LambdaFunction").get("Properties").get("Uri") + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual( + result, + "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us" + "-east-1:406033500479:function:HelloHandler2E4FBA4D/invocations", + ) + + @parameterized.expand( + [ + ( + "Fn::GetAtt must fail if the argument does not resolve to a list: {}".format( + item + ), + item, + ) + for item in [True, False, {}, "test", 42, None] + ] + ) + def test_fn_getatt_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::GetAtt": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::GetAtt should fail if it doesn't have exactly 2 arguments: {}".format( + item + ), + item, + ) + for item in [[""] * i for i in [0, 1, 3, 4, 5, 6, 7, 8, 9, 10]] + ] + ) + def test_fn_getatt_invalid_number_arguments(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::GetAtt": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::GetAtt first argument must resolve to a valid string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, [], 42, None] + ] + ) + def test_fn_getatt_first_arguments_invalid(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::GetAtt": [intrinsic, IntrinsicResolver.REF]} + ) + + @parameterized.expand( + [ + ( + "Fn::GetAtt second argument must resolve to a string:{}".format(item), + item, + ) + for item in [True, False, {}, [], 42, None] + ] + ) + def test_fn_getatt_second_arguments_invalid(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::GetAtt": ["some logical Id", intrinsic]} + ) + + +class TestIntrinsicFnSubResolver(TestCase): + def setUp(self): + logical_id_translator = { + "AWS::Region": "us-east-1", + "AWS::AccountId": "123456789012", + } + resources = { + "LambdaFunction": { + "Type": "AWS::ApiGateway::RestApi", + "Properties": {"Uri": "test"}, + } + } + template = {"Resources": resources} + self.resolver = IntrinsicResolver( + template=template, + symbol_resolver=IntrinsicsSymbolTable( + logical_id_translator=logical_id_translator, template=template + ) + ) + + def test_fn_sub_basic_uri(self): + intrinsic = { + "Fn::Sub": + "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaFunction.Arn}/invocations" + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual( + result, + "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east-1" + ":123456789012:function:LambdaFunction/invocations", + ) + + def test_fn_sub_uri_arguments(self): + intrinsic = { + "Fn::Sub": [ + "arn:aws:apigateway:${MyItem}:lambda:path/2015-03-31/functions/${MyOtherItem}/invocations", + {"MyItem": {"Ref": "AWS::Region"}, "MyOtherItem": "LambdaFunction.Arn"}, + ] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertEqual( + result, + "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east" + "-1:123456789012:function:LambdaFunction/invocations", + ) + + @parameterized.expand( + [ + ( + "Fn::Sub arguments must either resolve to a string or a list".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None] + ] + ) + def test_fn_sub_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Sub": intrinsic}) + + @parameterized.expand( + [ + ( + "If Fn::Sub is a list, first argument must resolve to a string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None] + ] + ) + def test_fn_sub_first_argument_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Sub": [intrinsic, {}]}) + + @parameterized.expand( + [ + ( + "If Fn::Sub is a list, second argument must resolve to a dictionary".format( + item + ), + item, + ) + for item in [True, False, "Another str", [], 42, None] + ] + ) + def test_fn_sub_second_argument_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::Sub": ["some str", intrinsic]} + ) + + @parameterized.expand( + [ + ("If Fn::Sub is a list, it should only have 2 arguments".format(item), item) + for item in [[""] * i for i in [0, 2, 3, 4, 5, 6, 7, 8, 9, 10]] + ] + ) + def test_fn_sub_invalid_number_arguments(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Sub": ["test"] + intrinsic}) + + +class TestIntrinsicFnImportValueResolver(TestCase): + def setUp(self): + self.resolver = IntrinsicResolver(template={}, symbol_resolver=IntrinsicsSymbolTable()) + + def test_fn_import_value_unsupported(self): + with self.assertRaises( + InvalidIntrinsicException, msg="Fn::ImportValue should be unsupported" + ): + self.resolver.intrinsic_property_resolver({"Fn::ImportValue": ""}) + + +class TestIntrinsicFnEqualsResolver(TestCase): + def setUp(self): + logical_id_translator = { + "EnvironmentType": "prod", + "AWS::AccountId": "123456789012", + } + self.resolver = IntrinsicResolver( + template={}, + symbol_resolver=IntrinsicsSymbolTable( + logical_id_translator=logical_id_translator + ) + ) + + def test_fn_equals_basic_true(self): + intrinsic = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_equals_basic_false(self): + intrinsic = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "NotProd"]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_fn_equals_nested_true(self): + intrinsic_base_1 = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic_base_2 = {"Fn::Equals": [{"Ref": "AWS::AccountId"}, "123456789012"]} + + intrinsic = {"Fn::Equals": [intrinsic_base_1, intrinsic_base_2]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_equals_nested_false(self): + intrinsic_base_1 = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic_base_2 = { + "Fn::Equals": [{"Ref": "AWS::AccountId"}, "NOT_A_VALID_ACCOUNT_ID"] + } + + intrinsic = {"Fn::Equals": [intrinsic_base_1, intrinsic_base_2]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + @parameterized.expand( + [ + ( + "Fn::Equals must have arguments that resolve to a string: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None, "test"] + ] + ) + def test_fn_equals_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Equals": intrinsic}) + + @parameterized.expand( + [ + ("Fn::Equals must have exactly two arguments: {}".format(item), item) + for item in [["t"] * i for i in [0, 1, 3, 4, 5, 6, 7, 8, 9, 10]] + ] + ) + def test_fn_equals_invalid_number_arguments(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Equals": intrinsic}) + + +class TestIntrinsicFnNotResolver(TestCase): + def setUp(self): + logical_id_translator = { + "EnvironmentType": "prod", + "AWS::AccountId": "123456789012", + } + conditions = { + "TestCondition": {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}, + "NotTestCondition": {"Fn::Not": [{"Condition": "TestCondition"}]}, + } + template = {"Conditions": conditions} + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=logical_id_translator + ) + self.resolver = IntrinsicResolver( + template=template, symbol_resolver=symbol_resolver + ) + + def test_fn_not_basic_false(self): + intrinsic = {"Fn::Not": [{"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_fn_not_basic_true(self): + intrinsic = { + "Fn::Not": [{"Fn::Equals": [{"Ref": "EnvironmentType"}, "NotProd"]}] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_not_nested_true(self): + intrinsic_base_1 = { + "Fn::Not": [{"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}] + } + intrinsic_base_2 = {"Fn::Equals": [{"Ref": "AWS::AccountId"}, "123456789012"]} + # !(True && True) + intrinsic = {"Fn::Not": [{"Fn::Equals": [intrinsic_base_1, intrinsic_base_2]}]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_not_nested_false(self): + intrinsic_base_1 = { + "Fn::Not": [{"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}] + } + intrinsic_base_2 = { + "Fn::Not": [{"Fn::Equals": [{"Ref": "AWS::AccountId"}, "123456789012"]}] + } + + intrinsic = {"Fn::Not": [{"Fn::Equals": [intrinsic_base_1, intrinsic_base_2]}]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_fn_not_condition_false(self): + intrinsic = {"Fn::Not": [{"Condition": "TestCondition"}]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_fn_not_condition_true(self): + intrinsic = {"Fn::Not": [{"Condition": "NotTestCondition"}]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + @parameterized.expand( + [ + ( + "Fn::Not must have an argument that resolves to a list: {}".format( + item + ), + item, + ) + for item in [True, False, {}, 42, None, "test"] + ] + ) + def test_fn_not_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Not": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::Not items in the list must resolve to booleans: {}".format(item), + item, + ) + for item in [{}, 42, None, "test"] + ] + ) + def test_fn_not_first_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Not": [intrinsic]}) + + @parameterized.expand( + [ + ("Fn::Not must have exactly 1 argument: {}".format(item), item) + for item in [[True] * i for i in [0, 2, 3, 4, 5, 6, 7, 8, 9, 10]] + ] + ) + def test_fn_not_invalid_number_arguments(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Not": intrinsic}) + + def test_fn_not_invalid_condition(self): + with self.assertRaises(InvalidIntrinsicException, msg="Invalid Condition"): + self.resolver.intrinsic_property_resolver( + {"Fn::Not": [{"Condition": "NOT_VALID_CONDITION"}]} + ) + + +class TestIntrinsicFnAndResolver(TestCase): + def setUp(self): + logical_id_translator = { + "EnvironmentType": "prod", + "AWS::AccountId": "123456789012", + } + conditions = { + "TestCondition": {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}, + "NotTestCondition": {"Fn::Not": [{"Condition": "TestCondition"}]}, + } + template = {"Conditions": conditions} + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=logical_id_translator + ) + self.resolver = IntrinsicResolver( + template=template, symbol_resolver=symbol_resolver + ) + + def test_fn_and_basic_true(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic = { + "Fn::And": [prod_fn_equals, {"Condition": "TestCondition"}, prod_fn_equals] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_and_basic_false(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic = { + "Fn::And": [ + prod_fn_equals, + {"Condition": "NotTestCondition"}, + prod_fn_equals, + ] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_fn_and_nested_true(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic_base = { + "Fn::And": [prod_fn_equals, {"Condition": "TestCondition"}, prod_fn_equals] + } + fn_not_intrinsic = {"Fn::Not": [{"Condition": "NotTestCondition"}]} + intrinsic = {"Fn::And": [intrinsic_base, fn_not_intrinsic, prod_fn_equals]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_and_nested_false(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + prod_fn_not_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "NOT_EQUAL"]} + intrinsic_base = { + "Fn::And": [ + prod_fn_equals, + {"Condition": "NotTestCondition"}, + prod_fn_equals, + ] + } + intrinsic = {"Fn::And": [{"Fn::Not": [intrinsic_base]}, prod_fn_not_equals]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + @parameterized.expand( + [ + ("Fn::And must have value that resolves to a list: {}".format(item), item) + for item in [True, False, {}, 42, None, "test"] + ] + ) + def test_fn_and_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::And": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn:And must have all arguments that resolves to booleans".format(item), + item, + ) + for item in [{}, 42, None, "test"] + ] + ) + def test_fn_and_all_arguments_bool(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::And": [intrinsic, intrinsic, intrinsic]} + ) + + def test_fn_and_invalid_condition(self): + with self.assertRaises(InvalidIntrinsicException, msg="Invalid Condition"): + self.resolver.intrinsic_property_resolver( + {"Fn::And": [{"Condition": "NOT_VALID_CONDITION"}]} + ) + + +class TestIntrinsicFnOrResolver(TestCase): + def setUp(self): + logical_id_translator = { + "EnvironmentType": "prod", + "AWS::AccountId": "123456789012", + } + conditions = { + "TestCondition": {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}, + "NotTestCondition": {"Fn::Not": [{"Condition": "TestCondition"}]}, + } + + template = {"Conditions": conditions} + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=logical_id_translator + ) + self.resolver = IntrinsicResolver( + template=template, symbol_resolver=symbol_resolver + ) + + def test_fn_or_basic_true(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic = { + "Fn::Or": [prod_fn_equals, {"Condition": "TestCondition"}, prod_fn_equals] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_or_basic_single_true(self): + intrinsic = {"Fn::Or": [False, False, {"Condition": "TestCondition"}, False]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_or_basic_false(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + intrinsic = { + "Fn::Or": [ + {"Fn::Not": [prod_fn_equals]}, + {"Condition": "NotTestCondition"}, + {"Fn::Not": [prod_fn_equals]}, + ] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_fn_or_nested_true(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + failed_intrinsic_or = { + "Fn::Or": [ + {"Fn::Not": [prod_fn_equals]}, + {"Condition": "NotTestCondition"}, + {"Fn::Not": [prod_fn_equals]}, + ] + } + intrinsic_base = { + "Fn::Or": [prod_fn_equals, {"Condition": "TestCondition"}, prod_fn_equals] + } + fn_not_intrinsic = {"Fn::Not": [{"Condition": "NotTestCondition"}]} + intrinsic = { + "Fn::Or": [ + failed_intrinsic_or, + intrinsic_base, + fn_not_intrinsic, + fn_not_intrinsic, + ] + } + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_or_nested_false(self): + prod_fn_equals = {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]} + failed_intrinsic_or = { + "Fn::Or": [ + {"Fn::Not": [prod_fn_equals]}, + {"Condition": "NotTestCondition"}, + {"Fn::Not": [prod_fn_equals]}, + ] + } + intrinsic_base = { + "Fn::Or": [prod_fn_equals, {"Condition": "TestCondition"}, prod_fn_equals] + } + intrinsic = {"Fn::Or": [failed_intrinsic_or, {"Fn::Not": [intrinsic_base]}]} + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + @parameterized.expand( + [ + ( + "Fn::Or must have an argument that resolves to a list: {}".format(item), + item, + ) + for item in [True, False, {}, 42, None, "test"] + ] + ) + def test_fn_or_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::Or": intrinsic}) + + @parameterized.expand( + [ + ( + "Fn::Or must have all arguments resolve to booleans: {}".format(item), + item, + ) + for item in [{}, 42, None, "test"] + ] + ) + def test_fn_or_all_arguments_bool(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::Or": [intrinsic, intrinsic, intrinsic]} + ) + + def test_fn_or_invalid_condition(self): + with self.assertRaises(InvalidIntrinsicException, msg="Invalid Condition"): + self.resolver.intrinsic_property_resolver( + {"Fn::Or": [{"Condition": "NOT_VALID_CONDITION"}]} + ) + + +class TestIntrinsicFnIfResolver(TestCase): + def setUp(self): + logical_id_translator = { + "EnvironmentType": "prod", + "AWS::AccountId": "123456789012", + } + conditions = { + "TestCondition": {"Fn::Equals": [{"Ref": "EnvironmentType"}, "prod"]}, + "NotTestCondition": {"Fn::Not": [{"Condition": "TestCondition"}]}, + "InvalidCondition": ["random items"], + } + template = {"Conditions": conditions} + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=logical_id_translator + ) + self.resolver = IntrinsicResolver( + template=template, symbol_resolver=symbol_resolver + ) + + def test_fn_if_basic_true(self): + intrinsic = {"Fn::If": ["TestCondition", True, False]} + + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_fn_if_basic_false(self): + intrinsic = {"Fn::If": ["NotTestCondition", True, False]} + + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + def test_nested_fn_if_true(self): + intrinsic_base_1 = {"Fn::If": ["NotTestCondition", True, False]} + intrinsic_base_2 = {"Fn::If": ["TestCondition", True, False]} + intrinsic = {"Fn::If": ["TestCondition", intrinsic_base_2, intrinsic_base_1]} + + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertTrue(result) + + def test_nested_fn_if_false(self): + intrinsic_base_1 = {"Fn::If": ["NotTestCondition", True, False]} + intrinsic_base_2 = {"Fn::If": ["TestCondition", True, False]} + intrinsic = {"Fn::If": ["TestCondition", intrinsic_base_1, intrinsic_base_2]} + + result = self.resolver.intrinsic_property_resolver(intrinsic) + self.assertFalse(result) + + @parameterized.expand( + [ + ("Fn::If must an argument that resolves to a list: {}".format(item), item) + for item in [True, False, {}, 42, None, "test"] + ] + ) + def test_fn_if_arguments_invalid_formats(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver({"Fn::If": intrinsic}) + + @parameterized.expand( + [ + ("Fn::If must have the argument resolve to a string: {}".format(item), item) + for item in [True, False, {}, 42, None, "test", []] + ] + ) + def test_fn_if_condition_arguments_invalid_type(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::If": [intrinsic, True, False]} + ) + + def test_fn_if_invalid_condition(self): + with self.assertRaises(InvalidIntrinsicException, msg="Invalid Condition"): + self.resolver.intrinsic_property_resolver( + {"Fn::If": ["NOT_VALID_CONDITION", "test", "test"]} + ) + + @parameterized.expand( + [ + ("Fn::If must have exactly 3 arguments: {}".format(item), item) + for item in [[True] * i for i in [0, 1, 3, 4, 5, 6, 7, 8, 9, 10]] + ] + ) + def test_fn_if_invalid_number_arguments(self, name, intrinsic): + with self.assertRaises(InvalidIntrinsicException, msg=name): + self.resolver.intrinsic_property_resolver( + {"Fn::Not": ["TestCondition"] + intrinsic} + ) + + def test_fn_if_condition_not_bool_fail(self): + with self.assertRaises(InvalidIntrinsicException, msg="Invalid Condition"): + self.resolver.intrinsic_property_resolver( + {"Fn::If": ["InvalidCondition", "test", "test"]} + ) + + +class TestIntrinsicTemplateResolution(TestCase): + def setUp(self): + logical_id_translator = { + "RestApi": "NewRestApi", + "LambdaFunction": { + "Arn": "arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/arn:aws:lambda:us-east" + "-1:123456789012:LambdaFunction/invocations" + }, + "AWS::StackId": "12301230123", + "AWS::Region": "us-east-1", + "AWS::AccountId": "406033500479", + "RestApi.Deployment": {"Ref": "RestApi"}, + } + self.logical_id_translator = logical_id_translator + + integration_path = str( + Path(__file__).resolve().parents[0].joinpath('test_data', 'test_intrinsic_template_resolution.json')) + with open(integration_path) as f: + template = json.load(f) + + self.template = template + self.resources = template.get("Resources") + self.conditions = template.get("Conditions") + self.mappings = template.get("Mappings") + + symbol_resolver = IntrinsicsSymbolTable( + template=self.template, logical_id_translator=self.logical_id_translator + ) + self.resolver = IntrinsicResolver( + template=self.template, symbol_resolver=symbol_resolver + ) + + def test_basic_template_resolution(self): + resolved_template = self.resolver.resolve_template(ignore_errors=False) + expected_resources = { + "HelloHandler2E4FBA4D": { + "Properties": {"handler": "main.handle"}, + "Type": "AWS::Lambda::Function", + }, + "LambdaFunction": { + "Properties": { + "Uri": "arn:aws:apigateway:us-east-1a:lambda:path/2015-03-31/functions/arn:aws" + ":lambda:us-east-1:406033500479:function:HelloHandler2E4FBA4D/invocations" + }, + "Type": "AWS::Lambda::Function", + }, + "RestApi": { + "Properties": { + "Body": "YTtlO2Y7ZA==", + "BodyS3Location": "https://s3location/", + }, + "Type": "AWS::ApiGateway::RestApi", + }, + "RestApiResource": { + "Properties": { + "PathPart": "{proxy+}", + "RestApiId": "RestApi", + "parentId": "/", + } + }, + } + self.assertEqual(resolved_template, expected_resources) + + def test_template_fail_errors(self): + resources = deepcopy(self.resources) + resources["RestApi.Deployment"]["Properties"]["BodyS3Location"] = { + "Fn::FindInMap": [] + } + template = { + "Mappings": self.mappings, + "Conditions": self.conditions, + "Resources": resources, + } + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=self.logical_id_translator + ) + resolver = IntrinsicResolver(template=template, symbol_resolver=symbol_resolver) + with self.assertRaises(InvalidIntrinsicException, msg="Invalid Find In Map"): + resolver.resolve_template(ignore_errors=False) + + def test_template_ignore_errors(self): + resources = deepcopy(self.resources) + resources["RestApi.Deployment"]["Properties"]["BodyS3Location"] = { + "Fn::FindInMap": [] + } + template = { + "Mappings": self.mappings, + "Conditions": self.conditions, + "Resources": resources, + } + symbol_resolver = IntrinsicsSymbolTable( + template=template, logical_id_translator=self.logical_id_translator + ) + resolver = IntrinsicResolver(template=template, symbol_resolver=symbol_resolver) + result = resolver.resolve_template(ignore_errors=True) + expected_template = { + "HelloHandler2E4FBA4D": { + "Properties": {"handler": "main.handle"}, + "Type": "AWS::Lambda::Function", + }, + "LambdaFunction": { + "Properties": { + "Uri": "arn:aws:apigateway:us-east-1a:lambda:path/2015-03-31" + "/functions/arn:aws:lambda:us-east-1:406033500479" + ":function:HelloHandler2E4FBA4D/invocations" + }, + "Type": "AWS::Lambda::Function", + }, + "RestApi.Deployment": { + "Properties": { + "Body": { + "Fn::Base64": { + "Fn::Join": [ + ";", # NOQA + { + "Fn::Split": [ + ",", + {"Fn::Join": [",", ["a", "e", "f", "d"]]}, + ] + }, + ] + } + }, + "BodyS3Location": {"Fn::FindInMap": []}, + }, + "Type": "AWS::ApiGateway::RestApi", + }, + "RestApiResource": { + "Properties": { + "PathPart": "{proxy+}", + "RestApiId": "RestApi", + "parentId": "/", + } + }, + } + self.assertEqual(expected_template, result) + + +class TestIntrinsicResolverInitialization(TestCase): + def test_conditional_key_function_map(self): + resolver = IntrinsicResolver(None, None) + + def lambda_func(x): + return True + + resolver.set_conditional_function_map({"key": lambda_func}) + self.assertTrue(resolver.conditional_key_function_map.get("key") == lambda_func) + + def test_set_intrinsic_key_function_map(self): + resolver = IntrinsicResolver(None, None) + + def lambda_func(x): + return True + + resolver.set_intrinsic_key_function_map({"key": lambda_func}) + self.assertTrue(resolver.intrinsic_key_function_map.get("key") == lambda_func) diff --git a/tests/unit/lib/intrinsic_resolver/test_intrinsics_symbol_table.py b/tests/unit/lib/intrinsic_resolver/test_intrinsics_symbol_table.py new file mode 100644 index 0000000000..5254b8dd9a --- /dev/null +++ b/tests/unit/lib/intrinsic_resolver/test_intrinsics_symbol_table.py @@ -0,0 +1,152 @@ +from unittest import TestCase + +from mock import patch + +from samcli.lib.intrinsic_resolver.invalid_intrinsic_exception import InvalidSymbolException +from samcli.lib.intrinsic_resolver.intrinsic_property_resolver import IntrinsicResolver +from samcli.lib.intrinsic_resolver.intrinsics_symbol_table import IntrinsicsSymbolTable + + +class TestIntrinsicsSymbolTablePseudoProperties(TestCase): + def setUp(self): + self.symbol_table = IntrinsicsSymbolTable(template={}) + + def test_handle_account_id_default(self): + self.assertEquals(self.symbol_table.handle_pseudo_account_id(), "123456789012") + + def test_pseudo_partition(self): + self.assertEquals(self.symbol_table.handle_pseudo_partition(), "aws") + + @patch("samcli.lib.intrinsic_resolver.intrinsics_symbol_table.os") + def test_pseudo_partition_gov(self, mock_os): + mock_os.getenv.return_value = "us-west-gov-1" + self.assertEquals(self.symbol_table.handle_pseudo_partition(), "aws-us-gov") + + @patch("samcli.lib.intrinsic_resolver.intrinsics_symbol_table.os") + def test_pseudo_partition_china(self, mock_os): + mock_os.getenv.return_value = "cn-west-1" + self.assertEquals(self.symbol_table.handle_pseudo_partition(), "aws-cn") + + @patch("samcli.lib.intrinsic_resolver.intrinsics_symbol_table.os") + def test_pseudo_region_environ(self, mock_os): + mock_os.getenv.return_value = "mytemp" + self.assertEquals(self.symbol_table.handle_pseudo_region(), "mytemp") + + @patch("samcli.lib.intrinsic_resolver.intrinsics_symbol_table.os") + def test_pseudo_default_region(self, mock_os): + mock_os.getenv.return_value = None + self.assertEquals(self.symbol_table.handle_pseudo_region(), "us-east-1") + + def test_pseudo_no_value(self): + self.assertIsNone(self.symbol_table.handle_pseudo_no_value()) + + def test_pseudo_url_prefix_default(self): + self.assertEquals(self.symbol_table.handle_pseudo_url_prefix(), "amazonaws.com") + + @patch("samcli.lib.intrinsic_resolver.intrinsics_symbol_table.os") + def test_pseudo_url_prefix_china(self, mock_os): + mock_os.getenv.return_value = "cn-west-1" + self.assertEquals( + self.symbol_table.handle_pseudo_url_prefix(), "amazonaws.com.cn" + ) + + def test_get_availability_zone(self): + res = IntrinsicsSymbolTable.get_availability_zone("us-east-1") + self.assertIn("us-east-1a", res) + + def test_handle_pseudo_account_id(self): + res = IntrinsicsSymbolTable.handle_pseudo_account_id() + self.assertEqual(res, "123456789012") + + def test_handle_pseudo_stack_name(self): + res = IntrinsicsSymbolTable.handle_pseudo_stack_name() + self.assertEqual(res, "local") + + def test_handle_pseudo_stack_id(self): + res = IntrinsicsSymbolTable.handle_pseudo_stack_id() + self.assertEqual(res, "arn:aws:cloudformation:us-east-1:123456789012:stack/" + "local/51af3dc0-da77-11e4-872e-1234567db123") + + +class TestSymbolResolution(TestCase): + def test_parameter_symbols(self): + template = { + "Resources": {}, + "Parameters": { + "Test": { + "Default": "data" + } + } + } + symbol_resolver = IntrinsicsSymbolTable(template=template) + result = symbol_resolver.resolve_symbols("Test", IntrinsicResolver.REF) + self.assertEquals(result, "data") + + def test_default_type_resolver_function(self): + template = { + "Resources": { + "MyApi": { + "Type": "AWS::ApiGateway::RestApi" + } + }, + } + default_type_resolver = { + "AWS::ApiGateway::RestApi": { + "RootResourceId": lambda logical_id, resource_attribute: logical_id + } + } + + symbol_resolver = IntrinsicsSymbolTable(template=template, default_type_resolver=default_type_resolver) + result = symbol_resolver.resolve_symbols("MyApi", "RootResourceId") + + self.assertEquals(result, "MyApi") + + def test_custom_attribute_resolver(self): + template = { + "Resources": { + "MyApi": { + "Type": "AWS::ApiGateway::RestApi" + } + }, + } + common_attribute_resolver = { + "Arn": "test" + } + + symbol_resolver = IntrinsicsSymbolTable(template=template, common_attribute_resolver=common_attribute_resolver) + result = symbol_resolver.resolve_symbols("MyApi", "Arn") + + self.assertEquals(result, "test") + + def test_unknown_symbol_translation(self): + symbol_resolver = IntrinsicsSymbolTable(template={}) + res = symbol_resolver.get_translation("UNKNOWN MAP") + self.assertEqual(res, None) + + def test_basic_symbol_translation(self): + symbol_resolver = IntrinsicsSymbolTable(template={}, logical_id_translator={"item": "test"}) + res = symbol_resolver.get_translation("item") + self.assertEqual(res, "test") + + def test_basic_unknown_translated_string_translation(self): + symbol_resolver = IntrinsicsSymbolTable(template={}, logical_id_translator={"item": "test"}) + res = symbol_resolver.get_translation("item", "RootResourceId") + self.assertEqual(res, None) + + def test_arn_resolver_lambda(self): + res = IntrinsicsSymbolTable().arn_resolver('test') + self.assertEquals(res, "arn:aws:lambda:us-east-1:123456789012:function:test") + + def test_arn_resolver(self): + res = IntrinsicsSymbolTable().arn_resolver('test', service_name="sns") + self.assertEquals(res, "arn:aws:sns:us-east-1:123456789012:test") + + def test_resolver_ignore_errors(self): + resolver = IntrinsicsSymbolTable() + res = resolver.resolve_symbols('UNKNOWN', "SOME UNKNOWN RESOURCE PROPERTY", ignore_errors=True) + self.assertEqual(res, "$UNKNOWN.SOME UNKNOWN RESOURCE PROPERTY") + + def test_symbol_resolver_unknown_fail(self): + resolver = IntrinsicsSymbolTable() + with self.assertRaises(InvalidSymbolException): + resolver.resolve_symbols('UNKNOWN', "SOME UNKNOWN RESOURCE PROPERTY")