Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): Support for nested dynamic modules #3813

Merged
merged 10 commits into from
Nov 9, 2022
6 changes: 5 additions & 1 deletion checkov/common/graph/graph_builder/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def run_function_multithreaded(
def filter_sub_keys(key_list: list[str]) -> list[str]:
filtered_key_list = []
for key in key_list:
if not any(other_key != key and other_key.startswith(key) for other_key in key_list):
if not any(other_key != key and other_key.startswith(key) for other_key in key_list) and is_include_dup_dynamic(key, key_list):
filtered_key_list.append(key)
return filtered_key_list


def is_include_dup_dynamic(key: str, list_keys: list[str]) -> bool:
return f"dynamic.{key.split('.')[0]}" not in list_keys
15 changes: 15 additions & 0 deletions checkov/terraform/graph_builder/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,21 @@ def attribute_has_nested_attributes(attribute_key: str, attributes: Dict[str, An
return isinstance(attributes[attribute_key], list) or isinstance(attributes[attribute_key], dict)


def attribute_has_dup_with_dynamic_attributes(attribute_key: str, attributes: dict[str, Any] | list[str]) -> bool:
ChanochShayner marked this conversation as resolved.
Show resolved Hide resolved
"""
:param attribute_key: key inside the `attributes` dictionary
:param attributes: `attributes` dictionary
:return: True if attribute_key has duplicate attribute with dynamic reference.
:example: if attributes.keys == [name.rule, dynamic.name.content.rule] -> will return True.
"""
attribute_key_paths = attribute_key.split('.')
if len(attribute_key_paths) > 1:
attar_key_dynamic_ref = f"dynamic.{attribute_key_paths[0]}.content.{attribute_key_paths[1]}"
return attar_key_dynamic_ref in attributes
else:
return False


def get_related_resource_id(resource: dict[str, Any], file_path_to_referred_id: dict[str, str]) -> str:
resource_id = resource.get(CustomAttributes.ID)
# for external modules resources the id should start with the prefix module.[module_name]
Expand Down
27 changes: 21 additions & 6 deletions checkov/terraform/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from checkov.terraform.graph_builder.utils import (
get_referenced_vertices_in_value,
remove_index_pattern_from_str,
attribute_has_nested_attributes,
attribute_has_nested_attributes, attribute_has_dup_with_dynamic_attributes,
)
from checkov.terraform.graph_builder.variable_rendering.vertex_reference import VertexReference
from checkov.terraform.graph_builder.variable_rendering.evaluate_terraform import replace_string_value, \
Expand All @@ -33,6 +33,7 @@
'map': {}
}

DYNAMIC_STRING = 'dynamic'
DYNAMIC_BLOCKS_LISTS = 'list'
DYNAMIC_BLOCKS_MAPS = 'map'
LEFT_BRACKET_WITH_QUOTATION = '["'
Expand Down Expand Up @@ -323,28 +324,41 @@ def _process_dynamic_blocks(dynamic_blocks: list[dict[str, Any]] | dict[str, Any
if not block_content or not dynamic_values:
continue

dynamic_value_ref = f"{block_name}.value"
dynamic_value_dot_ref = f"{block_name}.value"
dynamic_value_bracket_ref = f'{block_name}["value"]'
dynamic_value_refs = (dynamic_value_dot_ref, dynamic_value_bracket_ref)
dynamic_arguments = [
argument
for argument, value in block_content.items()
if value == dynamic_value_ref or isinstance(value, str) and dynamic_value_ref in value
if value in dynamic_value_refs or isinstance(value, str) and dynamic_value_dot_ref in value
]

if dynamic_arguments:
block_confs = []
for dynamic_value in dynamic_values:
block_conf = deepcopy(block_content)
block_conf.pop(DYNAMIC_STRING, None)
for dynamic_argument in dynamic_arguments:
if dynamic_type == DYNAMIC_BLOCKS_MAPS:
if not isinstance(dynamic_value, dict):
continue
dynamic_value_in_map = TerraformVariableRenderer.extract_dynamic_value_in_map(block_content[dynamic_argument])
block_conf[dynamic_argument] = dynamic_value[dynamic_value_in_map]
dynamic_value_in_map = TerraformVariableRenderer.extract_dynamic_value_in_map(
block_content[dynamic_argument]
)
if block_name not in dynamic_value:
block_conf[dynamic_argument] = dynamic_value[dynamic_value_in_map]
else:
block_conf[dynamic_argument] = dynamic_value[block_name][0][dynamic_value_in_map]
else:
block_conf[dynamic_argument] = dynamic_value

block_confs.append(block_conf)
rendered_blocks[block_name] = block_confs
rendered_blocks[block_name] = block_confs if len(block_confs) > 1 else block_confs[0]

if DYNAMIC_STRING in block_content:
next_key = next(iter(block_content[DYNAMIC_STRING].keys()))
block_content[DYNAMIC_STRING][next_key]['for_each'] = dynamic_values
rendered_blocks.update(TerraformVariableRenderer._process_dynamic_blocks(block_content[DYNAMIC_STRING]))

return rendered_blocks

Expand All @@ -357,6 +371,7 @@ def evaluate_non_rendered_values(self) -> None:
attr
for attr in vertex.attributes
if attr not in reserved_attribute_names and not attribute_has_nested_attributes(attr, vertex.attributes)
and not attribute_has_dup_with_dynamic_attributes(attr, vertex.attributes)
]
for attribute in filtered_attributes:
curr_val = vertex.attributes.get(attribute)
Expand Down
41 changes: 41 additions & 0 deletions tests/terraform/graph/variable_rendering/test_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,19 @@ def test_dynamic_blocks_breadcrumbs(self):
assert 'ingress.from_port' in self.breadcrumbs['/main.tf']['aws_security_group.single_dynamic_example']
assert 'ingress.to_port' in self.breadcrumbs['/main.tf']['aws_security_group.single_dynamic_example']

def test_nested_dynamic_blocks_breadcrumbs(self):
root_folder = os.path.join(TEST_DIRNAME, "test_resources", "dynamic_blocks_with_nested")
graph_manager = TerraformGraphManager('m', ['m'])
local_graph, _ = graph_manager.build_graph_from_source_directory(root_folder, render_variables=True)
self.definitions, self.breadcrumbs = convert_graph_vertices_to_tf_definitions(
local_graph.vertices,
root_folder,
)
# Test multiple dynamic blocks
assert 'required_resource_access.resource_app_id' in self.breadcrumbs['/main.tf']['azuread_application.bootstrap']
assert 'required_resource_access.resource_access.id' in self.breadcrumbs['/main.tf']['azuread_application.bootstrap']
assert 'required_resource_access.resource_access.type' in self.breadcrumbs['/main.tf']['azuread_application.bootstrap']

def test_list_entry_rendering_module_vars(self):
# given
resource_path = Path(TEST_DIRNAME) / "test_resources/list_entry_module_var"
Expand Down Expand Up @@ -307,3 +320,31 @@ def test_dynamic_with_env_var_false(self):
resources_vertex = list(filter(lambda v: v.block_type == BlockType.RESOURCE, local_graph.vertices))
assert not resources_vertex[0].attributes.get('ingress')
assert not resources_vertex[0].attributes.get('egress')

def test_dynamic_blocks_with_nested_map(self):
resource_paths = [
os.path.join(TEST_DIRNAME, 'test_resources', 'dynamic_blocks_with_nested'),
]
for path in resource_paths:
graph_manager = TerraformGraphManager('m', ['m'])
local_graph, _ = graph_manager.build_graph_from_source_directory(path, render_variables=True)
resources_vertex = list(filter(lambda v: v.block_type == BlockType.RESOURCE, local_graph.vertices))
assert len(resources_vertex[0].attributes.get('required_resource_access')) == 2
assert resources_vertex[0].attributes.get('required_resource_access') == \
{'resource_app_id': '00000003-0000-0000-c000-000000000000',
'resource_access': {'id': '7ab1d382-f21e-4acd-a863-ba3e13f7da61', 'type': 'Role'}}

def test_dynamic_example_for_security_rule(self):
graph_manager = TerraformGraphManager('m', ['m'])
local_graph, _ = graph_manager.build_graph_from_source_directory(os.path.join(TEST_DIRNAME, "test_resources", "dynamic_block_map_example"), render_variables=True)
resources_vertex = list(filter(lambda v: v.block_type == BlockType.RESOURCE, local_graph.vertices))
assert resources_vertex[0].attributes.get('security_rule') == [
{'access': 'Allow', 'destination_address_prefix': '*', 'destination_port_range': 80, 'direction': 'Inbound', 'name': 'AllowHttpIn', 'priority': 100, 'protocol': 'Tcp', 'source_address_prefix': '*', 'source_port_range': '*'},
{'access': 'Allow', 'destination_address_prefix': '*', 'destination_port_range': 443, 'direction': 'Inbound', 'name': 'AllowHttpsIn', 'priority': 110, 'protocol': 'Tcp', 'source_address_prefix': '*', 'source_port_range': '*'},
{'access': 'Allow', 'destination_address_prefix': '*', 'destination_port_range': 3389, 'direction': 'Inbound', 'name': 'AllowRdpIn', 'priority': 120, 'protocol': 'Tcp', 'source_address_prefix': '*', 'source_port_range': '*'},
{'access': 'Allow', 'destination_address_prefix': '*', 'destination_port_range': '*', 'direction': 'Inbound', 'name': 'AllowIcmpIn', 'priority': 130, 'protocol': 'Icmp', 'source_address_prefix': '*', 'source_port_range': '*'}]
assert resources_vertex[1].attributes.get('security_rule') == [
{'access': 'Deny', 'destination_address_prefix': '*', 'destination_port_range': 80, 'direction': 'Inbound', 'name': 'DenyHttpIn', 'priority': 100, 'protocol': 'Tcp', 'source_address_prefix': '*', 'source_port_range': '*'},
{'access': 'Allow', 'destination_address_prefix': '*', 'destination_port_range': 443, 'direction': 'Inbound', 'name': 'AllowHttpsIn', 'priority': 110, 'protocol': 'Tcp', 'source_address_prefix': '35.181.123.80/32', 'source_port_range': '*'},
{'access': 'Deny', 'destination_address_prefix': '*', 'destination_port_range': 3389, 'direction': 'Inbound', 'name': 'DenyRdpIn', 'priority': 120, 'protocol': 'Tcp', 'source_address_prefix': '*', 'source_port_range': '*'},
{'access': 'Deny', 'destination_address_prefix': '*', 'destination_port_range': '*', 'direction': 'Inbound', 'name': 'DenyIcmpIn', 'priority': 130, 'protocol': 'Icmp', 'source_address_prefix': '*', 'source_port_range': '*'}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
data "azurerm_resource_group" "abc-azr-lab" {
name = "abc-azr-lab"
}

resource "azurerm_network_security_group" "dynamic_nsg_fail" {
name = var.nsg_name_fail
location = data.azurerm_resource_group.abc-azr-lab.location
resource_group_name = data.azurerm_resource_group.abc-azr-lab.name

dynamic "security_rule" {
for_each = var.fail_nsg_rules
content {
name = security_rule.value["name"]
priority = security_rule.value["priority"]
direction = security_rule.value["direction"]
access = security_rule.value["access"]
protocol = security_rule.value["protocol"]
source_port_range = security_rule.value["source_port_range"]
destination_port_range = security_rule.value["destination_port_range"]
source_address_prefix = security_rule.value["source_address_prefix"]
destination_address_prefix = security_rule.value["destination_address_prefix"]
}
}
}

resource "azurerm_network_security_group" "dynamic_nsg_pass" {
name = var.nsg_name_pass
location = data.azurerm_resource_group.abc-azr-lab.location
resource_group_name = data.azurerm_resource_group.abc-azr-lab.name

dynamic "security_rule" {
for_each = var.pass_nsg_rules
content {
name = security_rule.value["name"]
priority = security_rule.value["priority"]
direction = security_rule.value["direction"]
access = security_rule.value["access"]
protocol = security_rule.value["protocol"]
source_port_range = security_rule.value["source_port_range"]
destination_port_range = security_rule.value["destination_port_range"]
source_address_prefix = security_rule.value["source_address_prefix"]
destination_address_prefix = security_rule.value["destination_address_prefix"]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
subnet_list = [
{
name = "dynamic_subnet1"
address_prefix = "10.100.1.0/24"
security_group = "azurerm_network_security_group.dynamic_nsg_pass.id"
},
{
name = "dynamic_subnet2"
address_prefix = "10.100.2.0/24"
security_group = "azurerm_network_security_group.dynamic_nsg_pass.id"
},
{
name = "dynamic_subnet3"
address_prefix = "10.100.3.0/24"
security_group = "azurerm_network_security_group.dynamic_nsg_pass.id"
}
]

fail_nsg_rules = [
{
name = "AllowHttpIn"
priority = 100
direction = "Inbound"
access = "Allow"
protocol = "Tcp"
source_port_range = "*"
destination_port_range = "80"
source_address_prefix = "*"
destination_address_prefix = "*"
},
{
name = "AllowHttpsIn"
priority = 110
direction = "Inbound"
access = "Allow"
protocol = "Tcp"
source_port_range = "*"
destination_port_range = "443"
source_address_prefix = "*"
destination_address_prefix = "*"
},
{
name = "AllowRdpIn"
priority = 120
direction = "Inbound"
access = "Allow"
protocol = "Tcp"
source_port_range = "*"
destination_port_range = "3389"
source_address_prefix = "*"
destination_address_prefix = "*"
},
{
name = "AllowIcmpIn"
priority = 130
direction = "Inbound"
access = "Allow"
protocol = "Icmp"
source_port_range = "*"
destination_port_range = "*"
source_address_prefix = "*"
destination_address_prefix = "*"
}
]


pass_nsg_rules = [
{
name = "DenyHttpIn"
priority = 100
direction = "Inbound"
access = "Deny"
protocol = "Tcp"
source_port_range = "*"
destination_port_range = "80"
source_address_prefix = "*"
destination_address_prefix = "*"
},
{
name = "AllowHttpsIn"
priority = 110
direction = "Inbound"
access = "Allow"
protocol = "Tcp"
source_port_range = "*"
destination_port_range = "443"
source_address_prefix = "35.181.123.80/32"
destination_address_prefix = "*"
},
{
name = "DenyRdpIn"
priority = 120
direction = "Inbound"
access = "Deny"
protocol = "Tcp"
source_port_range = "*"
destination_port_range = "3389"
source_address_prefix = "*"
destination_address_prefix = "*"
},
{
name = "DenyIcmpIn"
priority = 130
direction = "Inbound"
access = "Deny"
protocol = "Icmp"
source_port_range = "*"
destination_port_range = "*"
source_address_prefix = "*"
destination_address_prefix = "*"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
variable "rg_name" {
type = string
default = "abc-azr-lab"
}

variable "rg_location" {
type = string
default = "East US"
}

variable "vnet_name" {
type = string
default = "dynamic_vnet"
}

variable "nsg_name_fail" {
type = string
default = "dynamic_nsg_fail"
}

variable "nsg_name_pass" {
type = string
default = "dynamic_nsg_pass"
}

variable "tags" {
type = list(string)
default = ["testing", "dynamic_block"]
}

variable "address_space" {
type = list(string)
default = ["10.100.0.0/16"]
}

variable "subnet_list" {
type = list(object({
name = string
address_prefix = string
security_group = string
}))
description = "Values for each subnet"
}

variable "fail_nsg_rules" {
type = list(object({
name = string
priority = number
direction = string
access = string
protocol = string
source_port_range = string
destination_port_range = string
source_address_prefix = string
destination_address_prefix = string
}))
description = "Values for each NSG rule"
}

variable "pass_nsg_rules" {
type = list(object({
name = string
priority = number
direction = string
access = string
protocol = string
source_port_range = string
destination_port_range = string
source_address_prefix = string
destination_address_prefix = string
}))
description = "Values for each NSG rule"
}
Loading