Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(github): add graph to GitHub Actions #3672

Merged
merged 17 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
51 changes: 51 additions & 0 deletions checkov/common/runners/graph_builder/local_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from __future__ import annotations

import logging
from abc import abstractmethod
from pathlib import Path
from typing import Any

from checkov.common.graph.graph_builder import Edge
from checkov.common.graph.graph_builder.graph_components.blocks import Block
from checkov.common.graph.graph_builder.local_graph import LocalGraph


class ObjectLocalGraph(LocalGraph[Block]):
def __init__(self, definitions: dict[str | Path, dict[str, Any] | list[dict[str, Any]]]) -> None:
super().__init__()
self.vertices: list[Block] = []
self.definitions = definitions
self.vertices_by_path_and_name: dict[tuple[str, str], int] = {}

def build_graph(self, render_variables: bool = False) -> None:
self._create_vertices()
logging.info(f"[{self.__class__.__name__}] created {len(self.vertices)} vertices")
gruebel marked this conversation as resolved.
Show resolved Hide resolved

for i, vertex in enumerate(self.vertices):
self.vertices_by_block_type[vertex.block_type].append(i)
self.vertices_block_name_map[vertex.block_type][vertex.name].append(i)
self.vertices_by_path_and_name[(vertex.path, vertex.name)] = i

self.in_edges[i] = []
self.out_edges[i] = []

self._create_edges()
logging.info(f"[{self.__class__.__name__}] created {len(self.edges)} edges")
gruebel marked this conversation as resolved.
Show resolved Hide resolved

@abstractmethod
def _create_vertices(self) -> None:
pass

@abstractmethod
def _create_edges(self) -> None:
pass

def _create_edge(self, origin_vertex_index: int, dest_vertex_index: int, label: str = "default") -> None:
if origin_vertex_index == dest_vertex_index:
# this should not happen
return

edge = Edge(origin_vertex_index, dest_vertex_index, label)
self.edges.append(edge)
self.out_edges[origin_vertex_index].append(edge)
self.in_edges[dest_vertex_index].append(edge)
39 changes: 39 additions & 0 deletions checkov/common/runners/graph_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any

from checkov.common.graph.db_connectors.db_connector import DBConnector
from checkov.common.graph.graph_manager import GraphManager
from checkov.common.runners.graph_builder.local_graph import ObjectLocalGraph

if TYPE_CHECKING:
import networkx as nx
from checkov.common.graph.graph_builder.graph_components.blocks import Block # noqa


class ObjectGraphManager(GraphManager[ObjectLocalGraph]):
def __init__(self, db_connector: DBConnector[nx.DiGraph], source: str) -> None:
super().__init__(db_connector=db_connector, parser=None, source=source)

def build_graph_from_source_directory(
self,
source_dir: str,
local_graph_class: type[ObjectLocalGraph] = ObjectLocalGraph,
render_variables: bool = True,
parsing_errors: dict[str, Exception] | None = None,
download_external_modules: bool = False,
excluded_paths: list[str] | None = None,
) -> tuple[ObjectLocalGraph, dict[str, dict[str, Any]]]:
# needs some refactor of thr ObjectRunner to implement this
pass

def build_graph_from_definitions( # type:ignore[override] # need to revisit it after adding `Generic` to `definitions`
self,
definitions: dict[str | Path, dict[str, Any] | list[dict[str, Any]]],
render_variables: bool = False,
graph_class: type[ObjectLocalGraph] = ObjectLocalGraph,
) -> ObjectLocalGraph:
local_graph = graph_class(definitions)
local_graph.build_graph(render_variables)
return local_graph
135 changes: 128 additions & 7 deletions checkov/common/runners/object_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,27 @@

from abc import abstractmethod
from collections.abc import Iterable
from pathlib import Path
from typing import Any, TYPE_CHECKING, Callable
from typing_extensions import TypedDict

from checkov.common.checks_infra.registry import get_graph_checks_registry
from checkov.common.graph.db_connectors.networkx.networkx_db_connector import NetworkxConnector
from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.output.github_actions_record import GithubActionsRecord
from checkov.common.output.record import Record
from checkov.common.output.report import Report, CheckType
from checkov.common.parallelizer.parallel_runner import parallel_runner
from checkov.common.runners.base_runner import BaseRunner, filter_ignored_paths
from checkov.common.runners.base_runner import BaseRunner, filter_ignored_paths, CHECKOV_CREATE_GRAPH
from checkov.common.runners.graph_manager import ObjectGraphManager
from checkov.common.typing import _CheckResult
from checkov.common.util.consts import START_LINE, END_LINE
from checkov.runner_filter import RunnerFilter
from checkov.common.util.suppression import collect_suppressions_for_context

if TYPE_CHECKING:
from checkov.common.checks.base_check_registry import BaseCheckRegistry
from checkov.common.runners.graph_builder.local_graph import ObjectLocalGraph


class GhaMetadata(TypedDict):
Expand All @@ -28,12 +35,29 @@ class GhaMetadata(TypedDict):
jobs: dict[int, str]


class Runner(BaseRunner[None]): # if a graph is added, Any needs to replaced
def __init__(self) -> None:
class Runner(BaseRunner[ObjectGraphManager]): # if a graph is added, Any needs to replaced
def __init__(
self,
db_connector: NetworkxConnector | None = None,
source: str | None = None,
graph_class: type[ObjectLocalGraph] | None = None,
graph_manager: ObjectGraphManager | None = None,
) -> None:
super().__init__()
self.definitions: dict[str, dict[str, Any] | list[dict[str, Any]]] = {}
self.definitions_raw: dict[str, list[tuple[int, str]]] = {}
self.map_file_path_to_gha_metadata_dict: dict[str, GhaMetadata] = {}
self.root_folder: str | None = None

if source and graph_class:
# if they are not all set, then ignore it
db_connector = db_connector or NetworkxConnector()
self.source = source
self.graph_class = graph_class
self.graph_manager = (
graph_manager if graph_manager else ObjectGraphManager(source=self.source, db_connector=db_connector)
)
self.graph_registry = get_graph_checks_registry(self.check_type)

def _load_files(
self,
Expand Down Expand Up @@ -86,21 +110,54 @@ def run(
for directory in external_checks_dir:
registry.load_external_checks(directory)

if CHECKOV_CREATE_GRAPH and self.graph_registry:
self.graph_registry.load_external_checks(directory)

if files:
self._load_files(files)

if root_folder:
self.root_folder = root_folder

for root, d_names, f_names in os.walk(root_folder):
filter_ignored_paths(root, d_names, runner_filter.excluded_paths, self.included_paths())
filter_ignored_paths(root, f_names, runner_filter.excluded_paths, self.included_paths())
files_to_load = [os.path.join(root, f_name) for f_name in f_names]
self._load_files(files_to_load=files_to_load)

if CHECKOV_CREATE_GRAPH and self.graph_registry and self.graph_manager:
logging.info(f"Creating {self.source} graph")
local_graph = self.graph_manager.build_graph_from_definitions(
definitions=self.definitions, graph_class=self.graph_class # type:ignore[arg-type] # the paths are just `str`
)
logging.info(f"Successfully created {self.source} graph")

self.graph_manager.save_graph(local_graph)

self.pbar.initiate(len(self.definitions))

# run Python checks
self.add_python_check_results(
report=report, registry=registry, runner_filter=runner_filter, root_folder=root_folder
)

# run graph checks
if CHECKOV_CREATE_GRAPH and self.graph_registry:
self.add_graph_check_results(report=report, runner_filter=runner_filter)

return report

def add_python_check_results(
self, report: Report, registry: BaseCheckRegistry, runner_filter: RunnerFilter, root_folder: str | Path | None
) -> None:
"""Adds Python check results to given report"""

for file_path in self.definitions.keys():
self.pbar.set_additional_data({'Current File Scanned': os.path.relpath(file_path, root_folder)})
skipped_checks = collect_suppressions_for_context(self.definitions_raw[file_path])
results = registry.scan(file_path, self.definitions[file_path], skipped_checks, runner_filter) # type:ignore[arg-type] # this is overridden in the subclass
results = registry.scan(
file_path, self.definitions[file_path], skipped_checks, runner_filter # type:ignore[arg-type] # this is overridden in the subclass
)
for key, result in results.items():
result_config = result["results_configuration"]
start = 0
Expand All @@ -124,7 +181,9 @@ def run(
code_block=self.definitions_raw[file_path][start - 1:end + 1],
file_path=f"/{os.path.relpath(file_path, root_folder)}",
file_line_range=[start, end + 1],
resource=self.get_resource(file_path, key, check.supported_entities, self.definitions[file_path]), # type:ignore[arg-type] # key is str not BaseCheck
resource=self.get_resource(
file_path, key, check.supported_entities, self.definitions[file_path] # type:ignore[arg-type] # key is str not BaseCheck
),
evaluations=None,
check_class=check.__class__.__module__,
file_abs_path=os.path.abspath(file_path),
Expand All @@ -143,7 +202,9 @@ def run(
code_block=self.definitions_raw[file_path][start - 1:end + 1],
file_path=f"/{os.path.relpath(file_path, root_folder)}",
file_line_range=[start, end + 1],
resource=self.get_resource(file_path, key, check.supported_entities), # type:ignore[arg-type] # key is str not BaseCheck
resource=self.get_resource(
file_path, key, check.supported_entities # type:ignore[arg-type] # key is str not BaseCheck
),
evaluations=None,
check_class=check.__class__.__module__,
file_abs_path=os.path.abspath(file_path),
Expand All @@ -153,7 +214,67 @@ def run(
report.add_record(record)
self.pbar.update()
self.pbar.close()
return report

def add_graph_check_results(self, report: Report, runner_filter: RunnerFilter) -> None:
"""Adds graph check results to given report"""

root_folder = self.root_folder
graph_checks_results = self.run_graph_checks_results(runner_filter, self.check_type)

for check, check_results in graph_checks_results.items():
for check_result in check_results:
entity = check_result["entity"]
entity_file_path = entity.get(CustomAttributes.FILE_PATH)

if platform.system() == "Windows":
root_folder = os.path.split(entity_file_path)[0]

clean_check_result: _CheckResult = {
"result": check_result["result"],
"evaluated_keys": check_result["evaluated_keys"],
}

start_line = entity[START_LINE]
end_line = entity[END_LINE]

if self.check_type == CheckType.GITHUB_ACTIONS:
record: "Record" = GithubActionsRecord(
check_id=check.id,
bc_check_id=check.bc_id,
check_name=check.name,
check_result=clean_check_result,
code_block=self.definitions_raw[entity_file_path][start_line - 1:end_line + 1],
file_path=f"/{os.path.relpath(entity_file_path, root_folder)}",
file_line_range=[start_line, end_line + 1],
resource=entity.get(CustomAttributes.ID),
evaluations=None,
check_class=check.__class__.__module__,
file_abs_path=os.path.abspath(entity_file_path),
entity_tags=None,
severity=check.severity,
job=self.map_file_path_to_gha_metadata_dict[entity_file_path]["jobs"].get(end_line, ''),
triggers=self.map_file_path_to_gha_metadata_dict[entity_file_path]["triggers"],
workflow_name=self.map_file_path_to_gha_metadata_dict[entity_file_path]["workflow_name"]
)
else:
record = Record(
check_id=check.id,
bc_check_id=check.bc_id,
check_name=check.name,
check_result=clean_check_result,
code_block=self.definitions_raw[entity_file_path][start_line - 1:end_line + 1],
file_path=f"/{os.path.relpath(entity_file_path, root_folder)}",
file_line_range=[start_line, end_line + 1],
resource=entity.get(CustomAttributes.ID),
evaluations=None,
check_class=check.__class__.__module__,
file_abs_path=os.path.abspath(entity_file_path),
entity_tags=None,
severity=check.severity,
)

record.set_guideline(check.guideline)
report.add_record(record=record)

def included_paths(self) -> Iterable[str]:
return []
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum


class ResourceType(str, Enum):
JOBS = "jobs"
PERMISSIONS = "permissions"
STEPS = "steps"

def __str__(self) -> str:
# needed, because of a Python 3.11 change
return self.value
Loading