Normalize files (#1753)
* Insert the detonation-independent fields for files into a separate collection and reference it from the analysis document.

* Use deepcopy to create a copy of the report doc.

Previously, only a shallow copy was made, so if nested objects were
modified during reporting (such as adding file_ref keys), those changes
would persist for other reporting modules. We don't want that.
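
As a minimal, hypothetical sketch of the difference (the report structure is invented for illustration; this is not the reporting code itself):

    import copy

    results = {"target": {"file": {"sha256": "abc123", "size": 10}}}

    # Old behaviour: a shallow copy shares nested objects, so adding a
    # file_ref key during reporting also mutates the original results.
    report = dict(results)
    report["target"]["file"]["file_ref"] = "abc123"
    assert "file_ref" in results["target"]["file"]

    # New behaviour: deepcopy isolates nested objects, so changes made by
    # one reporting module do not leak into the others.
    results = {"target": {"file": {"sha256": "abc123", "size": 10}}}
    report = copy.deepcopy(results)
    report["target"]["file"]["file_ref"] = "abc123"
    assert "file_ref" not in results["target"]["file"]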

---------

Co-authored-by: Tommy Beadle
tbeadle authored Sep 19, 2023
1 parent c9cd1eb commit 0f42097
Showing 8 changed files with 267 additions and 30 deletions.
14 changes: 11 additions & 3 deletions changelog.md
@@ -1,15 +1,23 @@
### [19.9.2023]
* Storage of file data in MongoDB
* Store the parts of a report's file data that are independent of a detonation in a separate collection
to conserve disk space.
* __ACTION REQUIRED__
* It is recommended to add a regular cron job that runs `cd /opt/CAPEv2 && sudo -u cape poetry run python ./utils/cleaners.py --delete-unused-file-data-in-mongo`
to prune entries that are no longer needed.

### [13.9.2023]
* Monitor updates:
* Monitor updates:
* .NET JIT native cache handling improvements
* New debugger action 'string' to capture decrypted strings
* Fix issue in procname_watch_init() with non-null-terminated unicode paths - thanks Proofpoint for the report

### [8.9.2023]
* Monitor update:
* Monitor update:
* .NET JIT native cache scanning & dumping

### [1.9.2023]
* Monitor updates:
* Monitor updates:
* Fix missing browser hooks config setting for Edge & Chrome
* Trace: add config option to try and skip loops which flood trace logs (loopskip=1)

152 changes: 152 additions & 0 deletions dev_utils/mongo_hooks.py
@@ -0,0 +1,152 @@
import itertools
import logging

from pymongo import UpdateOne

from dev_utils.mongodb import (
mongo_bulk_write,
mongo_delete_data,
mongo_delete_many,
mongo_find,
mongo_find_one,
mongo_hook,
mongo_insert_one,
mongo_update_many,
mongo_update_one,
)

log = logging.getLogger(__name__)

FILES_COLL = "files"
FILE_KEY = "sha256"
TASK_IDS_KEY = "_task_ids"


def normalize_file(file_dict, task_id):
"""Pull out the detonation-independent attributes of the given file and
return an UpdateOne object usable by bulk_write to upsert a
document into the FILES_COLL collection with its _id set to the FILE_KEY of
the file. The given file_dict is updated in place to remove those
attributes and add a 'file_ref' key containing the FILE_KEY that can be
used as a lookup in the FILES_COLL collection.
If the file has already been "normalized," then it is not modified and
None is returned.
"""
if "file_ref" in file_dict:
# This has already been normalized.
return
key = file_dict.get(FILE_KEY, None)
if not key:
return
static_fields = (
# hashes
"crc32",
"md5",
"sha1",
"sha256",
"sha512",
"sha3_384",
"ssdeep",
"tlsh",
"rh_hash",
# other metadata & static analysis fields
"size",
"pe",
"ep_bytes",
"entrypoint",
"data",
"strings",
)
new_dict = {}
for fld in static_fields:
try:
new_dict[fld] = file_dict.pop(fld)
except KeyError:
pass

new_dict["_id"] = key
file_dict["file_ref"] = key
return UpdateOne({"_id": key}, {"$set": new_dict, "$addToSet": {TASK_IDS_KEY: task_id}}, upsert=True, hint=[("_id", 1)])


@mongo_hook((mongo_insert_one, mongo_update_one), "analysis")
def normalize_files(report):
"""Take the detonation-independent file data from various parts of
the report and extract them out to a separate collection, keeping a
reference to it (along with the detonation-dependent fields) in the
report.
"""
requests = []
for file_dict in collect_file_dicts(report):
request = normalize_file(file_dict, report["info"]["id"])
if request:
requests.append(request)
if requests:
mongo_bulk_write(FILES_COLL, requests, ordered=False)

return report


@mongo_hook(mongo_find_one, "analysis")
def denormalize_files(report):
"""Pull the file info from the FILES_COLL collection in to associated parts of
the report.
"""
file_dicts = tuple(collect_file_dicts(report))
if not file_dicts:
# This is likely a partial report (like for an ajax request of a specific
# part of the report) that does not include any file information.
return report

if "file_ref" not in file_dicts[0]:
# This analysis uses the old-style of storing file information.
# It includes the static file info in the analysis document
# instead of in the FILES_COLL collection.
return report

file_refs = {file_dict["file_ref"] for file_dict in file_dicts}
file_docs = {}
for file_doc in mongo_find(FILES_COLL, {"_id": {"$in": list(file_refs)}}, {TASK_IDS_KEY: 0}):
file_docs[file_doc.pop("_id")] = file_doc
for file_dict in file_dicts:
if file_dict["file_ref"] not in file_docs:
log.warning("Failed to find %s in %s collection.", file_dict["file_ref"], FILES_COLL)
continue
file_doc = file_docs[file_dict.pop("file_ref")]
file_dict.update(file_doc)

return report


@mongo_hook(mongo_delete_data, "analysis")
def remove_task_references_from_files(task_ids):
"""Remove the given task_ids from the TASK_IDS_KEY field on "files"
documents that were referenced by those tasks that are being deleted.
"""
mongo_update_many(
FILES_COLL,
{TASK_IDS_KEY: {"$elemMatch": {"$in": task_ids}}},
{"$pullAll": {TASK_IDS_KEY: task_ids}},
)


def delete_unused_file_docs():
"""Delete entries in the FILES_COLL collection that are no longer
referenced by any analysis tasks. This should typically be invoked
via utils/cleaners.py in a cron job.
"""
return mongo_delete_many(FILES_COLL, {TASK_IDS_KEY: {"$size": 0}})


def collect_file_dicts(report) -> itertools.chain:
"""Return an iterable containing all of the candidates for files
from various parts of the report to be normalized.
"""
file_dicts = []
target_file = report.get("target", {}).get("file", None)
if target_file:
file_dicts.append([target_file])
file_dicts.append(report.get("dropped", None) or [])
file_dicts.append(report.get("CAPE", {}).get("payloads", None) or [])
file_dicts.append(report.get("procdump", None) or [])
return itertools.chain.from_iterable(file_dicts)
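
To make the effect concrete, here is a rough sketch of what normalize_file does to a single file dict (values invented; assumes a CAPE checkout where dev_utils.mongo_hooks is importable):

    from dev_utils.mongo_hooks import normalize_file

    file_dict = {"sha256": "abc123", "size": 512, "pe": {"imphash": "feed"}, "yara": [], "virustotal": {}}
    request = normalize_file(file_dict, task_id=101)
    # file_dict now keeps only the detonation-dependent data plus a reference:
    #   {"yara": [], "virustotal": {}, "file_ref": "abc123"}
    # request is an UpdateOne that upserts the static fields into the "files"
    # collection as {"_id": "abc123", "sha256": "abc123", "size": 512, "pe": {...}}
    # and adds 101 to its "_task_ids" array via $addToSet.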
70 changes: 61 additions & 9 deletions dev_utils/mongodb.py
@@ -1,7 +1,8 @@
import collections
import functools
import logging
import time
from typing import Callable, Iterable
from typing import Callable, Sequence, Union

from lib.cuckoo.common.config import Config

@@ -60,6 +61,35 @@ def connect_to_mongo() -> MongoClient:

conn = connect_to_mongo()
results_db = conn[mdb]
hooks = collections.defaultdict(lambda: collections.defaultdict(list))


def mongo_hook(mongo_funcs, collection):
if not hasattr(mongo_funcs, "__iter__"):
mongo_funcs = [mongo_funcs]
for mongo_func in mongo_funcs:
assert mongo_func in (
mongo_insert_one,
mongo_update_one,
mongo_find_one,
mongo_delete_data,
), f"{mongo_func} can not have hooks applied"

def decorator(f):
@functools.wraps(f)
def inner(*args, **kwargs):
return f(*args, **kwargs)

for mongo_func in mongo_funcs:
hooks[mongo_func][collection].append(inner)
return inner

return decorator


@graceful_auto_reconnect
def mongo_bulk_write(collection: str, requests, **kwargs):
return getattr(results_db, collection).bulk_write(requests, **kwargs)


@graceful_auto_reconnect
@@ -71,8 +101,10 @@ def mongo_create_index(collection: str, index, background: bool = True, name: st


@graceful_auto_reconnect
def mongo_insert_one(collection: str, query):
return getattr(results_db, collection).insert_one(query)
def mongo_insert_one(collection: str, doc):
for hook in hooks[mongo_insert_one][collection]:
doc = hook(doc)
return getattr(results_db, collection).insert_one(doc)


@graceful_auto_reconnect
@@ -89,8 +121,13 @@ def mongo_find_one(collection: str, query, projection=False, sort=None):
if sort is None:
sort = [("_id", -1)]
if projection:
return getattr(results_db, collection).find_one(query, projection, sort=sort)
return getattr(results_db, collection).find_one(query, sort=sort)
result = getattr(results_db, collection).find_one(query, projection, sort=sort)
else:
result = getattr(results_db, collection).find_one(query, sort=sort)
if result:
for hook in hooks[mongo_find_one][collection]:
result = hook(result)
return result


@graceful_auto_reconnect
@@ -104,12 +141,15 @@ def mongo_delete_many(collection: str, query):


@graceful_auto_reconnect
def mongo_update(collection: str, query, projection):
return getattr(results_db, collection).update(query, projection)
def mongo_update_many(collection: str, query, update):
return getattr(results_db, collection).update_many(query, update)


@graceful_auto_reconnect
def mongo_update_one(collection: str, query, projection, bypass_document_validation: bool = False):
if query.get("$set", None):
for hook in hooks[mongo_update_one][collection]:
query["$set"] = hook(query["$set"])
return getattr(results_db, collection).update_one(query, projection, bypass_document_validation=bypass_document_validation)


@@ -135,22 +175,29 @@ def mongo_drop_database(database: str):
conn.drop_database(database)


def mongo_delete_data(task_ids: Iterable[int]): # | int
def mongo_delete_data(task_ids: Union[int, Sequence[int]]):
try:
if isinstance(task_ids, int):
task_ids = [task_ids]

analyses_tmp = []
tasks = mongo_find("analysis", {"info.id": {"$in": task_ids}}, {"behavior.processes.calls": 1})
found_task_ids = []
tasks = mongo_find("analysis", {"info.id": {"$in": task_ids}}, {"behavior.processes.calls": 1, "info.id": 1})

for task in tasks or []:
for process in task.get("behavior", {}).get("processes", []):
if process.get("calls"):
mongo_delete_many("calls", {"_id": {"$in": process["calls"]}})
analyses_tmp.append(task["_id"])
task_id = task.get("info", {}).get("id", None)
if task_id is not None:
found_task_ids.append(task_id)

if analyses_tmp:
mongo_delete_many("analysis", {"_id": {"$in": analyses_tmp}})
if found_task_ids:
for hook in hooks[mongo_delete_data]["analysis"]:
hook(found_task_ids)
except Exception as e:
log.error(e, exc_info=True)

@@ -162,3 +209,8 @@ def mongo_is_cluster():
return True
except OperationFailure:
return False


# MongoDB hooks are registered by importing this module.
# Import it down here because mongo_hooks imports this module.
from . import mongo_hooks
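
For reference, a minimal sketch of how a hook can be registered with the new decorator (the hook bodies below are hypothetical; the real hooks live in dev_utils/mongo_hooks.py):

    from dev_utils.mongodb import mongo_hook, mongo_insert_one, mongo_find_one

    @mongo_hook(mongo_insert_one, "analysis")
    def before_insert(report):
        # Runs on every document passed to mongo_insert_one("analysis", ...).
        # A hook must return the (possibly modified) document.
        report.pop("debug_log", None)
        return report

    @mongo_hook(mongo_find_one, "analysis")
    def after_find(report):
        # Runs on every non-empty result of mongo_find_one("analysis", ...).
        report.setdefault("statistics", {})
        return report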
4 changes: 4 additions & 0 deletions installer/cape2.sh
@@ -831,6 +831,10 @@ EOF
systemctl enable mongodb.service
systemctl restart mongodb.service

if ! crontab -l | grep -q -F 'delete-unused-file-data-in-mongo'; then
crontab -l | { cat; echo "30 1 * * 0 cd /opt/CAPEv2 && sudo -u cape poetry run python ./utils/cleaners.py --delete-unused-file-data-in-mongo"; } | crontab -
fi

echo -n "https://www.percona.com/blog/2016/08/12/tuning-linux-for-mongodb/"
else
echo "[+] Skipping MongoDB"
20 changes: 16 additions & 4 deletions lib/cuckoo/core/startup.py
@@ -22,6 +22,7 @@
import modules.processing
import modules.reporting
import modules.signatures
from dev_utils.mongodb import mongo_create_index
from lib.cuckoo.common.colors import cyan, red, yellow
from lib.cuckoo.common.config import Config
from lib.cuckoo.common.constants import CUCKOO_ROOT
@@ -52,7 +53,6 @@ def check_python_version():


def check_user_permissions(as_root: bool = False):

if as_root:
log.warning("You running part of CAPE as non 'cape' user! That breaks permissions on temp folder and log folder.")
return
@@ -88,12 +88,25 @@ def check_webgui_mongo():
if repconf.mongodb.enabled:
from dev_utils.mongodb import connect_to_mongo

good = connect_to_mongo
if not good:
client = connect_to_mongo()
if not client:
sys.exit(
"You have enabled webgui but mongo isn't working, see mongodb manual for correct installation and configuration\nrun `systemctl status mongodb` for more info"
)

# Create an index based on the info.id dict key. Increases overall scalability
# with large amounts of data.
# Note: Silently ignores the creation if the index already exists.
mongo_create_index("analysis", "info.id", name="info.id_1")
# mongo_create_index([("target.file.sha256", TEXT)], name="target_sha256")
# We perform a lot of SHA256 hash lookups, so we need this index
# mongo_create_index(
# "analysis",
# [("target.file.sha256", TEXT), ("dropped.sha256", TEXT), ("procdump.sha256", TEXT), ("CAPE.payloads.sha256", TEXT)],
# name="ALL_SHA256",
# )
mongo_create_index("files", [("_task_ids", 1)])

elif repconf.elasticsearchdb.enabled:
# ToDo add check
pass
@@ -455,7 +468,6 @@ def init_routing():


def check_tcpdump_permissions():

tcpdump = auxconf.sniffer.get("tcpdump", "/usr/sbin/tcpdump")

user = False
11 changes: 0 additions & 11 deletions modules/reporting/mongodb.py
@@ -124,17 +124,6 @@ def run(self, results):
report["behavior"] = dict(report["behavior"])
report["behavior"]["processes"] = new_processes

# Create an index based on the info.id dict key. Increases overall scalability
# with large amounts of data.
# Note: Silently ignores the creation if the index already exists.
mongo_create_index("analysis", "info.id", name="info.id_1")
# mongo_create_index([("target.file.sha256", TEXT)], name="target_sha256")
# We performs a lot of SHA256 hash lookup so we need this index
# mongo_create_index(
# "analysis",
# [("target.file.sha256", TEXT), ("dropped.sha256", TEXT), ("procdump.sha256", TEXT), ("CAPE.payloads.sha256", TEXT)],
# name="ALL_SHA256",
# )
# trick for distributed api
if results.get("info", {}).get("options", {}).get("main_task_id", ""):
report["info"]["id"] = int(results["info"]["options"]["main_task_id"])