Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(general): allow file destination mapping via output-file-path flag #3593

Merged
merged 2 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion checkov/common/output/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ def print_failed_github_md(self, use_bc_ids: bool = False) -> str:
showindex=True,
)
output_data = f"### {self.check_type} scan results:\n\n{table}\n\n---\n"
print(output_data)
return output_data
else:
return "\n\n---\n\n"
Expand Down
172 changes: 119 additions & 53 deletions checkov/common/runners/runner_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from collections import defaultdict
from collections.abc import Iterable
from json import dumps
from pathlib import Path
from typing import List, Dict, Any, Optional, cast, TYPE_CHECKING, TypeVar

Expand Down Expand Up @@ -44,6 +43,7 @@

_BaseRunner = TypeVar("_BaseRunner", bound="BaseRunner[Any]")

CONSOLE_OUTPUT = "console"
CHECK_BLOCK_TYPES = frozenset(["resource", "data", "provider", "module"])
OUTPUT_CHOICES = ["cli", "cyclonedx", "json", "junitxml", "github_failed_only", "sarif", "csv"]
SUMMARY_POSITIONS = frozenset(['top', 'bottom'])
Expand Down Expand Up @@ -125,8 +125,9 @@ def _handle_report(self, scan_report: Report, repo_root_for_plan_enrichment: lis

def save_output_to_file(self, file_name: str, data: str, data_format: str) -> None:
try:
with open(file_name, 'w') as f:
f.write(data)
file_path = Path(file_name)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(data)
logging.info(f"\nWrote output in {data_format} format to the file '{file_name}')")
except EnvironmentError:
logging.error(f"\nAn error occurred while writing {data_format} results to file: {file_name}",
Expand Down Expand Up @@ -203,15 +204,21 @@ def print_reports(
created_baseline_path: Optional[str] = None,
baseline: Optional[Baseline] = None,
) -> Literal[0, 1]:
output_formats = set(config.output)
output_formats: "dict[str, str]" = {}

if config.output_file_path and "," in config.output_file_path:
output_paths = config.output_file_path.split(",")
for idx, output_format in enumerate(config.output):
output_formats[output_format] = output_paths[idx]
else:
output_formats = {output_format: CONSOLE_OUTPUT for output_format in config.output}

if "cli" in config.output and not config.quiet:
print(f"{self.banner}\n")
exit_codes = []
cli_reports = []
report_jsons = []
sarif_reports = []
junit_reports = []
github_reports = []
cyclonedx_reports = []
csv_sbom_report = CSVSBOM()

Expand All @@ -223,7 +230,7 @@ def print_reports(
if "junitxml" in config.output:
junit_reports.append(report)
if "github_failed_only" in config.output:
data_outputs["github_failed_only"] += report.print_failed_github_md(use_bc_ids=config.output_bc_ids)
github_reports.append(report.print_failed_github_md(use_bc_ids=config.output_bc_ids))
if "sarif" in config.output:
sarif_reports.append(report)
if "cli" in config.output:
Expand All @@ -240,7 +247,20 @@ def print_reports(
exit_code_thresholds = self.get_fail_thresholds(config, report.check_type)
exit_codes.append(report.get_exit_code(exit_code_thresholds))

if "github_failed_only" in config.output:
github_output = "".join(github_reports)

self._print_to_console(
output_formats=output_formats,
output_format="github_failed_only",
output=github_output,
)

data_outputs["github_failed_only"] = github_output
if "cli" in config.output:
if not config.quiet:
print(f"{self.banner}\n")

cli_output = ''
for report in cli_reports:
cli_output += report.print_console(
Expand All @@ -251,51 +271,69 @@ def print_reports(
use_bc_ids=config.output_bc_ids,
summary_position=config.summary_position
)
print(cli_output)

self._print_to_console(
output_formats=output_formats,
output_format="cli",
output=cli_output,
url=url,
)

# Remove colors from the cli output
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0–9:;<=>?]*[ -/]*[@-~]')
data_outputs['cli'] = ansi_escape.sub('', cli_output)
if url:
print("More details: {}".format(url))
output_formats.remove("cli")
if output_formats:
print(OUTPUT_DELIMITER)
if "sarif" in config.output:
master_report = Report("merged")
print(self.banner)

output_format = output_formats["sarif"]
if "cli" not in config.output and output_format == CONSOLE_OUTPUT:
print(self.banner)

for report in sarif_reports:
print(report.print_console(
is_quiet=config.quiet,
is_compact=config.compact,
created_baseline_path=created_baseline_path,
baseline=baseline,
use_bc_ids=config.output_bc_ids,
summary_position=config.summary_position
))
if "cli" not in config.output and output_format == CONSOLE_OUTPUT:
print(report.print_console(
is_quiet=config.quiet,
is_compact=config.compact,
created_baseline_path=created_baseline_path,
baseline=baseline,
use_bc_ids=config.output_bc_ids,
summary_position=config.summary_position
))
master_report.failed_checks += report.failed_checks
master_report.skipped_checks += report.skipped_checks
if url:
print("More details: {}".format(url))
master_report.write_sarif_output(self.tool)
data_outputs['sarif'] = json.dumps(master_report.get_sarif_json(self.tool), cls=CustomJSONEncoder)
output_formats.remove("sarif")
if output_formats:
print(OUTPUT_DELIMITER)

if output_format == CONSOLE_OUTPUT:
# don't write to file, if an explicit file path was set
master_report.write_sarif_output(self.tool)

if output_format == CONSOLE_OUTPUT:
del output_formats["sarif"]

if "cli" not in config.output and url:
print("More details: {}".format(url))
if CONSOLE_OUTPUT in output_formats.values():
print(OUTPUT_DELIMITER)

data_outputs["sarif"] = json.dumps(master_report.get_sarif_json(self.tool), cls=CustomJSONEncoder)
if "json" in config.output:
if config.compact and report_jsons:
self.strip_code_blocks_from_json(report_jsons)

report_json_output: "list[dict[str, Any]] | dict[str, Any]" = report_jsons
if not report_jsons:
print(dumps(Report("").get_summary(), indent=4, cls=CustomJSONEncoder))
data_outputs['json'] = json.dumps(Report("").get_summary(), cls=CustomJSONEncoder)
report_json_output = Report("").get_summary()
elif len(report_jsons) == 1:
print(dumps(report_jsons[0], indent=4, cls=CustomJSONEncoder))
data_outputs['json'] = json.dumps(report_jsons[0], cls=CustomJSONEncoder)
else:
print(dumps(report_jsons, indent=4, cls=CustomJSONEncoder))
data_outputs['json'] = json.dumps(report_jsons, cls=CustomJSONEncoder)
output_formats.remove("json")
if output_formats:
print(OUTPUT_DELIMITER)
report_json_output = report_jsons[0]

json_output = json.dumps(report_json_output, indent=4, cls=CustomJSONEncoder)

self._print_to_console(
output_formats=output_formats,
output_format="json",
output=json_output,
)

data_outputs["json"] = json.dumps(report_json_output, cls=CustomJSONEncoder)
if "junitxml" in config.output:
properties = Report.create_test_suite_properties_block(config)

Expand All @@ -307,21 +345,26 @@ def print_reports(
else:
test_suites = [Report("").get_test_suite(properties=properties)]

data_outputs['junitxml'] = Report.get_junit_xml_string(test_suites)
print(data_outputs['junitxml'])
junit_output = Report.get_junit_xml_string(test_suites)

output_formats.remove("junitxml")
if output_formats:
print(OUTPUT_DELIMITER)
self._print_to_console(
output_formats=output_formats,
output_format="junitxml",
output=junit_output,
)

data_outputs['junitxml'] = junit_output
if "cyclonedx" in config.output:
cyclonedx = CycloneDX(repo_id=metadata_integration.bc_integration.repo_id, reports=cyclonedx_reports)
cyclonedx_output = cyclonedx.get_xml_output()

print(cyclonedx_output)
self._print_to_console(
output_formats=output_formats,
output_format="cyclonedx",
output=cyclonedx_output,
)

data_outputs["cyclonedx"] = cyclonedx_output
output_formats.remove("cyclonedx")
if output_formats:
print(OUTPUT_DELIMITER)
if "csv" in config.output:
is_api_key = False
if 'bc_api_key' in config and config.bc_api_key is not None:
Expand All @@ -336,14 +379,37 @@ def print_reports(
'junitxml': 'results_junitxml.xml',
'cyclonedx': 'results_cyclonedx.xml'}
if config.output_file_path:
for output in config.output:
if output in file_names:
self.save_output_to_file(file_name=f'{config.output_file_path}/{file_names[output]}',
data=data_outputs[output],
data_format=output)
if output_formats:
for output_format, output_path in output_formats.items():
self.save_output_to_file(
file_name=output_path,
data=data_outputs[output_format],
data_format=output_format,
)
else:
for output in config.output:
if output in file_names:
self.save_output_to_file(
file_name=f'{config.output_file_path}/{file_names[output]}',
data=data_outputs[output],
data_format=output,
)
exit_code = 1 if 1 in exit_codes else 0
return cast(Literal[0, 1], exit_code)

def _print_to_console(self, output_formats: dict[str, str], output_format: str, output: str, url: str | None = None) -> None:
"""Prints the output to console, if needed"""

output_dest = output_formats[output_format]
if output_dest == CONSOLE_OUTPUT:
del output_formats[output_format]

print(output)
if url:
print(f"More details: {url}")
if CONSOLE_OUTPUT in output_formats.values():
print(OUTPUT_DELIMITER)

def print_iac_bom_reports(self, output_path: str,
scan_reports: list[Report],
output_types: list[str],
Expand Down
6 changes: 5 additions & 1 deletion checkov/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,11 @@ def add_parser_args(parser: ArgumentParser) -> None:
default=None,
help='Report output format. Add multiple outputs by using the flag multiple times (-o sarif -o cli)')
parser.add('--output-file-path', default=None,
help='Name for output file. The first selected output via output flag will be saved to the file (default output is cli)')
help='Name of the output folder to save the chosen output formats. '
'Advanced usage: '
'By using -o cli -o junitxml --output-file-path console,results.xml the CLI output will be printed '
'to the console and the JunitXML output to the file results.xml.'
)
parser.add('--output-bc-ids', action='store_true',
help='Print Bridgecrew platform IDs (BC...) instead of Checkov IDs (CKV...), if the check exists in the platform')
parser.add('--include-all-checkov-policies', action='store_true',
Expand Down
Loading