From 280d34531cfa32436724f5e3aff702b9559f22e8 Mon Sep 17 00:00:00 2001 From: Pedro Henrique Fernandes Date: Thu, 11 Jul 2024 10:58:10 -0300 Subject: [PATCH] feat: add 'pretty' parameter to optimize JSON serialization performance --- stix2/datastore/filesystem.py | 17 +++++----- stix2/test/v21/test_datastore_filesystem.py | 36 +++++++++++++++++++++ 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/stix2/datastore/filesystem.py b/stix2/datastore/filesystem.py index 2209116a..d6f83273 100644 --- a/stix2/datastore/filesystem.py +++ b/stix2/datastore/filesystem.py @@ -554,7 +554,7 @@ def __init__(self, stix_dir, allow_custom=False, bundlify=False): def stix_dir(self): return self._stix_dir - def _check_path_and_write(self, stix_obj, encoding='utf-8'): + def _check_path_and_write(self, stix_obj, encoding='utf-8', pretty=True): """Write the given STIX object to a file in the STIX file directory. """ type_dir = os.path.join(self._stix_dir, stix_obj["type"]) @@ -585,9 +585,9 @@ def _check_path_and_write(self, stix_obj, encoding='utf-8'): raise DataSourceError("Attempted to overwrite file (!) at: {}".format(file_path)) with io.open(file_path, mode='w', encoding=encoding) as f: - fp_serialize(stix_obj, f, pretty=True, encoding=encoding, ensure_ascii=False) + fp_serialize(stix_obj, f, pretty=pretty, encoding=encoding, ensure_ascii=False) - def add(self, stix_data=None, version=None): + def add(self, stix_data=None, version=None, pretty=True): """Add STIX objects to file directory. Args: @@ -597,6 +597,7 @@ def add(self, stix_data=None, version=None): version (str): If present, it forces the parser to use the version provided. Otherwise, the library will make the best effort based on checking the "spec_version" property. + pretty (bool): If True, the resulting JSON will be "pretty printed" Note: ``stix_data`` can be a Bundle object, but each object in it will be @@ -607,24 +608,24 @@ def add(self, stix_data=None, version=None): if isinstance(stix_data, (v20.Bundle, v21.Bundle)): # recursively add individual STIX objects for stix_obj in stix_data.get("objects", []): - self.add(stix_obj, version=version) + self.add(stix_obj, version=version, pretty=pretty) elif isinstance(stix_data, _STIXBase): # adding python STIX object - self._check_path_and_write(stix_data) + self._check_path_and_write(stix_data, pretty=pretty) elif isinstance(stix_data, (str, dict)): parsed_data = parse(stix_data, allow_custom=self.allow_custom, version=version) if isinstance(parsed_data, _STIXBase): - self.add(parsed_data, version=version) + self.add(parsed_data, version=version, pretty=pretty) else: # custom unregistered object type - self._check_path_and_write(parsed_data) + self._check_path_and_write(parsed_data, pretty=pretty) elif isinstance(stix_data, list): # recursively add individual STIX objects for stix_obj in stix_data: - self.add(stix_obj) + self.add(stix_obj, version=version, pretty=pretty) else: raise TypeError( diff --git a/stix2/test/v21/test_datastore_filesystem.py b/stix2/test/v21/test_datastore_filesystem.py index 3eb20b5f..39b45578 100644 --- a/stix2/test/v21/test_datastore_filesystem.py +++ b/stix2/test/v21/test_datastore_filesystem.py @@ -151,6 +151,42 @@ def test_filesystem_source_bad_stix_file(fs_source, bad_stix_files): except STIXError as e: assert "Can't parse object with no 'type' property" in str(e) +def test_filesystem_sink_add_pretty_true(fs_sink, fs_source): + """Test adding a STIX object with pretty=True.""" + camp1 = stix2.v21.Campaign( + name="Hannibal", + objective="Targeting Italian and Spanish Diplomat internet accounts", + aliases=["War Elephant"], + ) + fs_sink.add(camp1, pretty=True) + filepath = os.path.join( + FS_PATH, "campaign", camp1.id, _timestamp2filename(camp1.modified) + ".json", + ) + assert os.path.exists(filepath) + with open(filepath, 'r', encoding='utf-8') as f: + content = f.read() + assert '\n' in content # Check for pretty-printed output + + os.remove(filepath) + +def test_filesystem_sink_add_pretty_false(fs_sink, fs_source): + """Test adding a STIX object with pretty=False.""" + camp1 = stix2.v21.Campaign( + name="Hannibal", + objective="Targeting Italian and Spanish Diplomat internet accounts", + aliases=["War Elephant"], + ) + fs_sink.add(camp1, pretty=False) + filepath = os.path.join( + FS_PATH, "campaign", camp1.id, _timestamp2filename(camp1.modified) + ".json", + ) + assert os.path.exists(filepath) + with open(filepath, 'r', encoding='utf-8') as f: + content = f.read() + assert '\n' not in content # Check for non-pretty-printed output + + os.remove(filepath) + def test_filesystem_source_get_object(fs_source): # get (latest) object