Skip to content

Commit

Permalink
feat(terraform): Create checks for aws managed admin policy (#3741)
Browse files Browse the repository at this point in the history
Create checks for aws managed admin policy
  • Loading branch information
bkblanton authored Oct 26, 2022
1 parent 718b544 commit bedb854
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 0 deletions.
36 changes: 36 additions & 0 deletions checkov/terraform/checks/data/aws/IAMManagedAdminPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.data.base_check import BaseDataCheck


ADMIN_POLICY_NAME = "AdministratorAccess"
ADMIN_POLICY_ARN = f"arn:aws:iam::aws:policy/{ADMIN_POLICY_NAME}"


class IAMManagedAdminPolicy(BaseDataCheck):
def __init__(self):
# This is the full description of your check
description = "Disallow policies from using the AWS AdministratorAccess policy"

# This is the Unique ID for your check
id = "CKV_AWS_275"

# These are the terraform objects supported by this check (ex: aws_iam_policy_document)
supported_data = ('aws_iam_policy',)

# Valid CheckCategories checkov/common/models/enums.py
categories = [CheckCategories.IAM]
super().__init__(name=description, id=id, categories=categories, supported_data=supported_data)

def scan_data_conf(self, conf):
if "name" in conf.keys():
if conf.get("name")[0] == ADMIN_POLICY_NAME:
return CheckResult.FAILED

if "arn" in conf.keys():
if conf.get("arn")[0] == ADMIN_POLICY_ARN:
return CheckResult.FAILED

return CheckResult.PASSED


check = IAMManagedAdminPolicy()
48 changes: 48 additions & 0 deletions checkov/terraform/checks/resource/aws/IAMManagedAdminPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


ADMIN_POLICY_NAME = "AdministratorAccess"
ADMIN_POLICY_ARN = f"arn:aws:iam::aws:policy/{ADMIN_POLICY_NAME}"


class IAMManagedAdminPolicy(BaseResourceCheck):
def __init__(self):
# This is the full description of your check
description = "Disallow IAM roles, users, and groups from using the AWS AdministratorAccess policy"

# This is the Unique ID for your check
id = "CKV_AWS_274"

# These are the terraform objects supported by this check (ex: aws_iam_policy_document)
supported_resources = (
"aws_iam_role",
"aws_iam_policy_attachment",
"aws_iam_role_policy_attachment",
"aws_iam_user_policy_attachment",
"aws_iam_group_policy_attachment",
)

# Valid CheckCategories are defined in checkov/common/models/enums.py
categories = (CheckCategories.IAM,)
super().__init__(name=description, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf):
if self.entity_type == "aws_iam_role":
if "managed_policy_arns" in conf.keys():
if ADMIN_POLICY_ARN in conf.get("managed_policy_arns")[0]:
return CheckResult.FAILED

elif self.entity_type in (
"aws_iam_policy_attachment",
"aws_iam_role_policy_attachment",
"aws_iam_user_policy_attachment",
"aws_iam_group_policy_attachment",
):
if conf.get("policy_arn")[0] == ADMIN_POLICY_ARN:
return CheckResult.FAILED

return CheckResult.PASSED


check = IAMManagedAdminPolicy()
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Fail

data "aws_iam_policy" "fail1" {
name = "AdministratorAccess"
}

data "aws_iam_policy" "fail2" {
arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

# Pass

data "aws_iam_policy" "pass1" {
name = "AmazonS3ReadOnlyAccess"
}

data "aws_iam_policy" "pass2" {
arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
}
43 changes: 43 additions & 0 deletions tests/terraform/checks/data/aws/test_IAMManagedAdminPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner
from checkov.terraform.checks.data.aws.IAMManagedAdminPolicy import check


class TestIAMManagedAdminPolicy(unittest.TestCase):

def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = os.path.join(current_dir, "example_IAMManagedAdminPolicy")
report = runner.run(root_folder=test_files_dir,
runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
"aws_iam_policy.pass1",
"aws_iam_policy.pass2",
}

failing_resources = {
"aws_iam_policy.fail1",
"aws_iam_policy.fail2",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 2)
self.assertEqual(summary["failed"], 2)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Fail

resource "aws_iam_role" "fail1" {
name = "fail1"
assume_role_policy = data.aws_iam_policy_document.instance_assume_role_policy.json
managed_policy_arns = ["arn:aws:iam::aws:policy/AdministratorAccess"]
}

resource "aws_iam_policy_attachment" "fail2" {
name = "fail2"
roles = [aws_iam_role.fail1.name]
policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

resource "aws_iam_role_policy_attachment" "fail3" {
role = aws_iam_role.fail1.name
policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

resource "aws_iam_user_policy_attachment" "fail4" {
user = "user"
policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

resource "aws_iam_group_policy_attachment" "fail5" {
group = "group"
policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess"
}

# Pass

resource "aws_iam_role" "pass1" {
name = "pass1"
assume_role_policy = data.aws_iam_policy_document.instance_assume_role_policy.json
managed_policy_arns = ["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"]
}

resource "aws_iam_policy_attachment" "pass2" {
name = "pass2"
role = aws_iam_role.pass1.name
policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
}

resource "aws_iam_role_policy_attachment" "pass3" {
role = aws_iam_role.pass1.name
policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
}

resource "aws_iam_user_policy_attachment" "pass4" {
user = "user"
policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
}

resource "aws_iam_group_policy_attachment" "pass5" {
group = "group"
policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
}


# Data

data "aws_iam_policy_document" "instance_assume_role_policy" {
statement {
actions = ["sts:AssumeRole"]

principals {
type = "Service"
identifiers = ["ec2.amazonaws.com"]
}
}
}
49 changes: 49 additions & 0 deletions tests/terraform/checks/resource/aws/test_IAMManagedAdminPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner
from checkov.terraform.checks.resource.aws.IAMManagedAdminPolicy import check


class TestIAMManagedAdminPolicy(unittest.TestCase):

def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = os.path.join(current_dir, "example_IAMManagedAdminPolicy")
report = runner.run(root_folder=test_files_dir,
runner_filter=RunnerFilter(checks=[check.id]))
summary = report.get_summary()

passing_resources = {
"aws_iam_role.pass1",
"aws_iam_policy_attachment.pass2",
"aws_iam_role_policy_attachment.pass3",
"aws_iam_user_policy_attachment.pass4",
"aws_iam_group_policy_attachment.pass5",
}

failing_resources = {
"aws_iam_role.fail1",
"aws_iam_policy_attachment.fail2",
"aws_iam_role_policy_attachment.fail3",
"aws_iam_user_policy_attachment.fail4",
"aws_iam_group_policy_attachment.fail5",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 5)
self.assertEqual(summary["failed"], 5)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()

0 comments on commit bedb854

Please sign in to comment.