Skip to content

Commit

Permalink
fix(terraform): render list entries via modules correctly (#3781)
Browse files Browse the repository at this point in the history
* render list entries via modules correctly

* extract the update list logic to only override for TF

* ensure type of key_parts
  • Loading branch information
gruebel authored Nov 3, 2022
1 parent 8668d71 commit fcaf014
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 10 deletions.
11 changes: 8 additions & 3 deletions checkov/common/graph/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ def update_attribute(
previous_breadcrumbs.append(BreadcrumbMetadata(change_origin_id, attribute_at_dest))

# update the numbered attributes, if the new value is a list
if isinstance(attribute_value, list):
for idx, value in enumerate(attribute_value):
self.attributes[f"{attribute_key}.{idx}"] = value
if attribute_value and isinstance(attribute_value, list):
self.update_list_attribute(attribute_key=attribute_key, attribute_value=attribute_value)

attribute_key_parts = attribute_key.split(".")
if len(attribute_key_parts) == 1:
Expand Down Expand Up @@ -197,6 +196,12 @@ def update_inner_attribute(
elif curr_key in nested_attributes.keys():
self.update_inner_attribute(".".join(split_key[i:]), nested_attributes[curr_key], value_to_update)

def update_list_attribute(self, attribute_key: str, attribute_value: Any) -> None:
"""Updates list attributes with their index"""

for idx, value in enumerate(attribute_value):
self.attributes[f"{attribute_key}.{idx}"] = value

@staticmethod
def _should_add_previous_breadcrumbs(
change_origin_id: int | None, previous_breadcrumbs: list[BreadcrumbMetadata], attribute_at_dest: str | None
Expand Down
12 changes: 12 additions & 0 deletions checkov/terraform/graph_builder/graph_components/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ def find_attribute(self, attribute: Optional[Union[str, List[str]]]) -> Optional

return None

def update_list_attribute(self, attribute_key: str, attribute_value: Any) -> None:
"""Updates list attributes with their index
This needs to be overridden, because of our hcl parser adding a list around any value
"""

if attribute_key not in self.attributes or isinstance(self.attributes[attribute_key][0], list):
# sometimes the attribute_value is a list and replaces the whole value of the key, which makes it a normal value
# ex. attribute_value = ["xyz"] and self.attributes[attribute_key][0] = "xyz"
for idx, value in enumerate(attribute_value):
self.attributes[f"{attribute_key}.{idx}"] = value

@classmethod
def get_inner_attributes(
cls,
Expand Down
48 changes: 41 additions & 7 deletions checkov/terraform/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from checkov.common.graph.graph_builder.graph_components.attribute_names import CustomAttributes
from checkov.common.graph.graph_builder.local_graph import LocalGraph
from checkov.common.graph.graph_builder.utils import calculate_hash, join_trimmed_strings, filter_sub_keys
from checkov.common.util.type_forcers import force_int
from checkov.terraform.checks.utils.dependency_path_handler import unify_dependency_path
from checkov.terraform.graph_builder.graph_components.block_types import BlockType
from checkov.terraform.graph_builder.graph_components.blocks import TerraformBlock
Expand Down Expand Up @@ -474,21 +475,54 @@ def update_dictionary_attribute(
config: Union[List[Any], Dict[str, Any]], key_to_update: str, new_value: Any
) -> Union[List[Any], Dict[str, Any]]:
key_parts = key_to_update.split(".")
if isinstance(config, dict):
if config.get(key_parts[0]) is not None:
key = key_parts[0]

if isinstance(config, dict) and isinstance(key_parts, list):
key = key_parts[0]
inner_config = config.get(key)

if inner_config is not None:
if len(key_parts) == 1:
if isinstance(config[key], list) and not isinstance(new_value, list):
if isinstance(inner_config, list) and not isinstance(new_value, list):
new_value = [new_value]
config[key] = new_value
return config
else:
config[key] = update_dictionary_attribute(config[key], ".".join(key_parts[1:]), new_value)
config[key] = update_dictionary_attribute(inner_config, ".".join(key_parts[1:]), new_value)
else:
for key in config:
config[key] = update_dictionary_attribute(config[key], key_to_update, new_value)
if isinstance(config, list):
for i, config_value in enumerate(config):
config[i] = update_dictionary_attribute(config_value, key_to_update, new_value)
return update_list_attribute(
config=config,
key_parts=key_parts,
key_to_update=key_to_update,
new_value=new_value,
)
return config


def update_list_attribute(
config: list[Any], key_parts: list[str], key_to_update: str, new_value: Any
) -> list[Any] | dict[str, Any]:
"""Updates a list attribute in the given config"""

if not config:
# happens when we can't correctly evaluate something, because of strange defaults or 'for_each' blocks
return config

if len(key_parts) == 1:
idx = force_int(key_parts[0])
inner_config = config[0]

if idx is not None and isinstance(inner_config, list):
if not inner_config:
# happens when config = [[]]
return config

inner_config[idx] = new_value
return config

for i, config_value in enumerate(config):
config[i] = update_dictionary_attribute(config=config_value, key_to_update=key_to_update, new_value=new_value)

return config
22 changes: 22 additions & 0 deletions tests/terraform/graph/variable_rendering/test_renderer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
from pathlib import Path
from unittest.case import TestCase

from checkov.common.graph.db_connectors.networkx.networkx_db_connector import NetworkxConnector
from checkov.terraform.graph_builder.graph_components.block_types import BlockType
from checkov.terraform.graph_manager import TerraformGraphManager
from checkov.terraform.graph_builder.variable_rendering.renderer import TerraformVariableRenderer
Expand Down Expand Up @@ -237,3 +239,23 @@ def test_dynamic_blocks_with_list(self):
assert resources_vertex[0].attributes.get('egress') == \
[{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 443, 'protocol': 'tcp', 'to_port': 443},
{'cidr_blocks': ['0.0.0.0/0'], 'from_port': 1433, 'protocol': 'tcp', 'to_port': 1433}]

def test_list_entry_rendering_module_vars(self):
# given
resource_path = Path(TEST_DIRNAME) / "test_resources/list_entry_module_var"
graph_manager = TerraformGraphManager(NetworkxConnector())

# when
local_graph, _ = graph_manager.build_graph_from_source_directory(str(resource_path), render_variables=True)

# then
resource_vertex = next(v for v in local_graph.vertices if v.id == 'aws_security_group.sg')

self.assertEqual(
resource_vertex.config["aws_security_group"]["sg"]["ingress"][0]["cidr_blocks"][0],
["0.0.0.0/0"],
)
self.assertCountEqual(
resource_vertex.config["aws_security_group"]["sg"]["egress"][0]["cidr_blocks"][0],
["10.0.0.0/16", "0.0.0.0/0"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
variable "vpc_id" {
type = string
}

variable "cidr_sg" {
type = string
default = "0.0.0.0/0"
}

resource "aws_security_group" "sg" {
name = "example"
vpc_id = var.vpc_id

ingress {
from_port = 22
to_port = 22
protocol = "TCP"
cidr_blocks = [var.cidr_sg]
}
egress {
from_port = 22
to_port = 22
protocol = "TCP"
cidr_blocks = ["10.0.0.0/16", var.cidr_sg]
}
}

0 comments on commit fcaf014

Please sign in to comment.