From 5e426da481a44e5e4cd111f03a4f651fed3127f2 Mon Sep 17 00:00:00 2001 From: James Woolfenden Date: Tue, 22 Nov 2022 14:15:42 +0000 Subject: [PATCH] feat(terraform): ensure snapshots use encryption (#3899) * snapshot encryption * add type hints Co-authored-by: gruebel --- .../aws/MemoryDBSnapshotEncryptionWithCMK.py | 23 +++++++++++ .../aws/NeptuneClusterSnapshotEncrypted.py | 17 ++++++++ .../NeptuneClusterSnapshotEncryptedWithCMK.py | 23 +++++++++++ ...lusterSnapshotCopyGrantEncryptedWithCMK.py | 23 +++++++++++ .../main.tf | 11 +++++ .../main.tf | 16 ++++++++ .../main.tf | 13 ++++++ .../{aws_redshift_cluster.tf => main.tf} | 0 .../main.tf | 8 ++++ .../test_MemoryDBSnapshotEncryptionWithCMK.py | 40 ++++++++++++++++++ .../aws/test_NeptuneSnapshotEncrypted.py | 41 +++++++++++++++++++ .../test_NeptuneSnapshotEncryptedWithCMK.py | 40 ++++++++++++++++++ ...dshiftSnapshotCopyGrantEncryptedWithCMK.py | 40 ++++++++++++++++++ 13 files changed, 295 insertions(+) create mode 100644 checkov/terraform/checks/resource/aws/MemoryDBSnapshotEncryptionWithCMK.py create mode 100644 checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncrypted.py create mode 100644 checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncryptedWithCMK.py create mode 100644 checkov/terraform/checks/resource/aws/RedshiftClusterSnapshotCopyGrantEncryptedWithCMK.py create mode 100644 tests/terraform/checks/resource/aws/example_MemoryDBSnapshotEncryptionWithCMK/main.tf create mode 100644 tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncrypted/main.tf create mode 100644 tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncryptedWithCMK/main.tf rename tests/terraform/checks/resource/aws/example_RedshiftClusterEncryptedWithCMK/{aws_redshift_cluster.tf => main.tf} (100%) create mode 100644 tests/terraform/checks/resource/aws/example_RedshiftSnapshotCopyGrantEncryptedWithCMK/main.tf create mode 100644 tests/terraform/checks/resource/aws/test_MemoryDBSnapshotEncryptionWithCMK.py create mode 100644 tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncrypted.py create mode 100644 tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncryptedWithCMK.py create mode 100644 tests/terraform/checks/resource/aws/test_RedshiftSnapshotCopyGrantEncryptedWithCMK.py diff --git a/checkov/terraform/checks/resource/aws/MemoryDBSnapshotEncryptionWithCMK.py b/checkov/terraform/checks/resource/aws/MemoryDBSnapshotEncryptionWithCMK.py new file mode 100644 index 00000000000..5d51447308c --- /dev/null +++ b/checkov/terraform/checks/resource/aws/MemoryDBSnapshotEncryptionWithCMK.py @@ -0,0 +1,23 @@ +from typing import Any + +from checkov.common.models.enums import CheckCategories +from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck +from checkov.common.models.consts import ANY_VALUE + + +class MemoryDBSnapshotEncryptionWithCMK(BaseResourceValueCheck): + def __init__(self) -> None: + name = "Ensure MemoryDB snapshot is encrypted by KMS using a customer managed Key (CMK)" + id = "CKV_AWS_278" + supported_resources = ("aws_memorydb_snapshot",) + categories = (CheckCategories.ENCRYPTION,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def get_inspected_key(self) -> str: + return "kms_key_arn" + + def get_expected_value(self) -> Any: + return ANY_VALUE + + +check = MemoryDBSnapshotEncryptionWithCMK() diff --git a/checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncrypted.py b/checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncrypted.py new file mode 100644 index 00000000000..3822d43be60 --- /dev/null +++ b/checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncrypted.py @@ -0,0 +1,17 @@ +from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck +from checkov.common.models.enums import CheckCategories + + +class NeptuneClusterSnapshotEncrypted(BaseResourceValueCheck): + def __init__(self) -> None: + name = "Ensure Neptune snapshot is securely encrypted" + id = "CKV_AWS_279" + supported_resources = ("aws_neptune_cluster_snapshot",) + categories = (CheckCategories.ENCRYPTION,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def get_inspected_key(self) -> str: + return "storage_encrypted" + + +check = NeptuneClusterSnapshotEncrypted() diff --git a/checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncryptedWithCMK.py b/checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncryptedWithCMK.py new file mode 100644 index 00000000000..fb2c3647071 --- /dev/null +++ b/checkov/terraform/checks/resource/aws/NeptuneClusterSnapshotEncryptedWithCMK.py @@ -0,0 +1,23 @@ +from typing import Any + +from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck +from checkov.common.models.enums import CheckCategories +from checkov.common.models.consts import ANY_VALUE + + +class NeptuneClusterSnapshotEncrypted(BaseResourceValueCheck): + def __init__(self) -> None: + name = "Ensure Neptune snapshot is encrypted by KMS using a customer managed Key (CMK)" + id = "CKV_AWS_280" + supported_resources = ("aws_neptune_cluster_snapshot",) + categories = (CheckCategories.ENCRYPTION,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def get_inspected_key(self) -> str: + return "kms_key_id" + + def get_expected_value(self) -> Any: + return ANY_VALUE + + +check = NeptuneClusterSnapshotEncrypted() diff --git a/checkov/terraform/checks/resource/aws/RedshiftClusterSnapshotCopyGrantEncryptedWithCMK.py b/checkov/terraform/checks/resource/aws/RedshiftClusterSnapshotCopyGrantEncryptedWithCMK.py new file mode 100644 index 00000000000..65906d42332 --- /dev/null +++ b/checkov/terraform/checks/resource/aws/RedshiftClusterSnapshotCopyGrantEncryptedWithCMK.py @@ -0,0 +1,23 @@ +from typing import Any + +from checkov.common.models.consts import ANY_VALUE +from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck +from checkov.common.models.enums import CheckCategories + + +class RedshiftSnapshotCopyGrantEncryptedWithCMK(BaseResourceValueCheck): + def __init__(self) -> None: + name = "Ensure RedShift snapshot copy is encrypted by KMS using a customer managed Key (CMK)" + id = "CKV_AWS_281" + supported_resources = ("aws_redshift_snapshot_copy_grant",) + categories = (CheckCategories.ENCRYPTION,) + super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) + + def get_inspected_key(self) -> str: + return "kms_key_id" + + def get_expected_value(self) -> Any: + return ANY_VALUE + + +check = RedshiftSnapshotCopyGrantEncryptedWithCMK() diff --git a/tests/terraform/checks/resource/aws/example_MemoryDBSnapshotEncryptionWithCMK/main.tf b/tests/terraform/checks/resource/aws/example_MemoryDBSnapshotEncryptionWithCMK/main.tf new file mode 100644 index 00000000000..8bc094eb666 --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_MemoryDBSnapshotEncryptionWithCMK/main.tf @@ -0,0 +1,11 @@ +resource "aws_memorydb_snapshot" "fail" { + name = "pike" + cluster_name = "sato" +} + +resource "aws_memorydb_snapshot" "pass" { + cluster_name = "sato" + name = "pike" + kms_key_arn = aws_kms_key.example.arn +} + diff --git a/tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncrypted/main.tf b/tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncrypted/main.tf new file mode 100644 index 00000000000..55ff0f7d5c9 --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncrypted/main.tf @@ -0,0 +1,16 @@ +resource "aws_neptune_cluster_snapshot" "fail" { + db_cluster_identifier = aws_neptune_cluster.example.id + db_cluster_snapshot_identifier = "resourcetestsnapshot1234" +} + +resource "aws_neptune_cluster_snapshot" "fail2" { + db_cluster_identifier = aws_neptune_cluster.example.id + db_cluster_snapshot_identifier = "resourcetestsnapshot1234" + storage_encrypted = false +} + +resource "aws_neptune_cluster_snapshot" "pass" { + db_cluster_identifier = aws_neptune_cluster.example.id + db_cluster_snapshot_identifier = "resourcetestsnapshot1234" + storage_encrypted =true +} \ No newline at end of file diff --git a/tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncryptedWithCMK/main.tf b/tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncryptedWithCMK/main.tf new file mode 100644 index 00000000000..69775f26ced --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_NeptuneClusterSnapshotEncryptedWithCMK/main.tf @@ -0,0 +1,13 @@ +resource "aws_neptune_cluster_snapshot" "fail" { + db_cluster_identifier = aws_neptune_cluster.example.id + db_cluster_snapshot_identifier = "resourcetestsnapshot1234" + storage_encrypted=true +} + + +resource "aws_neptune_cluster_snapshot" "pass" { + db_cluster_identifier = aws_neptune_cluster.example.id + db_cluster_snapshot_identifier = "resourcetestsnapshot1234" + storage_encrypted = true + kms_key_id = aws_kms_key.pike.id +} \ No newline at end of file diff --git a/tests/terraform/checks/resource/aws/example_RedshiftClusterEncryptedWithCMK/aws_redshift_cluster.tf b/tests/terraform/checks/resource/aws/example_RedshiftClusterEncryptedWithCMK/main.tf similarity index 100% rename from tests/terraform/checks/resource/aws/example_RedshiftClusterEncryptedWithCMK/aws_redshift_cluster.tf rename to tests/terraform/checks/resource/aws/example_RedshiftClusterEncryptedWithCMK/main.tf diff --git a/tests/terraform/checks/resource/aws/example_RedshiftSnapshotCopyGrantEncryptedWithCMK/main.tf b/tests/terraform/checks/resource/aws/example_RedshiftSnapshotCopyGrantEncryptedWithCMK/main.tf new file mode 100644 index 00000000000..1034013fb11 --- /dev/null +++ b/tests/terraform/checks/resource/aws/example_RedshiftSnapshotCopyGrantEncryptedWithCMK/main.tf @@ -0,0 +1,8 @@ +resource "aws_redshift_snapshot_copy_grant" "pass" { + snapshot_copy_grant_name = "my-grant" + kms_key_id = aws_kms_key.test.arn +} + +resource "aws_redshift_snapshot_copy_grant" "fail" { + snapshot_copy_grant_name = "my-grant" +} \ No newline at end of file diff --git a/tests/terraform/checks/resource/aws/test_MemoryDBSnapshotEncryptionWithCMK.py b/tests/terraform/checks/resource/aws/test_MemoryDBSnapshotEncryptionWithCMK.py new file mode 100644 index 00000000000..726486a9c60 --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_MemoryDBSnapshotEncryptionWithCMK.py @@ -0,0 +1,40 @@ +import unittest +from pathlib import Path + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.checks.resource.aws.MemoryDBSnapshotEncryptionWithCMK import check +from checkov.terraform.runner import Runner + + +class TestMemoryDBSnapshotEncryptionWithCMK(unittest.TestCase): + def test(self): + # given + test_files_dir = Path(__file__).parent / "example_MemoryDBSnapshotEncryptionWithCMK" + + # when + report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id])) + + # then + summary = report.get_summary() + + passing_resources = { + "aws_memorydb_snapshot.pass", + } + failing_resources = { + "aws_memorydb_snapshot.fail", + } + + passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncrypted.py b/tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncrypted.py new file mode 100644 index 00000000000..5079e5f655a --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncrypted.py @@ -0,0 +1,41 @@ +import unittest +from pathlib import Path + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.checks.resource.aws.NeptuneClusterSnapshotEncrypted import check +from checkov.terraform.runner import Runner + + +class TestNeptuneClusterSnapshotEncrypted(unittest.TestCase): + def test(self): + # given + test_files_dir = Path(__file__).parent / "example_NeptuneClusterSnapshotEncrypted" + + # when + report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id])) + + # then + summary = report.get_summary() + + passing_resources = { + "aws_neptune_cluster_snapshot.pass", + } + failing_resources = { + "aws_neptune_cluster_snapshot.fail", + "aws_neptune_cluster_snapshot.fail2", + } + + passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 2) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncryptedWithCMK.py b/tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncryptedWithCMK.py new file mode 100644 index 00000000000..1fc90b890da --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_NeptuneSnapshotEncryptedWithCMK.py @@ -0,0 +1,40 @@ +import unittest +from pathlib import Path + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.checks.resource.aws.NeptuneClusterSnapshotEncryptedWithCMK import check +from checkov.terraform.runner import Runner + + +class TestNeptuneClusterSnapshotEncryptedWithCMK(unittest.TestCase): + def test(self): + # given + test_files_dir = Path(__file__).parent / "example_NeptuneClusterSnapshotEncryptedWithCMK" + + # when + report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id])) + + # then + summary = report.get_summary() + + passing_resources = { + "aws_neptune_cluster_snapshot.pass", + } + failing_resources = { + "aws_neptune_cluster_snapshot.fail", + } + + passed_check_resources = {c.resource for c in report.passed_checks} + failed_check_resources = {c.resource for c in report.failed_checks} + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/terraform/checks/resource/aws/test_RedshiftSnapshotCopyGrantEncryptedWithCMK.py b/tests/terraform/checks/resource/aws/test_RedshiftSnapshotCopyGrantEncryptedWithCMK.py new file mode 100644 index 00000000000..21e5c899a66 --- /dev/null +++ b/tests/terraform/checks/resource/aws/test_RedshiftSnapshotCopyGrantEncryptedWithCMK.py @@ -0,0 +1,40 @@ +import os +import unittest + +from checkov.runner_filter import RunnerFilter +from checkov.terraform.checks.resource.aws.RedshiftClusterSnapshotCopyGrantEncryptedWithCMK import check +from checkov.terraform.runner import Runner + + +class TestRedshiftClusterSnapshotCopyGrantEncryptedWithCMK(unittest.TestCase): + def test(self): + runner = Runner() + current_dir = os.path.dirname(os.path.realpath(__file__)) + + test_files_dir = current_dir + "/example_RedshiftSnapshotCopyGrantEncryptedWithCMK" + report = runner.run( + root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]) + ) + summary = report.get_summary() + + passing_resources = { + "aws_redshift_snapshot_copy_grant.pass", + } + failing_resources = { + "aws_redshift_snapshot_copy_grant.fail", + } + + passed_check_resources = set([c.resource for c in report.passed_checks]) + failed_check_resources = set([c.resource for c in report.failed_checks]) + + self.assertEqual(summary["passed"], 1) + self.assertEqual(summary["failed"], 1) + self.assertEqual(summary["skipped"], 0) + self.assertEqual(summary["parsing_errors"], 0) + + self.assertEqual(passing_resources, passed_check_resources) + self.assertEqual(failing_resources, failed_check_resources) + + +if __name__ == "__main__": + unittest.main()