Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): TF Dynamic Blocks support - for_each lists type #3737

Merged
merged 4 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions checkov/terraform/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@


class TerraformBlock(Block):
__slots__ = ("module_connections", "module_dependency", "module_dependency_num", "source_module")
__slots__ = ("module_connections", "module_dependency", "module_dependency_num", "source_module", "has_dynamic_block")

def __init__(self, name: str, config: Dict[str, Any], path: str, block_type: BlockType, attributes: Dict[str, Any],
id: str = "", source: str = "") -> None:
id: str = "", source: str = "", has_dynamic_block: bool = False) -> None:
"""
:param name: unique name given to the terraform block, for example: 'aws_vpc.example_name'
:param config: the section in tf_definitions that belong to this block
Expand All @@ -33,6 +33,7 @@ def __init__(self, name: str, config: Dict[str, Any], path: str, block_type: Blo
self.attributes = attributes
self.module_connections: Dict[str, List[int]] = {}
self.source_module: Set[int] = set()
self.has_dynamic_block = has_dynamic_block

def add_module_connection(self, attribute_key: str, vertex_id: int) -> None:
self.module_connections.setdefault(attribute_key, []).append(vertex_id)
Expand Down
3 changes: 2 additions & 1 deletion checkov/terraform/graph_builder/graph_components/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def _add_resource(self, blocks: List[Dict[str, Dict[str, Any]]], path: str) -> N
attributes = self.clean_bad_characters(resource_conf)
if not isinstance(attributes, dict):
continue
handle_dynamic_values(attributes)
has_dynamic_block = handle_dynamic_values(attributes)
provisioner = attributes.get("provisioner")
if provisioner:
self._handle_provisioner(provisioner, attributes)
Expand All @@ -164,6 +164,7 @@ def _add_resource(self, blocks: List[Dict[str, Dict[str, Any]]], path: str) -> N
attributes=attributes,
id=f"{resource_type}.{name}",
source=self.source,
has_dynamic_block=has_dynamic_block
)
self._add_to_blocks(resource_block)

Expand Down
5 changes: 4 additions & 1 deletion checkov/terraform/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
import os
from collections import defaultdict
Expand Down Expand Up @@ -33,9 +35,10 @@ class Undetermined(TypedDict):
variable_vertex_id: int


class TerraformLocalGraph(LocalGraph):
class TerraformLocalGraph(LocalGraph[TerraformBlock]):
def __init__(self, module: Module) -> None:
super().__init__()
self.vertices: list[TerraformBlock] = []
self.module = module
self.map_path_to_module: Dict[str, List[int]] = {}
self.relative_paths_cache = {}
Expand Down
54 changes: 53 additions & 1 deletion checkov/terraform/graph_builder/variable_rendering/renderer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations

from ast import literal_eval
import logging
import os
Expand Down Expand Up @@ -276,7 +277,58 @@ def replace_value(
return new_val

def _render_variables_from_vertices(self) -> None:
pass
self._render_dynamic_blocks()

def _render_dynamic_blocks(self) -> None:
vertex_indices = self.local_graph.vertices_by_block_type[BlockType.RESOURCE]

for idx in vertex_indices:
vertex = self.local_graph.vertices[idx]
if vertex.has_dynamic_block:
# only check dynamic blocks on the root level for now
dynamic_blocks = vertex.attributes.get("dynamic")
if dynamic_blocks:
rendered_blocks = self._process_dynamic_blocks(dynamic_blocks)
changed_attributes = []

for block_name, block_confs in rendered_blocks.items():
vertex.update_inner_attribute(block_name, vertex.attributes, block_confs)
changed_attributes.append(block_name)

self.local_graph.update_vertex_config(vertex, changed_attributes)

@staticmethod
def _process_dynamic_blocks(dynamic_blocks: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
rendered_blocks: dict[str, list[dict[str, Any]]] = {}

if not isinstance(dynamic_blocks, list):
logging.info(f"Dynamic blocks found, but of type {type(dynamic_blocks)}")
return rendered_blocks

for block in dynamic_blocks:
block_name, block_values = next(iter(block.items())) # only one block per dynamic_block
block_content = block_values.get("content")
dynamic_values = block_values.get("for_each")
if not block_content or not dynamic_values:
return rendered_blocks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not continue the loop in this case? isn't it possible to have other valid dynamic blocks?


dynamic_value_ref = f"{block_name}.value"
dynamic_arguments = [
argument
for argument, value in block_content.items()
if value == dynamic_value_ref
]
if dynamic_arguments:
block_confs = []
for dynamic_value in dynamic_values:
block_conf = deepcopy(block_content)
for dynamic_argument in dynamic_arguments:
block_conf[dynamic_argument] = dynamic_value

block_confs.append(block_conf)
rendered_blocks[block_name] = block_confs

return rendered_blocks

def evaluate_non_rendered_values(self) -> None:
for index, vertex in enumerate(self.local_graph.vertices):
Expand Down
29 changes: 20 additions & 9 deletions checkov/terraform/parser_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,26 +170,37 @@ def _check_map_type_consistency(value: Dict) -> Dict:
return value


def handle_dynamic_values(conf: Dict[str, List[Any]]) -> None:
def handle_dynamic_values(conf: Dict[str, List[Any]], has_dynamic_block: bool = False) -> bool:
# recursively search for blocks that are dynamic
for block_name in conf.keys():
if isinstance(conf[block_name], dict):
handle_dynamic_values(conf[block_name])
conf_block = conf[block_name]
if isinstance(conf_block, dict):
has_dynamic_block = handle_dynamic_values(conf_block, has_dynamic_block)

# if the configuration is a block element, search down again.
if isinstance(conf[block_name], list) and conf[block_name] and isinstance(conf[block_name][0], dict):
handle_dynamic_values(conf[block_name][0])
if conf_block and isinstance(conf_block, list) and isinstance(conf_block[0], dict):
has_dynamic_block = handle_dynamic_values(conf_block[0], has_dynamic_block)

process_dynamic_values(conf)
# if a dynamic block exists somewhere in the resource it will return True
return process_dynamic_values(conf) or has_dynamic_block


def process_dynamic_values(conf: Dict[str, List[Any]]) -> None:
def process_dynamic_values(conf: Dict[str, List[Any]]) -> bool:
has_dynamic_block = False
for dynamic_element in conf.get("dynamic", {}):
if isinstance(dynamic_element, str):
try:
dynamic_element = json.loads(dynamic_element)
except Exception:
dynamic_element = {}

for element_name in dynamic_element.keys():
conf[element_name] = dynamic_element[element_name].get("content", [])
for element_name, element_value in dynamic_element.items():
if "content" in element_value:
conf[element_name] = element_value["content"]
else:
# this should be the result of a successful dynamic block rendering
conf[element_name] = element_value

has_dynamic_block = True

return has_dynamic_block
20 changes: 20 additions & 0 deletions tests/terraform/graph/variable_rendering/test_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,23 @@ def test_tfvar_rendering_module_vars(self):
local_graph, _ = graph_manager.build_graph_from_source_directory(resource_path, render_variables=True)
resources_vertex = list(filter(lambda v: v.block_type == BlockType.RESOURCE, local_graph.vertices))
assert resources_vertex[0].attributes.get('name') == ['airpods']

def test_dynamic_blocks_with_list(self):
resource_paths = [
os.path.join(TEST_DIRNAME, "test_resources", "dynamic_blocks_resource"),
os.path.join(TEST_DIRNAME, "test_resources", "dynamic_blocks_variable_rendering"),
os.path.join(TEST_DIRNAME, "test_resources", "dynamic_blocks_tfvars"),

]
for path in resource_paths:
graph_manager = TerraformGraphManager('m', ['m'])
local_graph, _ = graph_manager.build_graph_from_source_directory(path, render_variables=True)
resources_vertex = list(filter(lambda v: v.block_type == BlockType.RESOURCE, local_graph.vertices))
assert len(resources_vertex[0].attributes.get('ingress')) == 2
assert len(resources_vertex[0].attributes.get('egress')) == 2
assert resources_vertex[0].attributes.get('ingress') == \
[{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 80, 'protocol': 'tcp', 'to_port': 80},
{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 443, 'protocol': 'tcp', 'to_port': 443}]
assert resources_vertex[0].attributes.get('egress') == \
[{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 443, 'protocol': 'tcp', 'to_port': 443},
{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 1433, 'protocol': 'tcp', 'to_port': 1433}]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
locals {
inbound_ports = [80, 443]
outbound_ports = [443, 1433]
}

resource "aws_security_group" "list_example" {
name = "list-example"

dynamic "ingress" {
for_each = local.inbound_ports
content {
from_port = ingress.value
to_port = ingress.value
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}

dynamic "egress" {
for_each = local.outbound_ports
content {
from_port = egress.value
to_port = egress.value
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
resource "aws_security_group" "list_example" {
name = "list-example"

dynamic "ingress" {
for_each = var.dynamic.inbound_ports
content {
from_port = ingress.value
to_port = ingress.value
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}

dynamic "egress" {
for_each = var.dynamic.outbound_ports
content {
from_port = egress.value
to_port = egress.value
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dynamic = {
inbound_ports = [80, 443]
outbound_ports = [443, 1433]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
variable "dynamic" {
description = "TODO"
type = object({
outbound_ports = list(string)
inbound_ports = list(string)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
resource "aws_security_group" "list_example" {
name = "list-example"

dynamic "ingress" {
for_each = var.dynamic.inbound_ports
content {
from_port = ingress.value
to_port = ingress.value
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}

dynamic "egress" {
for_each = var.dynamic.outbound_ports
content {
from_port = egress.value
to_port = egress.value
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
variable "dynamic" {
inbound_ports = [80, 443]
outbound_ports = [443, 1433]
}