Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

break(terraform): return UNKNOWN for unrendered values in graph checks #3689

Merged
merged 21 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

class AnyResourceSolver(BaseAttributeSolver):
operator = Operators.ANY # noqa: CCE003 # a static attribute
is_value_attribute_check = False # noqa: CCE003 # a static attribute

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
return vertex is not None
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@
SUPPORTED_BLOCK_TYPES = {BlockType.RESOURCE, TerraformBlockType.DATA}
WILDCARD_PATTERN = re.compile(r"(\S+[.][*][.]*)+")

OPERATION_TO_FUNC = {
'all': all,
'any': any
}


class BaseAttributeSolver(BaseSolver):
operator = "" # noqa: CCE003 # a static attribute
is_value_attribute_check = True # noqa: CCE003 # a static attribute

def __init__(self, resource_types: List[str], attribute: Optional[str], value: Any, is_jsonpath_check: bool = False
) -> None:
Expand All @@ -40,28 +36,50 @@ def __init__(self, resource_types: List[str], attribute: Optional[str], value: A
self.is_jsonpath_check = is_jsonpath_check
self.parsed_attributes: Dict[Optional[str], Any] = {}

def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
executer = ThreadPoolExecutor()
jobs = []
passed_vertices: List[Dict[str, Any]] = []
failed_vertices: List[Dict[str, Any]] = []
unknown_vertices: List[Dict[str, Any]] = []
for _, data in graph_connector.nodes(data=True):
if (not self.resource_types or data.get(CustomAttributes.RESOURCE_TYPE) in self.resource_types) \
and data.get(CustomAttributes.BLOCK_TYPE) in SUPPORTED_BLOCK_TYPES:
jobs.append(executer.submit(self._process_node, data, passed_vertices, failed_vertices))
jobs.append(executer.submit(
self._process_node, data, passed_vertices, failed_vertices, unknown_vertices))

concurrent.futures.wait(jobs)
return passed_vertices, failed_vertices
return passed_vertices, failed_vertices, unknown_vertices

def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
attr_val = vertex.get(self.attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate value checks,
# and will return None (for UNKNOWN)
# handle edge cases in some policies that explicitly look for blank values
if self.is_value_attribute_check and self._is_variable_dependant(attr_val, vertex['source_']) \
and self.value != '':
return None

def get_operation(self, vertex: Dict[str, Any]) -> bool:
if self.attribute and (self.is_jsonpath_check or re.match(WILDCARD_PATTERN, self.attribute)):
attribute_matches = self.get_attribute_matches(vertex)
filtered_attribute_matches = attribute_matches
if self.is_value_attribute_check and self.value != '':
filtered_attribute_matches = [
a for a in attribute_matches
if not self._is_variable_dependant(vertex.get(a), vertex['source_'])
]

operator = OPERATION_TO_FUNC['all'] if self.is_jsonpath_check else OPERATION_TO_FUNC['any']
if attribute_matches:
return self.resource_type_pred(vertex, self.resource_types) and operator(
self._get_operation(vertex=vertex, attribute=attr) for attr in attribute_matches
)
if self.is_jsonpath_check:
if self.resource_type_pred(vertex, self.resource_types) and all(
self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches):
return True if len(attribute_matches) == len(filtered_attribute_matches) else None
return False

if self.resource_type_pred(vertex, self.resource_types) and any(
self._get_operation(vertex=vertex, attribute=attr) for attr in filtered_attribute_matches):
return True
return False if len(attribute_matches) == len(filtered_attribute_matches) else None

return self.resource_type_pred(vertex, self.resource_types) and self._get_operation(
vertex=vertex, attribute=self.attribute
Expand All @@ -71,11 +89,16 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
raise NotImplementedError

def _process_node(
self, data: Dict[str, Any], passed_vartices: List[Dict[str, Any]], failed_vertices: List[Dict[str, Any]]
self, data: Dict[str, Any], passed_vartices: List[Dict[str, Any]], failed_vertices: List[Dict[str, Any]],
unknown_vertices: List[Dict[str, Any]]
) -> None:
if not self.resource_type_pred(data, self.resource_types):
return
if self.get_operation(vertex=data):
result = self.get_operation(vertex=data)
# A None indicate for UNKNOWN result - the vertex shouldn't be added to the passed or the failed vertices
if result is None:
unknown_vertices.append(data)
elif result:
passed_vartices.append(data)
else:
failed_vertices.append(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,4 @@ class EndingWithAttributeSolver(BaseAttributeSolver):

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex["source_"]):
return True

return isinstance(attr, str) and attr.endswith(self.value)
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ class EqualsAttributeSolver(BaseAttributeSolver):

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr_val = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
# handle edge cases in some policies that explicitly look for blank values
if self.value != '' and self._is_variable_dependant(attr_val, vertex['source_']):
return True
if type(attr_val) == bool or type(self.value) == bool:
# handle cases like str(False) == "false"
# generally self.value will be a string, but could be a bool if the policy was created straight from json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

class ExistsAttributeSolver(BaseAttributeSolver):
operator = Operators.EXISTS # noqa: CCE003 # a static attribute
is_value_attribute_check = False # noqa: CCE003 # a static attribute

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
return vertex.get(attribute) is not None # type:ignore[arg-type] # due to attribute can be None
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ class GreaterThanAttributeSolver(BaseAttributeSolver):

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
vertex_attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(vertex_attr, vertex['source_']):
return True
attr_float = force_float(vertex_attr)
value_float = force_float(self.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ class GreaterThanOrEqualAttributeSolver(BaseAttributeSolver):
def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:

vertex_attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(vertex_attr, vertex['source_']):
return True
attr_float = force_float(vertex_attr)
value_float = force_float(self.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ class IntersectsAttributeSolver(BaseAttributeSolver):
def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

if isinstance(self.value, str) and isinstance(attr, Iterable):
return self.value in attr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ class IsEmptyAttributeSolver(BaseAttributeSolver):
def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex["source_"]):
return True

if isinstance(attr, (list, Sized)):
return len(attr) == 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,4 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if attr is None:
return False

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

return attr is True
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
return False

attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

if isinstance(attr, Sized):
return len(attr) == force_int(self.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if attr is None:
return False

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

value_int = force_int(self.value)

if value_int is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if attr is None:
return False

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

value_int = force_int(self.value)

if value_int is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if attr is None:
return False

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

value_int = force_int(self.value)

if value_int is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if attr is None:
return False

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True

value_int = force_int(self.value)

if value_int is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,4 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if vertex.get(attribute) is None: # type:ignore[arg-type] # due to attribute can be None
return False

attr_val = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr_val, vertex['source_']):
return True

return not super()._get_operation(vertex, attribute)
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,4 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if vertex.get(attribute) is None: # type:ignore[arg-type] # due to attribute can be None
return False

attr_val = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr_val, vertex['source_']):
return True

return not super()._get_operation(vertex, attribute)
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,4 @@ class NotEqualsAttributeSolver(EqualsAttributeSolver):
operator = Operators.NOT_EQUALS # noqa: CCE003 # a static attribute

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr_val = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr_val, vertex['source_']):
return True
return not super()._get_operation(vertex, attribute)
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,4 @@ class StartingWithAttributeSolver(BaseAttributeSolver):

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None

# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex["source_"]):
return True

return isinstance(attr, str) and attr.startswith(self.value)
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,4 @@ class WithinAttributeSolver(BaseAttributeSolver):

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
attr = vertex.get(attribute) # type:ignore[arg-type] # due to attribute can be None
# if this value contains an underendered variable, then we cannot evaluate the check,
# so return True (since we cannot return UNKNOWN)
if self._is_variable_dependant(attr, vertex['source_']):
return True
return attr in self.value
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Dict
from typing import List, Any, Dict, Optional

from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -16,8 +16,12 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None
def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
return reduce(and_, args)

def get_operation(self, vertex: Dict[str, Any]) -> bool:
def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
has_unrendered_attribute = False
for solver in self.solvers:
if not solver.get_operation(vertex):
operation = solver.get_operation(vertex)
if operation is None:
has_unrendered_attribute = True
nimrodkor marked this conversation as resolved.
Show resolved Hide resolved
elif not operation:
return False
return True
return None if has_unrendered_attribute else True
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import List, Any, Tuple, Dict, TYPE_CHECKING
from abc import abstractmethod
from typing import List, Any, Tuple, Dict, TYPE_CHECKING, Optional

from checkov.common.graph.checks_infra.enums import SolverType
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -25,13 +26,21 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
def _get_negative_op(self, *args: Any) -> Any:
return not self._get_operation(args)

def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
@abstractmethod
def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
YaaraVerner marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError()

def run(self, graph_connector: DiGraph) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
passed_vertices = []
failed_vertices = []
unknown_vertices = []
for _, data in graph_connector.nodes(data=True):
if self.resource_type_pred(data, self.resource_types):
if self.get_operation(data):
result = self.get_operation(data)
if result is None:
unknown_vertices.append(data)
elif result:
passed_vertices.append(data)
else:
failed_vertices.append(data)
return passed_vertices, failed_vertices
return passed_vertices, failed_vertices, unknown_vertices
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Dict
from typing import List, Any, Dict, Optional

from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -18,5 +18,6 @@ def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
raise Exception('The "not" operator must have exactly one child')
return not args[0]

def get_operation(self, vertex: Dict[str, Any]) -> bool:
return not self.solvers[0].get_operation(vertex)
def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
result = self.solvers[0].get_operation(vertex)
return None if result is None else not result
12 changes: 8 additions & 4 deletions checkov/common/checks_infra/solvers/complex_solvers/or_solver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Dict
from typing import List, Any, Dict, Optional

from checkov.common.graph.checks_infra.enums import Operators
from checkov.common.graph.checks_infra.solvers.base_solver import BaseSolver
Expand All @@ -16,8 +16,12 @@ def __init__(self, solvers: List[BaseSolver], resource_types: List[str]) -> None
def _get_operation(self, *args: Any, **kwargs: Any) -> Any:
return reduce(or_, args)

def get_operation(self, vertex: Dict[str, Any]) -> bool:
def get_operation(self, vertex: Dict[str, Any]) -> Optional[bool]:
has_unrendered_attribute = False
for solver in self.solvers:
if solver.get_operation(vertex):
operation = solver.get_operation(vertex)
if operation:
return True
return False
if operation is None:
has_unrendered_attribute = True
return None if has_unrendered_attribute else False
Loading