Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

download-api: Adding sample script to start scan and download scan results #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 272 additions & 0 deletions scripts/vulnerability_scan/start_scan_and_download_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import requests
import time
import csv
import io
import xlsxwriter
import json
from sortedcontainers import SortedDict
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


'''
Below are the pre-requisite for running this script:
* sortedcontainers
* xlsxwriter

These can be installed using pip as:
* pip3 install sortedcontainers
* pip3 install xlsxwriter
'''

CVE_FILE_NAME_CSV = "vulnerability_scan_results.csv"
CVE_FILE_NAME_XLSX = "vulnerability_scan_results.xlsx"
CVE_FILE_NAME_JSON = "vulnerability_scan_results.json"

SEV_CRITICAL = "critical"
SEV_HIGH = "high"
SEV_MEDIUM = "medium"
SEV_LOW = "low"

TYPE_HOST="host"
TYPE_CONTAINER_IMAGE="container_image"

EXCEL = "EXCEL"
CSV = "CSV"
JSON = "JSON"

def run_scan_and_download_results(api_url, api_key, report_type):
# Auth
default_headers = {"Content-Type": "application/json"}
auth_response = requests.post("{0}/users/auth".format(api_url), json={"api_key": api_key}, headers=default_headers,
verify=False).json()
if auth_response["success"]:
print("Authentication successful")
else:
print("Authentication failed")
return
default_headers["Authorization"] = "Bearer " + auth_response["data"]["access_token"]

# Enumerate nodes
enumerate_response = requests.post(
"{0}/enumerate".format(api_url),
json={"filters": {"type": [TYPE_HOST, TYPE_CONTAINER_IMAGE], "pseudo": False}, "size": 5000},
headers=default_headers, verify=False).json()

counter = 1
enumerate_response_nodes = enumerate_response.get("data", {}).get("data", [])
if not enumerate_response_nodes:
print("No nodes found")
return

container_images = {}
hosts = {}

host_scope_ids = []
container_image_scope_ids = []
for node in enumerate_response_nodes:
if node["type"] == TYPE_CONTAINER_IMAGE:
container_images[node["id"]] = node
container_image_scope_ids.append(node["scope_id"])
else:
hosts[node["id"]] = node
host_scope_ids.append(node["scope_id"])

#Below flag is only for testing purpose...
run_scan_flag = True

if run_scan_flag == True:
run_scans(host_scope_ids, default_headers, TYPE_HOST)
run_scans(container_image_scope_ids, default_headers, TYPE_CONTAINER_IMAGE)

target = len(hosts) + len(container_images)
count = 0
status_track = {}
success_scan_id = []

while count != target:
time.sleep(10)
count = check_scan_status(hosts, TYPE_HOST, count,
default_headers, status_track, success_scan_id)
count = check_scan_status(container_images, TYPE_CONTAINER_IMAGE, count,
default_headers, status_track, success_scan_id)

heading_data = []
results = {}
first = True
results_count = 0

#Fetch and process the scan results
#Results are grouped by the "severity" and sorted by the
#combination of "cve_id"+"cve_caused_by_package"(package name)
for scanid in success_scan_id:
cve_response = requests.post(
"{0}/vulnerability".format(api_url),
json={"filters": {"scan_id": [scanid]}, "size": 10000},
headers=default_headers, verify=False).json()

if cve_response["success"] != True:
msg = "Scan failed for entity:"+"Error message(if any):"
if cve_response["error"]:
msg = msg+cve_response["error"]
print(msg)

for entry in cve_response["data"]:
sev = entry["cve_severity"]
if sev not in results:
results[sev]=SortedDict()

cve_id = entry["cve_id"]
package_name = entry["cve_caused_by_package"]
key = cve_id + package_name

if key not in results[sev]:
results[sev][key] = []

results[sev][key].append(entry)
results_count += 1

if first == True:
heading_data = entry.keys()
first = False

print("Total number of results:", results_count)

#Results are writen to the files in the decreasing order of severity
if report_type == CSV:
write_to_csv(heading_data, results[SEV_CRITICAL], results[SEV_HIGH],
results[SEV_MEDIUM], results[SEV_LOW])
elif report_type == EXCEL:
write_to_xlsx(heading_data, results[SEV_CRITICAL], results[SEV_HIGH],
results[SEV_MEDIUM], results[SEV_LOW])
elif report_type == JSON:
write_to_json(results[SEV_CRITICAL], results[SEV_HIGH],
results[SEV_MEDIUM], results[SEV_LOW])

def run_scans(scope_ids, default_headers, entity):
print("\nStarting vulnerability scan on all "+entity)
print(requests.post(
"{0}/node_action".format(api_url),
json={
"action": "cve_scan_start",
"node_type": entity,
"node_id_list": scope_ids,
"action_args": {
"scan_type": ["base", "java", "python", "ruby", "php", "javascript", "rust", "golang", "dotnet"]
}
},
headers=default_headers, verify=False).json())

def check_scan_status(entity_ids, entity, count, default_headers,
status_track, success_scan_id):
for entity_id in entity_ids:
resp = requests.get("{0}/node/{1}/cve_scan_status".format(api_url, entity_id),
headers=default_headers, verify=False).json()
status = resp["data"]["action"]
if status == "ERROR":
if entity_id in status_track and status_track[entity_id] == status:
continue

print("Error encounterd during Scan for {}:{}".format(entity, entity_id))
status_track[entity_id] = status
count+=1
elif status == "COMPLETED":
if entity_id in status_track and status_track[entity_id] == status:
continue

print("Scan completed for {}:{}".format(entity, entity_id))
status_track[entity_id] = status
success_scan_id.append(resp["data"]["scan_id"])
count+=1
else:
if entity_id not in status_track:
print("Initial Scan status for {}:{}, is:{}".format(entity, entity_id, status))
status_track[entity_id]=status
elif status_track[entity_id] != status:
print("Scan status changed for {}:{}, From:{}, To:{}".format(entity,
entity_id, status_track[entity_id], status))
status_track[entity_id]=status
return count

def write_to_csv(heading, critical, high, medium, low):
def write_data(data_map):
for key in data_map:
cve_data = data_map[key]
for record in cve_data:
csv_writer.writerow(record.values())

with io.open(CVE_FILE_NAME_CSV, mode='w', encoding='utf-8') as results_file:
csv_writer = csv.writer(results_file, lineterminator="\n")
csv_writer.writerow(heading)
write_data(critical)
write_data(high)
write_data(medium)
write_data(low)
results_file.close()
print("Vulnerabilities saved to {0}".format(CVE_FILE_NAME_CSV))

def write_to_xlsx(heading, critical, high, medium, low):

def write_data(data_map, row):
for key in data_map:
cve_data = data_map[key]
for record in cve_data:
worksheet.write_row(row, 0, record.values())
row += 1
return row

workbook = xlsxwriter.Workbook(CVE_FILE_NAME_XLSX)
worksheet = workbook.add_worksheet()
worksheet.write_row(0, 0, heading)
row = 1

row = write_data(critical, row)
row = write_data(high, row)
row = write_data(medium, row)
row = write_data(low, row)
workbook.close()
print("Total records written:", row)
print("Vulnerabilities saved to {0}".format(CVE_FILE_NAME_XLSX))


def write_to_json(critical, high, medium, low):

def write_data(data_map):
for key in data_map:
cve_data = data_map[key]
for record in cve_data:
output.append(record)

output = []
with open(CVE_FILE_NAME_JSON, 'w') as json_file:
write_data(critical)
write_data(high)
write_data(medium)
write_data(low)
json.dump(output, json_file, indent=4)
json_file.close()
print("Total records written to json file:", len(output))
print("Vulnerabilities saved to {0}".format(CVE_FILE_NAME_JSON))

if __name__ == '__main__':
import sys

if len(sys.argv) < 3:
print("Usage: python3 start_scan_and_download_results.py <mgmt_console_ip_address> <api_key> <csv/excel/json>(optional)")
exit(1)

report_type = ""
if len(sys.argv) > 3:
sys.argv[3] = sys.argv[3].upper()
if sys.argv[3] == EXCEL or sys.argv[3] == CSV or sys.argv[3] == JSON:
report_type = sys.argv[3]
else:
print("Invalid report type.Supported types are:{}, {}, {}".format(EXCEL, CSV, JSON))
exit(1)
else:
report_type = EXCEL

print("Using report type as:"+report_type)
api_url = "https://{0}/deepfence/v1.5".format(sys.argv[1])
run_scan_and_download_results(api_url, sys.argv[2], report_type)