Skip to content

Commit

Permalink
Add feature toggle for checking path safety
Browse files Browse the repository at this point in the history
  • Loading branch information
yoniabrahamy committed Sep 13, 2023
1 parent b1ec5cd commit 372963f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
4 changes: 4 additions & 0 deletions conf/web.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,7 @@ guest_rdp_port = 3389
# Some samples will only detonate on specific versions of Windows (see web.conf packages for more info)
# Example: MSIX - Windows >= 10
msix = win10,win11

[security]
# When using mounted folder you might want to set to no
check_path_safe = yes
35 changes: 21 additions & 14 deletions web/analysis/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ def __call__(self, func):
return self.decorator(func)


def _path_safe(path: str) -> bool:
if web_cfg.security.check_path_safe:
return path_safe(path)

return True


def get_tags_tasks(task_ids: list) -> str:
for analysis in db.list_tasks(task_ids=task_ids):
return analysis.tags_tasks
Expand Down Expand Up @@ -535,7 +542,7 @@ def _load_file(task_id, sha256, existen_details, name):

elif name == "debugger":
debugger_log_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id), "debugger")
if path_exists(debugger_log_path) and path_safe(debugger_log_path):
if path_exists(debugger_log_path) and _path_safe(debugger_log_path):
for log in os.listdir(debugger_log_path):
if not log.endswith(".log"):
continue
Expand All @@ -545,7 +552,7 @@ def _load_file(task_id, sha256, existen_details, name):
return existen_details

if name in ("bingraph", "vba2graph"):
if not filepath or not path_exists(filepath) or not path_safe(filepath):
if not filepath or not path_exists(filepath) or not _path_safe(filepath):
return existen_details

existen_details.setdefault(sha256, Path(filepath).read_text())
Expand Down Expand Up @@ -646,8 +653,8 @@ def load_files(request, task_id, category):
ajax_response["suricata"] = data.get("suricata", {})
ajax_response["cif"] = data.get("cif", [])
tls_path = os.path.join(ANALYSIS_BASE_PATH, "analyses", str(task_id), "tlsdump", "tlsdump.log")
if path_safe(tls_path):
ajax_response["tlskeys_exists"] = path_safe(tls_path)
if _path_safe(tls_path):
ajax_response["tlskeys_exists"] = _path_safe(tls_path)
elif category == "behavior":
ajax_response["detections2pid"] = data.get("detections2pid", {})
return render(request, page, ajax_response)
Expand Down Expand Up @@ -1472,7 +1479,7 @@ def report(request, task_id):
CUCKOO_ROOT, "storage", "analyses", str(task_id), "vba2graph", "svg", report["target"]["file"]["sha256"] + ".svg"
)

if path_exists(vba2graph_svg_path) and path_safe(vba2graph_svg_path):
if path_exists(vba2graph_svg_path) and _path_safe(vba2graph_svg_path):
vba2graph_dict_content.setdefault(report["target"]["file"]["sha256"], Path(vba2graph_svg_path).read_text())

bingraph = reporting_cfg.bingraph.enabled
Expand Down Expand Up @@ -1601,7 +1608,7 @@ def file_nl(request, category, task_id, dlfile):
else:
return render(request, "error.html", {"error": "Category not defined"})

if path and not path_safe(path):
if path and not _path_safe(path):
return render(request, "error.html", {"error": "File not found"})

# Performance considerations
Expand Down Expand Up @@ -1753,7 +1760,7 @@ def file(request, category, task_id, dlfile):
if isinstance(path, list):
test_path = path[0]

if test_path and (not path_exists(test_path) or not path_safe(test_path)):
if test_path and (not path_exists(test_path) or not _path_safe(test_path)):
return render(request, "error.html", {"error": "File {} not found".format(os.path.basename(test_path))})

try:
Expand Down Expand Up @@ -1796,7 +1803,7 @@ def procdump(request, task_id, process_id, start, end, zipped=False):

dumpfile = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "memory", origname)

if not path_safe(dumpfile):
if not _path_safe(dumpfile):
return render(request, "error.html", {"error": f"File not found: {os.path.basename(dumpfile)}"})

if not path_exists(dumpfile):
Expand Down Expand Up @@ -1886,7 +1893,7 @@ def filereport(request, task_id, category):
if category in formats:
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task_id), "reports", formats[category])

if not path_safe(path) or not path_exists(path):
if not _path_safe(path) or not path_exists(path):
return render(request, "error.html", {"error": f"File not found: {formats[category]}"})

response = HttpResponse(Path(path).read_bytes(), content_type="application/octet-stream")
Expand All @@ -1902,7 +1909,7 @@ def full_memory_dump_file(request, analysis_number):
filename = False
for name in ("memory.dmp", "memory.dmp.zip"):
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(analysis_number), name)
if path_exists(path) and path_safe(path):
if path_exists(path) and _path_safe(path):
filename = name
break

Expand All @@ -1924,7 +1931,7 @@ def full_memory_dump_strings(request, analysis_number):
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(analysis_number), name)
if path_exists(path):
filename = name
if not path_safe(ANALYSIS_BASE_PATH):
if not _path_safe(ANALYSIS_BASE_PATH):
return render(request, "error.html", {"error": f"File not found: {name}"})
break
if filename:
Expand Down Expand Up @@ -2124,7 +2131,7 @@ def pcapstream(request, task_id, conntuple):
# if we do, build out the path to it
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump_sorted.pcap")

if not path_exists(path) or not path_safe(path):
if not path_exists(path) or not _path_safe(path):
return render(request, "standalone_error.html", {"error": "The required sorted PCAP does not exist"})

fobj = open(path, "rb")
Expand Down Expand Up @@ -2194,7 +2201,7 @@ def vtupload(request, category, task_id, filename, dlfile):
if folder_name:
path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, folder_name, filename)

if not path or not path_safe(path):
if not path or not _path_safe(path):
return render(request, "error.html", {"error": f"File not found: {os.path.basename(path)}"})

headers = {"x-apikey": settings.VTDL_KEY}
Expand Down Expand Up @@ -2287,7 +2294,7 @@ def on_demand(request, service: str, task_id: str, category: str, sha256):
category = "target.file"
extractedfile = True

if path and (not path_safe(path) or not path_exists(path)):
if path and (not _path_safe(path) or not path_exists(path)):
return render(request, "error.html", {"error": "File not found: {}".format(path)})

details = False
Expand Down

0 comments on commit 372963f

Please sign in to comment.