diff --git a/checkov/circleci_pipelines/image_referencer/provider.py b/checkov/circleci_pipelines/image_referencer/provider.py index 4ea3a6fb0b4..63533f0f989 100644 --- a/checkov/circleci_pipelines/image_referencer/provider.py +++ b/checkov/circleci_pipelines/image_referencer/provider.py @@ -4,8 +4,7 @@ import jmespath from checkov.common.images.image_referencer import Image from checkov.common.util.consts import START_LINE, END_LINE -from checkov.yaml_doc.runner import resolve_sub_name -from checkov.yaml_doc.runner import resolve_image_name +from checkov.yaml_doc.runner import Runner class CircleCIProvider: @@ -16,8 +15,8 @@ def __init__(self, workflow_config: dict[str, Any], file_path: str) -> None: self.workflow_config = workflow_config def generate_resource_key(self, start_line: int, end_line: int, tag: str) -> str: - sub_name = resolve_sub_name(self.workflow_config, start_line, end_line, tag) - image_name = resolve_image_name(self.workflow_config[tag][sub_name], start_line, end_line) + sub_name = Runner.resolve_sub_name(self.workflow_config, start_line, end_line, tag) + image_name = Runner.resolve_image_name(self.workflow_config[tag][sub_name], start_line, end_line) new_key = f'{tag}({sub_name}).docker.image{image_name}' if sub_name else tag return new_key diff --git a/checkov/circleci_pipelines/runner.py b/checkov/circleci_pipelines/runner.py index ab45227f694..25cb243dd31 100644 --- a/checkov/circleci_pipelines/runner.py +++ b/checkov/circleci_pipelines/runner.py @@ -11,9 +11,6 @@ from checkov.common.util.type_forcers import force_dict from checkov.runner_filter import RunnerFilter from checkov.yaml_doc.runner import Runner as YamlRunner -from checkov.yaml_doc.runner import resolve_sub_name -from checkov.yaml_doc.runner import resolve_image_name -from checkov.yaml_doc.runner import resolve_step_name if TYPE_CHECKING: from checkov.common.checks.base_check_registry import BaseCheckRegistry @@ -66,13 +63,17 @@ def get_resource(self, file_path: str, key: str, supported_entities: Iterable[st if 'orbs.{orbs: @}' in supported_entities: new_key = "orbs" elif 'jobs.*.steps[]' in supported_entities: - job_name = resolve_sub_name(definition, start_line, end_line, tag='jobs') - step_name = resolve_step_name(definition['jobs'].get(job_name), start_line, end_line) + job_name = self.resolve_sub_name(definition, start_line, end_line, tag='jobs') + step_name = self.resolve_step_name(definition['jobs'].get(job_name), start_line, end_line) new_key = f'jobs({job_name}).steps{step_name}' if job_name else "jobs" elif 'jobs.*.docker[].{image: image, __startline__: __startline__, __endline__:__endline__}' in supported_entities: - job_name = resolve_sub_name(definition, start_line, end_line, tag='jobs') - image_name = resolve_image_name(definition['jobs'].get(job_name), start_line, end_line) + job_name = self.resolve_sub_name(definition, start_line, end_line, tag='jobs') + image_name = self.resolve_image_name(definition['jobs'].get(job_name), start_line, end_line) new_key = f'jobs({job_name}).docker.image{image_name}' if job_name else "jobs" + elif 'executors.*.docker[].{image: image, __startline__: __startline__, __endline__:__endline__}': + executor_name = self.resolve_sub_name(definition, start_line, end_line, tag='executors') + image_name = self.resolve_image_name(definition['jobs'].get(executor_name), start_line, end_line) + new_key = f'executors({executor_name}).docker.image{image_name}' if executor_name else "executors" return new_key def run( diff --git a/checkov/github_actions/runner.py b/checkov/github_actions/runner.py index 8d3f45e34ce..507c472c0d2 100644 --- a/checkov/github_actions/runner.py +++ b/checkov/github_actions/runner.py @@ -19,8 +19,7 @@ from checkov.common.bridgecrew.check_type import CheckType from checkov.common.util.type_forcers import force_dict from checkov.github_actions.checks.registry import registry -from checkov.yaml_doc.runner import Runner as YamlRunner, resolve_step_name -from checkov.yaml_doc.runner import resolve_sub_name +from checkov.yaml_doc.runner import Runner as YamlRunner if TYPE_CHECKING: from checkov.common.checks.base_check_registry import BaseCheckRegistry @@ -89,11 +88,11 @@ def get_resource(self, file_path: str, key: str, supported_entities: Iterable[st workflow_name = definition.get('name', "") new_key = f"on({workflow_name})" if workflow_name else "on" elif 'jobs' in supported_entities: - job_name = resolve_sub_name(definition, start_line, end_line, tag='jobs') + job_name = self.resolve_sub_name(definition, start_line, end_line, tag='jobs') new_key = f"jobs({job_name})" if job_name else "jobs" if 'jobs.*.steps[]' in supported_entities and key.split('.')[1] == '*': - step_name = resolve_step_name(definition['jobs'][job_name], start_line, end_line) + step_name = self.resolve_step_name(definition['jobs'].get(job_name), start_line, end_line) new_key = f'jobs({job_name}).steps{step_name}' return new_key diff --git a/checkov/yaml_doc/runner.py b/checkov/yaml_doc/runner.py index b36ce9192c6..0aca8268c57 100644 --- a/checkov/yaml_doc/runner.py +++ b/checkov/yaml_doc/runner.py @@ -55,71 +55,71 @@ def get_start_end_lines( end = result_config["__endline__"] return end, start - -def resolve_sub_name(definition: dict[str, Any], start_line: int, end_line: int, tag: str) -> str: - """ - extract the value of the tag, that is within the line of range(start_line, end_line) - - >>> resolve_sub_name({"executors":{"image-executor":{"docker":[],"__startline__":8,"__endline__":11}}}, 9, 11, 'executors') - 'image-executor' - - >>> resolve_sub_name({"jobs":{"job-name":{"docker":[],"__startline__":13,"__endline__":20}}}, 15, 16, 'jobs') - 'job-name' - """ - if not definition: + @staticmethod + def resolve_sub_name(definition: dict[str, Any], start_line: int, end_line: int, tag: str) -> str: + """ + extract the value of the tag, that is within the line of range(start_line, end_line) + + >>> Runner.resolve_sub_name({"executors":{"image-executor":{"docker":[],"__startline__":8,"__endline__":11}}}, 9, 11, 'executors') + 'image-executor' + + >>> Runner.resolve_sub_name({"jobs":{"job-name":{"docker":[],"__startline__":13,"__endline__":20}}}, 15, 16, 'jobs') + 'job-name' + """ + if not definition: + return "" + for key, sub_name in definition.get(tag, {}).items(): + if key in (START_LINE, END_LINE): + continue + if sub_name[START_LINE] <= start_line <= end_line <= sub_name[END_LINE]: + return str(key) return "" - for key, sub_name in definition.get(tag, {}).items(): - if key in (START_LINE, END_LINE): - continue - if sub_name[START_LINE] <= start_line <= end_line <= sub_name[END_LINE]: - return str(key) - return "" - - -def resolve_step_name(job_definition: dict[str, Any], start_line: int, end_line: int) -> str: - """ - extract the step name from the given job within the line of range(start_line, end_line) - - >>> resolve_step_name({"steps":["checkout",{}],"__startline__":42,"__endline__":49}, 48, 49) - '[1](checkout)' - >>> resolve_step_name({"runs-on":"ubuntu-latest","steps":[{"uses":"actions/checkout@v2","__startline__":22,"__endline__":23}]}, 22, 23) - '[1]' - - >>> resolve_step_name({"runs-on":"ubuntu-latest","steps":[{"name": "ab","__startline__":22,"__endline__":23}, {"name":"step_name","__startline__":23,"__endline__":33}]}, 23, 33) - '[2](step_name)' - - """ - if not job_definition: + @staticmethod + def resolve_step_name(job_definition: dict[str, Any], start_line: int, end_line: int) -> str: + """ + extract the step name from the given job within the line of range(start_line, end_line) + + >>> Runner.resolve_step_name({"steps":["checkout",{}],"__startline__":42,"__endline__":49}, 48, 49) + '[1](checkout)' + + >>> Runner.resolve_step_name({"runs-on":"ubuntu-latest","steps":[{"uses":"actions/checkout@v2","__startline__":22,"__endline__":23}]}, 22, 23) + '[1]' + + >>> Runner.resolve_step_name({"runs-on":"ubuntu-latest","steps":[{"name": "ab","__startline__":22,"__endline__":23}, {"name":"step_name","__startline__":23,"__endline__":33}]}, 23, 33) + '[2](step_name)' + + """ + if not job_definition: + return "" + for idx, step in enumerate([step for step in job_definition.get('steps') or [] if step]): + if isinstance(step, str): + return f"[{idx + 1}]({step})" + elif isinstance(step, dict): + if step[START_LINE] <= start_line <= end_line <= step[END_LINE]: + name = step.get('name') + return f"[{idx + 1}]({name})" if name else f"[{idx + 1}]" return "" - for idx, step in enumerate([step for step in job_definition.get('steps') or [] if step]): - if isinstance(step, str): - return f"[{idx + 1}]({step})" - elif isinstance(step, dict): - if step[START_LINE] <= start_line <= end_line <= step[END_LINE]: - name = step.get('name') - return f"[{idx + 1}]({name})" if name else f"[{idx + 1}]" - return "" - - -def resolve_image_name(image_definition: dict[str, Any], start_line: int, end_line: int) -> str: - """ - extract the image name from the given job definition within the line of range(start_line, end_line) - - >>> resolve_image_name({"docker":[{"image":"mongo:2.6.8","__startline__":15,"__endline__":16}]}, 15, 16) - '[1](mongo:2.6.8)' - """ - if not image_definition: + @staticmethod + def resolve_image_name(image_definition: dict[str, Any], start_line: int, end_line: int) -> str: + """ + extract the image name from the given job definition within the line of range(start_line, end_line) + + >>> Runner.resolve_image_name({"docker":[{"image":"mongo:2.6.8","__startline__":15,"__endline__":16}]}, 15, 16) + '[1](mongo:2.6.8)' + + """ + if not image_definition: + return "" + for idx, step in enumerate([step for step in image_definition.get('docker') or [] if step]): + if isinstance(image_definition.get('docker'), dict): + if step == 'image': + return f"[{idx + 1}]({image_definition['docker'][step]})" + if isinstance(step, str): + return f"[{idx + 1}]({step})" + elif isinstance(step, dict): + if step[START_LINE] <= start_line <= end_line <= step[END_LINE]: + name = step.get('image') + return f"[{idx + 1}]({name})" if name else f"[{idx + 1}]" return "" - for idx, step in enumerate([step for step in image_definition.get('docker') or [] if step]): - if isinstance(image_definition.get('docker'), dict): - if step == 'image': - return f"[{idx + 1}]({image_definition['docker'][step]})" - if isinstance(step, str): - return f"[{idx + 1}]({step})" - elif isinstance(step, dict): - if step[START_LINE] <= start_line <= end_line <= step[END_LINE]: - name = step.get('image') - return f"[{idx + 1}]({name})" if name else f"[{idx + 1}]" - return "" diff --git a/tests/github_actions/conftest.py b/tests/github_actions/conftest.py index 6598b817149..14caa6617db 100644 --- a/tests/github_actions/conftest.py +++ b/tests/github_actions/conftest.py @@ -60,11 +60,23 @@ def definition(): "__startline__": 24, "__endline__": 30 }, + "no_step_name_job": { + "runs-on": "ubuntu-latest", + "steps": [ + { + "run": "(ls /.dockerenv && echo Found dockerenv) || (echo No dockerenv)\ncurl -X POST -s --data \"@.secrets\" /dev/null\n", + "__startline__": 31, + "__endline__": 35 + } + ], + "__startline__": 24, + "__endline__": 35 + }, "__startline__": 6, - "__endline__": 30 + "__endline__": 35 }, "__startline__": 1, - "__endline__": 30 + "__endline__": 35 } diff --git a/tests/github_actions/test_runner_resource_names.py b/tests/github_actions/test_runner_resource_names.py index 1903ddf3273..7560539dd0d 100644 --- a/tests/github_actions/test_runner_resource_names.py +++ b/tests/github_actions/test_runner_resource_names.py @@ -1,6 +1,5 @@ import pytest from checkov.github_actions.runner import Runner -from checkov.yaml_doc.runner import resolve_sub_name @pytest.mark.parametrize( @@ -14,7 +13,7 @@ ], ) def test_resolve_job_name(start_line, end_line, expected_job_name, definition): - job_name = resolve_sub_name(definition, start_line, end_line, tag='jobs') + job_name = Runner.resolve_sub_name(definition, start_line, end_line, tag='jobs') assert job_name == expected_job_name @@ -26,6 +25,8 @@ def test_resolve_job_name(start_line, end_line, expected_job_name, definition): ('jobs', 'jobs.*.steps[]'), 7, 23), ('jobs.*.steps[].jobs.*.steps[].CKV_GHA_3[18:23]', "jobs(container-test-job).steps[1](Check for dockerenv file)", ('jobs', 'jobs.*.steps[]'), 18, 23), + ('jobs.*.steps[].jobs.*.steps[].CKV_GHA_3[31:35]', "jobs(no_step_name_job).steps[1]", + ('jobs', 'jobs.*.steps[]'), 31, 35), ], ) def test_get_resource(key, supported_entities, expected_key, start_line, end_line, definition):