Skip to content

Commit

Permalink
Include pods of kubernetes_deployment in kubernetes_pod checks (2/4)
Browse files Browse the repository at this point in the history
  • Loading branch information
ugrave committed Oct 20, 2022
1 parent fcbc7eb commit e601cb9
Show file tree
Hide file tree
Showing 19 changed files with 4,355 additions and 46 deletions.
17 changes: 13 additions & 4 deletions checkov/terraform/checks/resource/kubernetes/DropCapabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ def __init__(self):
name = "Minimize the admission of containers with the NET_RAW capability"
id = "CKV_K8S_28"

supported_resources = ('kubernetes_pod', 'kubernetes_pod_v1')
supported_resources = ('kubernetes_pod', 'kubernetes_pod_v1',
'kubernetes_deployment', 'kubernetes_deployment_v1', )
categories = (CheckCategories.GENERAL_SECURITY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

Expand All @@ -20,6 +21,14 @@ def scan_resource_conf(self, conf) -> CheckResult:
self.evaluated_keys = [""]
return CheckResult.FAILED
spec = conf['spec'][0]
evaluated_keys_path = "spec"

if spec.get("template") and isinstance(spec.get("template"), list):
template = spec.get("template")[0]
if template.get("spec") and isinstance(template.get("spec"), list):
spec = template.get("spec")[0]
evaluated_keys_path = f'{evaluated_keys_path}/[0]/template/[0]/spec'

if spec.get("container"):
containers = spec.get("container")

Expand All @@ -39,13 +48,13 @@ def scan_resource_conf(self, conf) -> CheckResult:
if not dropped:
return CheckResult.FAILED
else:
self.evaluated_keys = [f"spec/[0]/container/{idx}/security_context/[0]/capabilities"]
self.evaluated_keys = [f"{evaluated_keys_path}/[0]/container/{idx}/security_context/[0]/capabilities"]
return CheckResult.FAILED
else:
self.evaluated_keys = [f"spec/[0]/container/{idx}/security_context"]
self.evaluated_keys = [f"{evaluated_keys_path}/[0]/container/{idx}/security_context"]
return CheckResult.FAILED
else:
self.evaluated_keys = [f"spec/[0]/container/{idx}"]
self.evaluated_keys = [f"{evaluated_keys_path}/[0]/container/{idx}"]
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.FAILED
Expand Down
14 changes: 12 additions & 2 deletions checkov/terraform/checks/resource/kubernetes/HostPort.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def __init__(self):
"""
name = "Do not specify hostPort unless absolutely necessary"
id = "CKV_K8S_26"
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1"]
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1",
"kubernetes_deployment", "kubernetes_deployment_v1"]
categories = [CheckCategories.GENERAL_SECURITY]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

Expand All @@ -26,7 +27,16 @@ def scan_resource_conf(self, conf) -> CheckResult:
return CheckResult.FAILED

spec = conf.get('spec')[0]
evaluated_keys_path = "spec"

if spec:

if spec.get("template") and isinstance(spec.get("template"), list):
template = spec.get("template")[0]
if template.get("spec") and isinstance(template.get("spec"), list):
spec = template.get("spec")[0]
evaluated_keys_path = f'{evaluated_keys_path}/[0]/template/[0]/spec'

containers = spec.get("container")
if containers is None:
return CheckResult.UNKNOWN
Expand All @@ -36,7 +46,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
if container.get("port"):
for idy, port in enumerate(container["port"]):
if "host_port" in port:
self.evaluated_keys = [f"spec/[0]/container/[{idx}]/port/[{idy}]/host_port"]
self.evaluated_keys = [f"{evaluated_keys_path}/[0]/container/[{idx}]/port/[{idy}]/host_port"]
return CheckResult.FAILED
return CheckResult.PASSED

Expand Down
13 changes: 11 additions & 2 deletions checkov/terraform/checks/resource/kubernetes/ImageDigest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,22 @@ def __init__(self):
"""
name = "Image should use digest"
id = "CKV_K8S_43"
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1"]
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1",
"kubernetes_deployment", "kubernetes_deployment_v1"]
categories = [CheckCategories.GENERAL_SECURITY]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf) -> CheckResult:
spec = conf.get('spec')[0]
if spec:
evaluated_keys_path = "spec"

if spec.get("template") and isinstance(spec.get("template"), list):
template = spec.get("template")[0]
if template.get("spec") and isinstance(template.get("spec"), list):
spec = template.get("spec")[0]
evaluated_keys_path = f'{evaluated_keys_path}/[0]/template/[0]/spec'

containers = spec.get("container")
if containers is None:
return CheckResult.UNKNOWN
Expand All @@ -31,7 +40,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
if container.get("image") and isinstance(container.get("image"), list):
name = container.get("image")[0]
if "@" not in name:
self.evaluated_keys = [f'spec/[0]/container/[{idx}]/image']
self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]/image']
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@ def __init__(self):
"""
name = "Image Pull Policy should be Always"
id = "CKV_K8S_15"
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1"]
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1",
"kubernetes_deployment", "kubernetes_deployment_v1"]
categories = [CheckCategories.GENERAL_SECURITY]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf) -> CheckResult:
spec = conf.get('spec', [None])[0]
if isinstance(spec, dict) and spec:
evaluated_keys_path = "spec"

if spec.get("template") and isinstance(spec.get("template"), list):
template = spec.get("template")[0]
if template.get("spec") and isinstance(template.get("spec"), list):
spec = template.get("spec")[0]
evaluated_keys_path = f'{evaluated_keys_path}/[0]/template/[0]/spec'

containers = spec.get("container")
if containers is None:
return CheckResult.UNKNOWN
Expand All @@ -36,7 +45,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
name = container.get("image")[0]
if "latest" in name:
break
self.evaluated_keys = [f'spec/[0]/container/[{idx}]']
self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]']
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.FAILED
Expand Down
50 changes: 30 additions & 20 deletions checkov/terraform/checks/resource/kubernetes/ImageTagFixed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,42 @@ def __init__(self):
"""
name = "Image Tag should be fixed - not latest or blank"
id = "CKV_K8S_14"
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1"]
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1",
"kubernetes_deployment", "kubernetes_deployment_v1"]
categories = [CheckCategories.GENERAL_SECURITY]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf) -> CheckResult:
spec = conf.get('spec', [None])[0]
if isinstance(spec, dict) and spec.get("container"):
containers = spec.get("container")
for idx, container in enumerate(containers):
if not isinstance(container, dict):
return CheckResult.UNKNOWN
if container.get("image"):
name = container.get("image")[0]
if ":" in name:
if name.split(":")[1] in ("latest", ""):
self.evaluated_keys = [f'spec/[0]/container/[{idx}]/image']
return CheckResult.FAILED
continue
if "@" in name:
continue
self.evaluated_keys = [f'spec/[0]/container/[{idx}]/image']
if isinstance(spec, dict) and spec:
evaluated_keys_path = "spec"

if spec.get("template") and isinstance(spec.get("template"), list):
template = spec.get("template")[0]
if template.get("spec") and isinstance(template.get("spec"), list):
spec = template.get("spec")[0]
evaluated_keys_path = f'{evaluated_keys_path}/[0]/template/[0]/spec'

if spec.get("container"):
containers = spec.get("container")
for idx, container in enumerate(containers):
if not isinstance(container, dict):
return CheckResult.UNKNOWN
if container.get("image"):
name = container.get("image")[0]
if ":" in name:
if name.split(":")[1] in ("latest", ""):
self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]/image']
return CheckResult.FAILED
continue
if "@" in name:
continue
self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]/image']
return CheckResult.FAILED
self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]']
return CheckResult.FAILED
self.evaluated_keys = [f'spec/[0]/container/[{idx}]']
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.FAILED


check = ImageTagFixed()
16 changes: 13 additions & 3 deletions checkov/terraform/checks/resource/kubernetes/LivenessProbe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class LivenessProbe(BaseResourceValueCheck):
def __init__(self):
name = "Liveness Probe Should be Configured"
id = "CKV_K8S_8"
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1"]
supported_resources = ["kubernetes_pod", "kubernetes_pod_v1",
"kubernetes_deployment", "kubernetes_deployment_v1"]
categories = [CheckCategories.GENERAL_SECURITY]
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources,
missing_block_result=CheckResult.FAILED)
Expand All @@ -19,7 +20,16 @@ def get_inspected_key(self) -> str:

def scan_resource_conf(self, conf) -> CheckResult:
spec = conf.get('spec', [None])[0]
if spec and isinstance(spec, dict):

if isinstance(spec, dict) and spec:
evaluated_keys_path = "spec"

if spec.get("template") and isinstance(spec.get("template"), list):
template = spec.get("template")[0]
if template.get("spec") and isinstance(template.get("spec"), list):
spec = template.get("spec")[0]
evaluated_keys_path = f'{evaluated_keys_path}/[0]/template/[0]/spec'

containers = spec.get("container")
if containers is None:
return CheckResult.UNKNOWN
Expand All @@ -28,7 +38,7 @@ def scan_resource_conf(self, conf) -> CheckResult:
return CheckResult.UNKNOWN
if container.get("liveness_probe"):
return CheckResult.PASSED
self.evaluated_keys = [f'spec/[0]/container/[{idx}]']
self.evaluated_keys = [f'{evaluated_keys_path}/[0]/container/[{idx}]']
return CheckResult.FAILED

return CheckResult.FAILED
Expand Down
Loading

0 comments on commit e601cb9

Please sign in to comment.