diff --git a/CHANGELOG.md b/CHANGELOG.md index e469a676b..5cd526f5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### New Features +- add function in capa/helpers to load plain and compressed JSON reports #1883 @Rohit1123 ### Breaking Changes diff --git a/capa/helpers.py b/capa/helpers.py index ad27f3903..ecf1b3200 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import sys +import gzip import json import inspect import logging @@ -30,7 +31,7 @@ EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") -EXTENSIONS_DYNAMIC = ("json", "json_") +EXTENSIONS_DYNAMIC = ("json", "json_", "json.gz") EXTENSIONS_ELF = "elf_" EXTENSIONS_FREEZE = "frz" @@ -70,9 +71,19 @@ def assert_never(value) -> NoReturn: assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011 -def get_format_from_report(sample: Path) -> str: - report = json.load(sample.open(encoding="utf-8")) +def load_json_from_path(json_path: Path): + with gzip.open(json_path, "r") as compressed_report: + try: + report_json = compressed_report.read() + except gzip.BadGzipFile: + report = json.load(json_path.open(encoding="utf-8")) + else: + report = json.loads(report_json) + return report + +def get_format_from_report(sample: Path) -> str: + report = load_json_from_path(sample) if "CAPE" in report: return FORMAT_CAPE diff --git a/capa/loader.py b/capa/loader.py index e4f0a5c92..024091e01 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -6,7 +6,6 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import sys -import json import logging import datetime from typing import Set, Dict, List, Optional @@ -180,7 +179,7 @@ def get_extractor( if backend == BACKEND_CAPE: import capa.features.extractors.cape.extractor - report = json.loads(input_path.read_text(encoding="utf-8")) + report = capa.helpers.load_json_from_path(input_path) return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) elif backend == BACKEND_DOTNET: @@ -297,7 +296,7 @@ def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtr elif input_format == FORMAT_CAPE: import capa.features.extractors.cape.extractor - report = json.loads(input_file.read_text(encoding="utf-8")) + report = capa.helpers.load_json_from_path(input_file) file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) return file_extractors diff --git a/tests/fixtures.py b/tests/fixtures.py index ebfe557a5..ce21d7db1 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -191,14 +191,10 @@ def get_binja_extractor(path: Path): @lru_cache(maxsize=1) def get_cape_extractor(path): - import gzip - import json - + from capa.helpers import load_json_from_path from capa.features.extractors.cape.extractor import CapeExtractor - with gzip.open(path, "r") as compressed_report: - report_json = compressed_report.read() - report = json.loads(report_json) + report = load_json_from_path(path) return CapeExtractor.from_report(report) diff --git a/tests/test_main.py b/tests/test_main.py index 6d588dda1..2ee7e7da2 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -356,3 +356,9 @@ def test_main_cape1(tmp_path): assert capa.main.main([str(path), "-j", "-r", str(rules)]) == 0 assert capa.main.main([str(path), "-v", "-r", str(rules)]) == 0 assert capa.main.main([str(path), "-vv", "-r", str(rules)]) == 0 + + +def test_main_cape_gzip(): + # tests successful execution of .json.gz + path = str(fixtures.get_data_path_by_name("0000a657")) + assert capa.main.main([path]) == 0