From 624f2b576ea4fb889cf72cbdb89bc600228bd8e9 Mon Sep 17 00:00:00 2001 From: Varun Sharma Date: Thu, 6 Apr 2023 12:55:05 +0530 Subject: [PATCH] download-api: Adding script to start scan and download scan results --- .../start_scan_and_download_results.py | 272 ++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 scripts/vulnerability_scan/start_scan_and_download_results.py diff --git a/scripts/vulnerability_scan/start_scan_and_download_results.py b/scripts/vulnerability_scan/start_scan_and_download_results.py new file mode 100644 index 0000000..b6d1adc --- /dev/null +++ b/scripts/vulnerability_scan/start_scan_and_download_results.py @@ -0,0 +1,272 @@ +import requests +import time +import csv +import io +import xlsxwriter +import json +from sortedcontainers import SortedDict +from requests.packages.urllib3.exceptions import InsecureRequestWarning + +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + + +''' +Below are the pre-requisite for running this script: +* sortedcontainers +* xlsxwriter + +These can be installed using pip as: +* pip3 install sortedcontainers +* pip3 install xlsxwriter +''' + +CVE_FILE_NAME_CSV = "vulnerability_scan_results.csv" +CVE_FILE_NAME_XLSX = "vulnerability_scan_results.xlsx" +CVE_FILE_NAME_JSON = "vulnerability_scan_results.json" + +SEV_CRITICAL = "critical" +SEV_HIGH = "high" +SEV_MEDIUM = "medium" +SEV_LOW = "low" + +TYPE_HOST="host" +TYPE_CONTAINER_IMAGE="container_image" + +EXCEL = "EXCEL" +CSV = "CSV" +JSON = "JSON" + +def run_scan_and_download_results(api_url, api_key, report_type): + # Auth + default_headers = {"Content-Type": "application/json"} + auth_response = requests.post("{0}/users/auth".format(api_url), json={"api_key": api_key}, headers=default_headers, + verify=False).json() + if auth_response["success"]: + print("Authentication successful") + else: + print("Authentication failed") + return + default_headers["Authorization"] = "Bearer " + auth_response["data"]["access_token"] + + # Enumerate nodes + enumerate_response = requests.post( + "{0}/enumerate".format(api_url), + json={"filters": {"type": [TYPE_HOST, TYPE_CONTAINER_IMAGE], "pseudo": False}, "size": 5000}, + headers=default_headers, verify=False).json() + + counter = 1 + enumerate_response_nodes = enumerate_response.get("data", {}).get("data", []) + if not enumerate_response_nodes: + print("No nodes found") + return + + container_images = {} + hosts = {} + + host_scope_ids = [] + container_image_scope_ids = [] + for node in enumerate_response_nodes: + if node["type"] == TYPE_CONTAINER_IMAGE: + container_images[node["id"]] = node + container_image_scope_ids.append(node["scope_id"]) + else: + hosts[node["id"]] = node + host_scope_ids.append(node["scope_id"]) + + #Below flag is only for testing purpose... + run_scan_flag = True + + if run_scan_flag == True: + run_scans(host_scope_ids, default_headers, TYPE_HOST) + run_scans(container_image_scope_ids, default_headers, TYPE_CONTAINER_IMAGE) + + target = len(hosts) + len(container_images) + count = 0 + status_track = {} + success_scan_id = [] + + while count != target: + time.sleep(10) + count = check_scan_status(hosts, TYPE_HOST, count, + default_headers, status_track, success_scan_id) + count = check_scan_status(container_images, TYPE_CONTAINER_IMAGE, count, + default_headers, status_track, success_scan_id) + + heading_data = [] + results = {} + first = True + results_count = 0 + + #Fetch and process the scan results + #Results are grouped by the "severity" and sorted by the + #combination of "cve_id"+"cve_caused_by_package"(package name) + for scanid in success_scan_id: + cve_response = requests.post( + "{0}/vulnerability".format(api_url), + json={"filters": {"scan_id": [scanid]}, "size": 10000}, + headers=default_headers, verify=False).json() + + if cve_response["success"] != True: + msg = "Scan failed for entity:"+"Error message(if any):" + if cve_response["error"]: + msg = msg+cve_response["error"] + print(msg) + + for entry in cve_response["data"]: + sev = entry["cve_severity"] + if sev not in results: + results[sev]=SortedDict() + + cve_id = entry["cve_id"] + package_name = entry["cve_caused_by_package"] + key = cve_id + package_name + + if key not in results[sev]: + results[sev][key] = [] + + results[sev][key].append(entry) + results_count += 1 + + if first == True: + heading_data = entry.keys() + first = False + + print("Total number of results:", results_count) + + #Results are writen to the files in the decreasing order of severity + if report_type == CSV: + write_to_csv(heading_data, results[SEV_CRITICAL], results[SEV_HIGH], + results[SEV_MEDIUM], results[SEV_LOW]) + elif report_type == EXCEL: + write_to_xlsx(heading_data, results[SEV_CRITICAL], results[SEV_HIGH], + results[SEV_MEDIUM], results[SEV_LOW]) + elif report_type == JSON: + write_to_json(results[SEV_CRITICAL], results[SEV_HIGH], + results[SEV_MEDIUM], results[SEV_LOW]) + +def run_scans(scope_ids, default_headers, entity): + print("\nStarting vulnerability scan on all "+entity) + print(requests.post( + "{0}/node_action".format(api_url), + json={ + "action": "cve_scan_start", + "node_type": entity, + "node_id_list": scope_ids, + "action_args": { + "scan_type": ["base", "java", "python", "ruby", "php", "javascript", "rust", "golang", "dotnet"] + } + }, + headers=default_headers, verify=False).json()) + +def check_scan_status(entity_ids, entity, count, default_headers, + status_track, success_scan_id): + for entity_id in entity_ids: + resp = requests.get("{0}/node/{1}/cve_scan_status".format(api_url, entity_id), + headers=default_headers, verify=False).json() + status = resp["data"]["action"] + if status == "ERROR": + if entity_id in status_track and status_track[entity_id] == status: + continue + + print("Error encounterd during Scan for {}:{}".format(entity, entity_id)) + status_track[entity_id] = status + count+=1 + elif status == "COMPLETED": + if entity_id in status_track and status_track[entity_id] == status: + continue + + print("Scan completed for {}:{}".format(entity, entity_id)) + status_track[entity_id] = status + success_scan_id.append(resp["data"]["scan_id"]) + count+=1 + else: + if entity_id not in status_track: + print("Initial Scan status for {}:{}, is:{}".format(entity, entity_id, status)) + status_track[entity_id]=status + elif status_track[entity_id] != status: + print("Scan status changed for {}:{}, From:{}, To:{}".format(entity, + entity_id, status_track[entity_id], status)) + status_track[entity_id]=status + return count + +def write_to_csv(heading, critical, high, medium, low): + def write_data(data_map): + for key in data_map: + cve_data = data_map[key] + for record in cve_data: + csv_writer.writerow(record.values()) + + with io.open(CVE_FILE_NAME_CSV, mode='w', encoding='utf-8') as results_file: + csv_writer = csv.writer(results_file, lineterminator="\n") + csv_writer.writerow(heading) + write_data(critical) + write_data(high) + write_data(medium) + write_data(low) + results_file.close() + print("Vulnerabilities saved to {0}".format(CVE_FILE_NAME_CSV)) + +def write_to_xlsx(heading, critical, high, medium, low): + + def write_data(data_map, row): + for key in data_map: + cve_data = data_map[key] + for record in cve_data: + worksheet.write_row(row, 0, record.values()) + row += 1 + return row + + workbook = xlsxwriter.Workbook(CVE_FILE_NAME_XLSX) + worksheet = workbook.add_worksheet() + worksheet.write_row(0, 0, heading) + row = 1 + + row = write_data(critical, row) + row = write_data(high, row) + row = write_data(medium, row) + row = write_data(low, row) + workbook.close() + print("Total records written:", row) + print("Vulnerabilities saved to {0}".format(CVE_FILE_NAME_XLSX)) + + +def write_to_json(critical, high, medium, low): + + def write_data(data_map): + for key in data_map: + cve_data = data_map[key] + for record in cve_data: + output.append(record) + + output = [] + with open(CVE_FILE_NAME_JSON, 'w') as json_file: + write_data(critical) + write_data(high) + write_data(medium) + write_data(low) + json.dump(output, json_file, indent=4) + json_file.close() + print("Total records written to json file:", len(output)) + print("Vulnerabilities saved to {0}".format(CVE_FILE_NAME_JSON)) + +if __name__ == '__main__': + import sys + + if len(sys.argv) < 3: + print("Usage: python3 start_scan_and_download_results.py (optional)") + exit(1) + + report_type = "" + if len(sys.argv) > 3: + sys.argv[3] = sys.argv[3].upper() + if sys.argv[3] == EXCEL or sys.argv[3] == CSV or sys.argv[3] == JSON: + report_type = sys.argv[3] + else: + print("Invalid report type.Supported types are:{}, {}, {}".format(EXCEL, CSV, JSON)) + exit(1) + else: + report_type = EXCEL + + print("Using report type as:"+report_type) + api_url = "https://{0}/deepfence/v1.5".format(sys.argv[1]) + run_scan_and_download_results(api_url, sys.argv[2], report_type)