Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): Dynamic Modules Support map type #3800

Merged
merged 5 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def scan_resource_conf(self, conf):
return CheckResult.UNKNOWN
if not instance_count:
return CheckResult.UNKNOWN
if instance_count <= 1:
if isinstance(instance_count, int) and instance_count <= 1:
return CheckResult.PASSED

self.evaluated_keys.append('node_to_node_encryption/[0]/enabled')
Expand Down
43 changes: 33 additions & 10 deletions checkov/terraform/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
'list': [],
'map': {}
}

DYNAMIC_BLOCKS_LISTS = 'list'
DYNAMIC_BLOCKS_MAPS = 'map'

# matches the internal value of the 'type' attribute: usually like '${map}' or '${map(string)}', but could possibly just
# be like 'map' or 'map(string)' (but once we hit a ( or } we can stop)
TYPE_REGEX = re.compile(r'^(\${)?([a-z]+)')
Expand Down Expand Up @@ -298,32 +302,41 @@ def _render_dynamic_blocks(self) -> None:
self.local_graph.update_vertex_config(vertex, changed_attributes)

@staticmethod
def _process_dynamic_blocks(dynamic_blocks: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
def _process_dynamic_blocks(dynamic_blocks: list[dict[str, Any]] | dict[str, Any])\
-> dict[str, list[dict[str, Any]]]:
rendered_blocks: dict[str, list[dict[str, Any]]] = {}
dynamic_type = DYNAMIC_BLOCKS_LISTS

if not isinstance(dynamic_blocks, list):
ChanochShayner marked this conversation as resolved.
Show resolved Hide resolved
logging.info(f"Dynamic blocks found, but of type {type(dynamic_blocks)}")
return rendered_blocks
dynamic_blocks = [dynamic_blocks]
ChanochShayner marked this conversation as resolved.
Show resolved Hide resolved
dynamic_type = DYNAMIC_BLOCKS_MAPS
logging.debug(f"Dynamic blocks found, with type {type(dynamic_blocks)}")

for block in dynamic_blocks:
block_name, block_values = next(iter(block.items())) # only one block per dynamic_block
block_content = block_values.get("content")
dynamic_values = block_values.get("for_each")
if not block_content or not dynamic_values:
return rendered_blocks
continue

dynamic_value_ref = f"{block_name}.value"
dynamic_arguments = [
argument
for argument, value in block_content.items()
if value == dynamic_value_ref
]
dynamic_arguments = []
for argument, value in block_content.items():
if value == dynamic_value_ref or isinstance(value, str) and dynamic_value_ref in value:
dynamic_arguments.append(argument)
ChanochShayner marked this conversation as resolved.
Show resolved Hide resolved

if dynamic_arguments:
block_confs = []
for dynamic_value in dynamic_values:
block_conf = deepcopy(block_content)
for dynamic_argument in dynamic_arguments:
block_conf[dynamic_argument] = dynamic_value
if dynamic_type == DYNAMIC_BLOCKS_MAPS:
if not isinstance(dynamic_value, dict):
continue
ChanochShayner marked this conversation as resolved.
Show resolved Hide resolved
dynamic_value_in_map = TerraformVariableRenderer.extract_dynamic_value_in_map(block_content[dynamic_argument])
block_conf[dynamic_argument] = dynamic_value[dynamic_value_in_map]
else:
block_conf[dynamic_argument] = dynamic_value

block_confs.append(block_conf)
rendered_blocks[block_name] = block_confs
Expand Down Expand Up @@ -366,6 +379,16 @@ def evaluate_non_rendered_values(self) -> None:
changed_attributes[attribute] = evaluated
self.local_graph.update_vertex_config(vertex, changed_attributes)

@staticmethod
def extract_dynamic_value_in_map(dynamic_value: str) -> str:
left_bracket_with_quotation = '["'
right_bracket_with_quotation = '"]'
left_bracket = '['
ChanochShayner marked this conversation as resolved.
Show resolved Hide resolved
dynamic_value_in_map = dynamic_value.split('.')[-1]
if left_bracket not in dynamic_value_in_map:
return dynamic_value_in_map
return dynamic_value_in_map.split(left_bracket_with_quotation)[-1].replace(right_bracket_with_quotation, '')

def evaluate_value(self, val: Any) -> Any:
val_length: int = len(str(val))
if CHECKOV_RENDER_MAX_LEN and 0 < CHECKOV_RENDER_MAX_LEN < val_length:
Expand Down
20 changes: 20 additions & 0 deletions tests/terraform/graph/variable_rendering/test_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,26 @@ def test_dynamic_blocks_with_list(self):
[{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 443, 'protocol': 'tcp', 'to_port': 443},
{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 1433, 'protocol': 'tcp', 'to_port': 1433}]

def test_dynamic_blocks_with_map(self):
resource_paths = [
os.path.join(TEST_DIRNAME, "test_resources", "dynamic_blocks_map"),
os.path.join(TEST_DIRNAME, "test_resources", "dynamic_blocks_map_brackets"),
]
for path in resource_paths:
graph_manager = TerraformGraphManager('m', ['m'])
local_graph, _ = graph_manager.build_graph_from_source_directory(path, render_variables=True)
resources_vertex = list(filter(lambda v: v.block_type == BlockType.RESOURCE, local_graph.vertices))
assert len(resources_vertex[0].attributes.get('ingress')) == 2
assert resources_vertex[0].attributes.get('ingress') == \
[{'action': 'allow', 'cidr_block': '10.0.0.1/32', 'from_port': 22, 'protocol': 'tcp', 'rule_no': 1,
'to_port': 22},
{'action': 'allow', 'cidr_block': '10.0.0.2/32', 'from_port': 22, 'protocol': 'tcp', 'rule_no': 2,
'to_port': 22}]

def test_extract_dynamic_value_in_map(self):
self.assertEqual(TerraformVariableRenderer.extract_dynamic_value_in_map('value.value1.value2'), 'value2')
self.assertEqual(TerraformVariableRenderer.extract_dynamic_value_in_map('value.value1["value2"]'), 'value2')

def test_list_entry_rendering_module_vars(self):
# given
resource_path = Path(TEST_DIRNAME) / "test_resources/list_entry_module_var"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
resource "aws_network_acl" "network_acl" {
vpc_id = data.aws_vpc

dynamic "ingress" {
for_each = var.http_headers
content {
rule_no = ingress.value.num
protocol = ingress.value.protoc
action = "allow"
cidr_block = ingress.value.values
from_port = 22
to_port = 22
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
variable "http_headers" {
type = list(object({
num = number
values = string
}))
default = [{
"num": 1,
"protoc": "tcp",
"values": "10.0.0.1/32"
},
{
"num": 2,
"protoc": "tcp",
"values": "10.0.0.2/32"
}]
}

variable "aws_vpc" {
default = true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
resource "aws_network_acl" "network_acl" {
vpc_id = data.aws_vpc

dynamic "ingress" {
for_each = var.http_headers
content {
rule_no = ingress.value["num"]
protocol = ingress.value["protoc"]
action = "allow"
cidr_block = ingress.value["values"]
from_port = 22
to_port = 22
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
variable "http_headers" {
type = list(object({
num = number
values = string
}))
default = [{
"num": 1,
"protoc": "tcp",
"values": "10.0.0.1/32"
},
{
"num": 2,
"protoc": "tcp",
"values": "10.0.0.2/32"
}]
}

variable "aws_vpc" {
default = true
}