From 3c8d301f55cca48361d32b3f76708b13abe91c80 Mon Sep 17 00:00:00 2001
From: Seth Grover
Date: Fri, 8 Nov 2024 08:56:07 -0700
Subject: [PATCH] work in progress for mandiant threat intel integration, cisagov/Malcolm#358
---
 shared/bin/zeek_threat_feed_utils.py | 52 +++------------------------
 1 file changed, 4 insertions(+), 48 deletions(-)
diff --git a/shared/bin/zeek_threat_feed_utils.py b/shared/bin/zeek_threat_feed_utils.py
index 5ab872066..82c3b1e31 100644
--- a/shared/bin/zeek_threat_feed_utils.py
+++ b/shared/bin/zeek_threat_feed_utils.py
@@ -190,55 +190,11 @@ def download_to_file(url, session=None, local_filename=None, chunk_bytes=4096, s
 return None
-def mandiant_json_serializer(obj):
- """
- JSON serializer for mandiant_threatintel.APIResponse object (for debug output)
- """
-
- if isinstance(obj, datetime):
- return obj.astimezone(UTCTimeZone).isoformat()
-
- elif isinstance(obj, GeneratorType):
- return list(map(mandiant_json_serializer, obj))
-
- elif isinstance(obj, list):
- return [mandiant_json_serializer(item) for item in obj]
-
- elif isinstance(obj, dict):
- return {key: mandiant_json_serializer(value) for key, value in obj.items()}
-
- elif isinstance(obj, set):
- return {mandiant_json_serializer(item) for item in obj}
-
- elif isinstance(obj, tuple):
- return tuple(mandiant_json_serializer(item) for item in obj)
-
- elif isinstance(obj, FunctionType):
- return f"function {obj.__name__}" if obj.__name__ != "" else "lambda"
-
- elif isinstance(obj, LambdaType):
- return "lambda"
-
- elif (not hasattr(obj, "__str__") or obj.__str__ is object.__str__) and (
- not hasattr(obj, "__repr__") or obj.__repr__ is object.__repr__
- ):
- return obj.__class__.__name__
-
- else:
- return str(obj)
-
-
 def mandiant_indicator_as_json_str(indicator, skip_attr_map={}):
- return json.dumps(
- {
- key: getattr(indicator, key)
- for key in indicator.__dir__()
- if (skip_attr_map.get(key, False) == False)
- and (not key.startswith("_"))
- and (not callable(getattr(indicator, key)))
- },
- default=mandiant_json_serializer,
- )
+ if indicator and indicator._api_response:
+ return json.dumps(indicator._api_response)
+ else:
+ return 'unknown indicator'
 def map_mandiant_indicator_to_zeek(
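Review bot: The new body of `mandiant_indicator_as_json_str` reaches into the library-private `indicator._api_response` and raises AttributeError for any indicator object that lacks that attribute, so the `'unknown indicator'` fallback is only reachable for a falsy indicator; meanwhile `skip_attr_map` is still accepted but now silently ignored (and keeps its mutable `{}` default). A guarded lookup keeps the fallback meaningful: `response = getattr(indicator, '_api_response', None) if indicator else None` followed by `return json.dumps(response) if response else 'unknown indicator'`. This `json.dumps` call also has no `default=`, so if the raw response still carries datetime or other non-JSON values (what the removed `mandiant_json_serializer` was converting) it will raise TypeError; presumably `_api_response` is the already-parsed JSON dict from the API, but that is worth confirming or passing `default=str`. The fallback string is not itself JSON, which a caller that parses the return value would trip over; since this is debug output that may be intended, but a docstring saying so would help, e.g. `"""Return the indicator's raw API response as a JSON string (debug output), or 'unknown indicator' if unavailable."""`.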