Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kubernetes): Create keyword and network policy edge builders #3763

Merged
merged 17 commits into from
Nov 6, 2022
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
class ResourceKeywordIdentifier:
"""
this class maps connections between resources by their unique keyword identifier.
each resource in this class has a list of objects, and each object defines a potential connection to a different
resource only if all attributes in the object are matched. each object in the list is independent regardless of
the other objects in the list.

for example:
A ServiceAccount resource A with the property 'metadata.name' with value 'service-123' will match a resource B
of type 'ClusterRoleBinding' with a 'subjects.name' property equals to 'service-123'
and
the property 'kind' for resource A matches resource B's 'subjects.kind' property
"""

KINDS_KEYWORDS_MAP = {
"PersistentVolumeClaim": ["claimName"],
"ServiceAccount": ["serviceAccountName"],
"ClusterRole": ["rules[].resources", "rules[].resourceNames"],
"ClusterRoleBinding": [["roleRef.name", "kind"], ["subjects[].name", "kind"], ["aggregationRule.clusterRoleSelectors.matchLabels"]],
"Role": ["rules[].resources", "rules[].resourceNames"],
"RoleBinding": [["roleRef.name", "kind"], ["subjects[].name", "kind"]]
# TODO: "PersistentVolumeClaim": ["claimName"],
# TODO: "ClusterRole": ["rules[].resources", "rules[].resourceNames"],
# TODO: "Role": ["rules[].resources", "rules[].resourceNames"],

"ServiceAccount": [
{"spec.serviceAccountName": "metadata.name"}
],
"ClusterRoleBinding": [
{"metadata.name": "roleRef.name", "kind": "roleRef.kind"},
[{"subjects": {"metadata.name": "name", "kind": "kind"}}]
],
"RoleBinding": [
{"metadata.name": "roleRef.name", "kind": "roleRef.kind"},
[{"subjects": {"metadata.name": "name", "kind": "kind", "metadata.namespace": "namespace"}}]
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

@dataclass
class KubernetesSelector:
match_labels: Dict[str, Any]
match_labels: Dict[str, Any] | None


@dataclass
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from __future__ import annotations

from checkov.kubernetes.graph_builder.graph_components.edge_builders.K8SEdgeBuilder import K8SEdgeBuilder
from checkov.kubernetes.graph_builder.graph_components.blocks import KubernetesBlock
from checkov.kubernetes.graph_builder.graph_components.ResourceKeywordIdentifier import ResourceKeywordIdentifier
from checkov.kubernetes.kubernetes_utils import FILTERED_RESOURCES_FOR_EDGE_BUILDERS


class KeywordEdgeBuilder(K8SEdgeBuilder):

@staticmethod
def should_search_for_edges(vertex: KubernetesBlock) -> bool:
return vertex.attributes.get("kind") in ResourceKeywordIdentifier.KINDS_KEYWORDS_MAP.keys() \
and vertex.attributes.get("kind") not in FILTERED_RESOURCES_FOR_EDGE_BUILDERS

@staticmethod
def find_connections(vertex: KubernetesBlock, vertices: list[KubernetesBlock]) -> list[int]:
"""
connection is defined by a match between a vertex's (of a certain type) references definitions to a potential
vertex (of any type).

example:
A Pod with the property 'spec.serviceAccountName' with value 'service-123' will match a resource of type
'ServiceAccount' with a 'metadata.name' property equals to 'service-123'
"""

connections: list[int] = []
for potential_vertex_index, potential_vertex in enumerate(vertices):
if potential_vertex.id == vertex.id:
continue
resource_references_definitions: list[dict[str, str] | list[dict[str, dict[str, str]]]] = ResourceKeywordIdentifier.KINDS_KEYWORDS_MAP[vertex.attributes["kind"]] # type: ignore[assignment]
# check that resource items comply to all references definitions defined in ResourceKeywordIdentifier
for references_definition in resource_references_definitions:
match = True

if isinstance(references_definition, dict):
for potential_vertex_key, vertex_key in references_definition.items():
match = KeywordEdgeBuilder._find_match_in_attributes(vertex, potential_vertex, potential_vertex_key, vertex_key, match)
if match:
connections.append(potential_vertex_index)

# some items are nested in lists and their value in the vertex is concatenated with their index,
# like so: subjects.0.name
elif isinstance(references_definition, list):
# not really a loop, just extracting the dict's key
for base_key_attribute, reference_definitions_items in references_definition[0].items():
vertex_attribute_references_list: list[dict[str, str]] = vertex.attributes.get(base_key_attribute) # type: ignore[assignment]
# iterate every item on the list as a separate resource
for i in range(len(vertex_attribute_references_list)):
match = True
for potential_vertex_key, vertex_key in reference_definitions_items.items():
vertex_key = f"{base_key_attribute}.{i}.{vertex_key}"
match = KeywordEdgeBuilder._find_match_in_attributes(vertex, potential_vertex, potential_vertex_key, vertex_key, match)
if match:
connections.append(potential_vertex_index)

return connections

@staticmethod
def _find_match_in_attributes(vertex: KubernetesBlock,
potential_vertex: KubernetesBlock,
potential_vertex_key: str,
vertex_key: str,
match: bool) -> bool:

vertex_ref = vertex.attributes.get(vertex_key)
potential_vertex_ref = potential_vertex.attributes.get(potential_vertex_key)
if vertex_ref is None or potential_vertex_ref is None or vertex_ref != potential_vertex_ref:
# if not all attributes match then it's not qualified as an edge
match = False

return match
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from __future__ import annotations

from checkov.kubernetes.graph_builder.graph_components.K8SEdgeBuilder import K8SEdgeBuilder
from checkov.kubernetes.graph_builder.graph_components.edge_builders.K8SEdgeBuilder import K8SEdgeBuilder
from checkov.kubernetes.graph_builder.graph_components.blocks import KubernetesBlock
from checkov.kubernetes.kubernetes_utils import FILTERED_RESOURCES_FOR_EDGE_BUILDERS


class LabelSelectorEdgeBuilder(K8SEdgeBuilder):

@staticmethod
def should_search_for_edges(vertex: KubernetesBlock) -> bool:
if vertex.metadata is not None and vertex.metadata.labels is not None:
return True
return False
return vertex.metadata is not None \
and vertex.metadata.labels is not None \
YaaraVerner marked this conversation as resolved.
Show resolved Hide resolved
and "kind" in vertex.attributes \
and vertex.attributes["kind"] not in FILTERED_RESOURCES_FOR_EDGE_BUILDERS

@staticmethod
def find_connections(vertex: KubernetesBlock, vertices: list[KubernetesBlock]) -> list[int]:
Expand All @@ -37,6 +39,7 @@ def find_connections(vertex: KubernetesBlock, vertices: list[KubernetesBlock]) -
for potential_vertex_index, potential_vertex in enumerate(vertices):
if potential_vertex.id == vertex.id or not potential_vertex.metadata:
continue

match_labels = potential_vertex.metadata.selector.match_labels
if match_labels:
if len(match_labels) > len(labels):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

from checkov.kubernetes.graph_builder.graph_components.edge_builders.K8SEdgeBuilder import K8SEdgeBuilder
from checkov.kubernetes.graph_builder.graph_components.blocks import KubernetesBlock
from checkov.kubernetes.kubernetes_utils import remove_metadata_from_attribute


class NetworkPolicyEdgeBuilder(K8SEdgeBuilder):

@staticmethod
def should_search_for_edges(vertex: KubernetesBlock) -> bool:
return bool(vertex.attributes.get("kind") == "NetworkPolicy")

@staticmethod
def find_connections(vertex: KubernetesBlock, vertices: list[KubernetesBlock]) -> list[int]:
"""
this edge builder is a specific case of LabelSelectorEdgeBuilder with 2 differences:
1. it applies only to NetworkPolicy resources that connect to Pod resources
2. it handles a wildcard that attaches a NetworkPolicy resource to all pods. for example:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-ingress
spec:
podSelector: {}
policyTypes:
- Ingress
"""

connections: list[int] = []
for potential_pod_index, potential_vertex in enumerate(vertices):
if potential_vertex.id == vertex.id or potential_vertex.attributes.get("kind") != "Pod":
continue

network_policy = vertex
pod = potential_vertex

pod_selector = network_policy.attributes.get("spec", {}).get("podSelector")
if not pod_selector:
continue
match_labels = pod_selector.get("matchLabels")
remove_metadata_from_attribute(match_labels)

# the network policy has specific pod labels
if match_labels and pod.metadata is not None and pod.metadata.labels is not None:
pod_labels = pod.metadata.labels
if len(match_labels) > len(pod_labels):
continue
# find shared label between the inspected vertex and the iterated potential vertex
shared_labels = [k for k in match_labels if k in pod_labels and match_labels[k] == pod_labels[k]]
if len(shared_labels) == len(match_labels):
connections.append(potential_pod_index)
# the network policy has a podSelector property with no labels and should apply for all pods
else:
connections.append(potential_pod_index)

return connections
37 changes: 19 additions & 18 deletions checkov/kubernetes/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
from checkov.common.graph.graph_builder.local_graph import LocalGraph
from checkov.common.util.consts import START_LINE, END_LINE
from checkov.kubernetes.graph_builder.graph_components.blocks import KubernetesBlock, KubernetesBlockMetadata, KubernetesSelector
from checkov.kubernetes.kubernetes_utils import DEFAULT_NESTED_RESOURCE_TYPE, is_invalid_k8_definition, get_resource_id, is_invalid_k8_pod_definition, K8sGraphFlags
from checkov.kubernetes.graph_builder.graph_components.LabelSelectorEdgeBuilder import LabelSelectorEdgeBuilder


EDGE_BUILDERS = (LabelSelectorEdgeBuilder,)
from checkov.kubernetes.kubernetes_utils import DEFAULT_NESTED_RESOURCE_TYPE, is_invalid_k8_definition, get_resource_id, is_invalid_k8_pod_definition, K8sGraphFlags, remove_metadata_from_attribute
from checkov.kubernetes.graph_builder.graph_components.edge_builders.LabelSelectorEdgeBuilder import LabelSelectorEdgeBuilder
from checkov.kubernetes.graph_builder.graph_components.edge_builders.KeywordEdgeBuilder import KeywordEdgeBuilder
from checkov.kubernetes.graph_builder.graph_components.edge_builders.NetworkPolicyEdgeBuilder import NetworkPolicyEdgeBuilder


class KubernetesLocalGraph(LocalGraph[KubernetesBlock]):
def __init__(self, definitions: dict[str, list[dict[str, Any]]]) -> None:
self.definitions = definitions
self.edge_builders = (LabelSelectorEdgeBuilder, KeywordEdgeBuilder, NetworkPolicyEdgeBuilder)
super().__init__()

def build_graph(self, render_variables: bool, graph_flags: K8sGraphFlags | None = None) -> None:
Expand Down Expand Up @@ -59,6 +59,7 @@ def _create_vertices(self, create_complex_vertices: bool) -> None:
config = deepcopy(resource)
attributes = deepcopy(config)
attributes["resource_type"] = resource_type
attributes["kind"] = resource_type
attributes[START_LINE] = resource[START_LINE]
attributes[END_LINE] = resource[END_LINE]

Expand Down Expand Up @@ -86,7 +87,7 @@ def _create_vertices(self, create_complex_vertices: bool) -> None:
def _create_edges(self) -> None:
edges_to_create = defaultdict(list)
for vertex_index, vertex in enumerate(self.vertices):
for edge_builder in EDGE_BUILDERS:
for edge_builder in self.edge_builders:
if edge_builder.should_search_for_edges(vertex):
current_vertex_connections = edge_builder.find_connections(vertex, self.vertices)
if current_vertex_connections:
Expand All @@ -104,29 +105,29 @@ def _create_edge(self, origin_vertex_index: int, dest_vertex_index: int, label:
def _get_k8s_block_metadata(resource: Dict[str, Any]) -> KubernetesBlockMetadata:
name = resource.get('metadata', {}).get('name')
spec = resource.get('spec')
match_labels: Dict[str, Any] | None = None
if isinstance(spec, list):
for spec_item in spec:
if spec_item.get('selector'):
match_labels = spec_item.get('selector').get('matchLabels')
if resource.get('kind') == "Service":
match_labels = spec_item.get('selector')
else:
match_labels = spec_item.get('selector').get('matchLabels')
PelegLi marked this conversation as resolved.
Show resolved Hide resolved
break
else:
match_labels = None
elif isinstance(spec, dict):
match_labels = spec.get('selector', {}).get('matchLabels')
else:
match_labels = None
KubernetesLocalGraph.remove_metadata_from_attribute(match_labels)
if spec.get('selector'):
if resource.get('kind') == "Service":
match_labels = spec.get('selector')
else:
match_labels = spec.get('selector', {}).get('matchLabels')
remove_metadata_from_attribute(match_labels)
selector = KubernetesSelector(match_labels)
labels = resource.get('metadata', {}).get('labels')
KubernetesLocalGraph.remove_metadata_from_attribute(labels)
remove_metadata_from_attribute(labels)
return KubernetesBlockMetadata(selector, labels, name)

@staticmethod
def remove_metadata_from_attribute(attribute: dict[str, Any] | None) -> None:
if isinstance(attribute, dict):
attribute.pop("__startline__", None)
attribute.pop("__endline__", None)

@staticmethod
def _extract_nested_resources(file_conf: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
all_resources: "list[dict[str, Any]]" = []
Expand Down
9 changes: 8 additions & 1 deletion checkov/kubernetes/kubernetes_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from checkov.kubernetes.parser.parser import parse

K8_POSSIBLE_ENDINGS = {".yaml", ".yml", ".json"}
DEFAULT_NESTED_RESOURCE_TYPE = 'Pod'
DEFAULT_NESTED_RESOURCE_TYPE = "Pod"
FILTERED_RESOURCES_FOR_EDGE_BUILDERS = ["NetworkPolicy"]


def get_folder_definitions(
Expand Down Expand Up @@ -218,6 +219,12 @@ def get_resource_id(resource: dict[str, Any] | None) -> str | None:
return None


def remove_metadata_from_attribute(attribute: dict[str, Any] | None) -> None:
if isinstance(attribute, dict):
attribute.pop("__startline__", None)
attribute.pop("__endline__", None)


@dataclass()
class K8sGraphFlags:
create_complex_vertices: bool
Expand Down
58 changes: 58 additions & 0 deletions tests/kubernetes/graph/resources/Keyword/clusterrolebinding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
apiVersion: rbac.authorization.k8s.io/v1
# This cluster role binding allows anyone in the "manager" group to read secrets in any namespace.
kind: ClusterRoleBinding
metadata:
name: failing-crb1
subjects:
- kind: ServiceAccount
name: service-account-name
namespace: default
- kind: ServiceAccount
name: service-account-name2
namespace: default
roleRef:
kind: ClusterRole
name: failing-cr1
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginx
spec:
replicas: 3
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
serviceAccount: service-account-name
containers:
- name: nginx
image: nginx:1.14.2
ports:
- containerPort: 80
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: failing-cr1
rules:
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["clusterrolebindings"]
verbs: ["*"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: service-account-name
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: service-account-name2
Loading