Skip to content

Commit

Permalink
feat(terraform): ensure snapshots use encryption (#3899)
Browse files Browse the repository at this point in the history
* snapshot encryption

* add type hints

Co-authored-by: gruebel <[email protected]>
  • Loading branch information
JamesWoolfenden and gruebel authored Nov 22, 2022
1 parent 31170f6 commit 5e426da
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.consts import ANY_VALUE


class MemoryDBSnapshotEncryptionWithCMK(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure MemoryDB snapshot is encrypted by KMS using a customer managed Key (CMK)"
id = "CKV_AWS_278"
supported_resources = ("aws_memorydb_snapshot",)
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "kms_key_arn"

def get_expected_value(self) -> Any:
return ANY_VALUE


check = MemoryDBSnapshotEncryptionWithCMK()
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories


class NeptuneClusterSnapshotEncrypted(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure Neptune snapshot is securely encrypted"
id = "CKV_AWS_279"
supported_resources = ("aws_neptune_cluster_snapshot",)
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "storage_encrypted"


check = NeptuneClusterSnapshotEncrypted()
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any

from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories
from checkov.common.models.consts import ANY_VALUE


class NeptuneClusterSnapshotEncrypted(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure Neptune snapshot is encrypted by KMS using a customer managed Key (CMK)"
id = "CKV_AWS_280"
supported_resources = ("aws_neptune_cluster_snapshot",)
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "kms_key_id"

def get_expected_value(self) -> Any:
return ANY_VALUE


check = NeptuneClusterSnapshotEncrypted()
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any

from checkov.common.models.consts import ANY_VALUE
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck
from checkov.common.models.enums import CheckCategories


class RedshiftSnapshotCopyGrantEncryptedWithCMK(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure RedShift snapshot copy is encrypted by KMS using a customer managed Key (CMK)"
id = "CKV_AWS_281"
supported_resources = ("aws_redshift_snapshot_copy_grant",)
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "kms_key_id"

def get_expected_value(self) -> Any:
return ANY_VALUE


check = RedshiftSnapshotCopyGrantEncryptedWithCMK()
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
resource "aws_memorydb_snapshot" "fail" {
name = "pike"
cluster_name = "sato"
}

resource "aws_memorydb_snapshot" "pass" {
cluster_name = "sato"
name = "pike"
kms_key_arn = aws_kms_key.example.arn
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
resource "aws_neptune_cluster_snapshot" "fail" {
db_cluster_identifier = aws_neptune_cluster.example.id
db_cluster_snapshot_identifier = "resourcetestsnapshot1234"
}

resource "aws_neptune_cluster_snapshot" "fail2" {
db_cluster_identifier = aws_neptune_cluster.example.id
db_cluster_snapshot_identifier = "resourcetestsnapshot1234"
storage_encrypted = false
}

resource "aws_neptune_cluster_snapshot" "pass" {
db_cluster_identifier = aws_neptune_cluster.example.id
db_cluster_snapshot_identifier = "resourcetestsnapshot1234"
storage_encrypted =true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
resource "aws_neptune_cluster_snapshot" "fail" {
db_cluster_identifier = aws_neptune_cluster.example.id
db_cluster_snapshot_identifier = "resourcetestsnapshot1234"
storage_encrypted=true
}


resource "aws_neptune_cluster_snapshot" "pass" {
db_cluster_identifier = aws_neptune_cluster.example.id
db_cluster_snapshot_identifier = "resourcetestsnapshot1234"
storage_encrypted = true
kms_key_id = aws_kms_key.pike.id
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
resource "aws_redshift_snapshot_copy_grant" "pass" {
snapshot_copy_grant_name = "my-grant"
kms_key_id = aws_kms_key.test.arn
}

resource "aws_redshift_snapshot_copy_grant" "fail" {
snapshot_copy_grant_name = "my-grant"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import unittest
from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.MemoryDBSnapshotEncryptionWithCMK import check
from checkov.terraform.runner import Runner


class TestMemoryDBSnapshotEncryptionWithCMK(unittest.TestCase):
def test(self):
# given
test_files_dir = Path(__file__).parent / "example_MemoryDBSnapshotEncryptionWithCMK"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_memorydb_snapshot.pass",
}
failing_resources = {
"aws_memorydb_snapshot.fail",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest
from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.NeptuneClusterSnapshotEncrypted import check
from checkov.terraform.runner import Runner


class TestNeptuneClusterSnapshotEncrypted(unittest.TestCase):
def test(self):
# given
test_files_dir = Path(__file__).parent / "example_NeptuneClusterSnapshotEncrypted"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_neptune_cluster_snapshot.pass",
}
failing_resources = {
"aws_neptune_cluster_snapshot.fail",
"aws_neptune_cluster_snapshot.fail2",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 2)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import unittest
from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.NeptuneClusterSnapshotEncryptedWithCMK import check
from checkov.terraform.runner import Runner


class TestNeptuneClusterSnapshotEncryptedWithCMK(unittest.TestCase):
def test(self):
# given
test_files_dir = Path(__file__).parent / "example_NeptuneClusterSnapshotEncryptedWithCMK"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_neptune_cluster_snapshot.pass",
}
failing_resources = {
"aws_neptune_cluster_snapshot.fail",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.RedshiftClusterSnapshotCopyGrantEncryptedWithCMK import check
from checkov.terraform.runner import Runner


class TestRedshiftClusterSnapshotCopyGrantEncryptedWithCMK(unittest.TestCase):
def test(self):
runner = Runner()
current_dir = os.path.dirname(os.path.realpath(__file__))

test_files_dir = current_dir + "/example_RedshiftSnapshotCopyGrantEncryptedWithCMK"
report = runner.run(
root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id])
)
summary = report.get_summary()

passing_resources = {
"aws_redshift_snapshot_copy_grant.pass",
}
failing_resources = {
"aws_redshift_snapshot_copy_grant.fail",
}

passed_check_resources = set([c.resource for c in report.passed_checks])
failed_check_resources = set([c.resource for c in report.failed_checks])

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()

0 comments on commit 5e426da

Please sign in to comment.