diff --git a/CHANGELOG.md b/CHANGELOG.md index 2caecaebe..6a282eb54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +- add Ghidra UI integration #1734 @colton-gabertan @mike-hunhoff + ### Breaking Changes - main: introduce wrapping routines within main for working with CLI args #1813 @williballenthin diff --git a/capa/ghidra/README.md b/capa/ghidra/README.md index f1e811953..b22892136 100644 --- a/capa/ghidra/README.md +++ b/capa/ghidra/README.md @@ -2,8 +2,39 @@ -The Ghidra feature extractor is an application of the FLARE team's open-source project, Ghidrathon, to integrate capa with Ghidra using Python 3. capa is a framework that uses a well-defined collection of rules to identify capabilities in a program. You can run capa against a PE file, ELF file, or shellcode and it tells you what it thinks the program can do. For example, it might suggest that the program is a backdoor, can install services, or relies on HTTP to communicate. The Ghidra feature extractor can be used to run capa analysis on your Ghidra databases without needing access to the original binary file. +The Ghidra feature extractor is an application of the FLARE team's open-source project, Ghidrathon, to integrate capa with Ghidra using Python 3. capa is a framework that uses a well-defined collection of rules to identify capabilities in a program. You can run capa against a PE file, ELF file, or shellcode and it tells you what it thinks the program can do. For example, it might suggest that the program is a backdoor, can install services, or relies on HTTP to communicate. The Ghidra feature extractor can be used to run capa analysis on your Ghidra databases without needing access to the original binary file. As a part of this integration, we've developed two scripts, [capa_explorer.py](capa/ghidra/capa_explorer.py) and [capa_ghidra.py](capa/ghidra/capa_ghidra.py), to display capa results directly in Ghidra. +### Using `capa_explorer.py` + +`capa_explorer.py` integrates capa results directly into Ghidra's UI. In the Symbol Tree Window, under the Namespaces section, you can find the matched rules as well as the corresponding functions that contain the matched features. + +Namespaces & Labeled Functions: + +![image](https://github.com/mandiant/capa/assets/66766340/eeae33f4-99d4-42dc-a5e8-4c1b8c661492) + +Labeled functions may be clicked in the Symbol Tree Window to navigate the Disassembly Listing and Decompilation windows to the function locations. A comment listing each matched capa rule is inserted at the beginning of the function and a comment for each matched capa feature is added at the matched address within the function. These comments can be viewed using both the Disassembly Listing and Decompilation windows. + +Comments: + +![image](https://github.com/mandiant/capa/assets/66766340/bb2b4170-7fd4-45fc-8c7b-ff8f2e2f101b) + +Bookmarks are added for capa matches that are categorized under the MITRE ATT&CK Framework and Malware Behavior Catalog (MBC). These may be found and navigated using Ghidra's Bookmarks Window. + +MITRE ATT&CK & MBC bookmarks: + +![image](https://github.com/mandiant/capa/assets/66766340/7f9a66a9-7be7-4223-91c6-4b8fc4651336) + +### Using `capa_ghidra.py` + +The following is an example of running `capa_ghidra.py` using the Ghidra Script Manager: + +Selecting capa rules: + + +Choosing output format: + + +Viewing results in Ghidra Console Window: ## Getting Started @@ -18,7 +49,7 @@ Please ensure that you have the following dependencies installed before continui | Python | `>= 3.8` | https://www.python.org/downloads | | Ghidra | `>= 10.2` | https://ghidra-sre.org | -In order to run capa using using Ghidra, you must install capa as a library, obtain the official capa rules that match the capa version you have installed, and configure the Python 3 script [capa_ghidra.py](/capa/ghidra/capa_ghidra.py). You can do this by completing the following steps using the Python 3 interpreter that you have configured for your Ghidrathon installation: +You can run capa in Ghidra by completing the following steps using the Python 3 interpreter that you have configured for your Ghidrathon installation: 1. Install capa and its dependencies from PyPI using the following command: ```bash @@ -32,30 +63,22 @@ OR $ capa --version ``` -3. Copy [capa_ghidra.py](/capa/ghidra/capa_ghidra.py) to your `$USER_HOME/ghidra_scripts` directory or manually add `` to the Ghidra Script Manager. +3. Copy [capa_explorer.py](capa/ghidra/capa_explorer.py) and [capa_ghidra.py](/capa/ghidra/capa_ghidra.py) to your `$USER_HOME/ghidra_scripts` directory or manually add `` to the Ghidra Script Manager. ## Usage -After completing the installation steps you can execute `capa_ghidra.py` using the Ghidra Script Manager or Headless Analyzer. +After completing the installation steps you can execute `capa_explorer.py` and `capa_ghidra.py` using the Ghidra Script Manager. You can also execute `capa_ghidra.py` using Ghidra's Headless Analyzer. ### Ghidra Script Manager -To execute `capa_ghidra.py` using the Ghidra Script Manager, first open the Ghidra Script Manager by navigating to `Window > Script Manager` in the Ghidra Code Browser. Next, locate `capa_ghidra.py` by selecting the `Python 3 > capa` category or using the Ghidra Script Manager search funtionality. Finally, double-click `capa_ghidra.py` to execute the script. If you don't see `capa_ghidra.py`, make sure you have copied the script to your `$USER_HOME/ghidra_scripts` directory or manually added `` to the Ghidra Script Manager - -When executed, `capa_ghidra.py` asks you to provide your capa rules directory and preferred output format. `capa_ghidra.py` supports `default`, `verbose`, and `vverbose` output formats when executed from the Ghidra Script Manager. `capa_ghidra.py` writes output to the Ghidra Console Window. - -#### Example - -The following is an example of running `capa_ghidra.py` using the Ghidra Script Manager: - -Selecting capa rules: - +Use the following steps to execute `capa_explorer.py` and `capa_ghidra.py` using Ghidra's Script Manager: +1. Open the Ghidra Script Manager by navigating to `Window > Script Manager` +2. Locate `capa_explorer.py` and `capa_ghidra.py` by selecting the `Python 3 > capa` category or using the Ghidra Script Manager search functionality +3. Double-click `capa_explorer.py` or `capa_ghidra.py` to execute the script -Choosing output format: - +If you don't see `capa_explorer.py` and `capa_ghidra.py` make sure you have copied these scripts to your `$USER_HOME/ghidra_scripts` directory or manually added `` to the Ghidra Script Manager. -Viewing results in Ghidra Console Window: - +Both scripts ask you to provide the path of your capa rules directory. `capa_ghidra.py` also asks you to select `default`, `verbose`, and `vverbose` output formats used when writing output to the Ghidra Console Window. ### Ghidra Headless Analyzer diff --git a/capa/ghidra/capa_explorer.py b/capa/ghidra/capa_explorer.py new file mode 100644 index 000000000..51e2fba64 --- /dev/null +++ b/capa/ghidra/capa_explorer.py @@ -0,0 +1,377 @@ +# Integrate capa results with Ghidra UI +# @author Colton Gabertan (gabertan.colton@gmail.com) +# @category Python 3.capa + +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import sys +import json +import logging +import pathlib +from typing import Any, Dict, List + +from ghidra.app.cmd.label import AddLabelCmd, CreateNamespacesCmd +from ghidra.program.model.symbol import Namespace, SourceType, SymbolType + +import capa +import capa.main +import capa.rules +import capa.render.json +import capa.ghidra.helpers +import capa.features.extractors.ghidra.extractor + +logger = logging.getLogger("capa_explorer") + + +def add_bookmark(addr, txt, category="CapaExplorer"): + """create bookmark at addr""" + currentProgram().getBookmarkManager().setBookmark(addr, "Info", category, txt) # type: ignore [name-defined] # noqa: F821 + + +def create_namespace(namespace_str): + """create new Ghidra namespace for each capa namespace""" + + cmd = CreateNamespacesCmd(namespace_str, SourceType.USER_DEFINED) + cmd.applyTo(currentProgram()) # type: ignore [name-defined] # noqa: F821 + return cmd.getNamespace() + + +def create_label(ghidra_addr, name, capa_namespace): + """custom label cmd to overlay symbols under capa-generated namespaces""" + + # prevent duplicate labels under the same capa-generated namespace + symbol_table = currentProgram().getSymbolTable() # type: ignore [name-defined] # noqa: F821 + for sym in symbol_table.getSymbols(ghidra_addr): + if sym.getName(True) == capa_namespace.getName(True) + Namespace.DELIMITER + name: + return + + # create SymbolType.LABEL at addr + # prioritize capa-generated namespace (duplicate match @ new addr), else put under global Ghidra one (new match) + cmd = AddLabelCmd(ghidra_addr, name, True, SourceType.USER_DEFINED) + cmd.applyTo(currentProgram()) # type: ignore [name-defined] # noqa: F821 + + # assign new match overlay label to capa-generated namespace + cmd.getSymbol().setNamespace(capa_namespace) + return + + +class CapaMatchData: + def __init__( + self, + namespace, + scope, + capability, + matches, + attack: List[Dict[Any, Any]], + mbc: List[Dict[Any, Any]], + ): + self.namespace = namespace + self.scope = scope + self.capability = capability + self.matches = matches + self.attack = attack + self.mbc = mbc + + def bookmark_functions(self): + """create bookmarks for MITRE ATT&CK & MBC mappings""" + + if self.attack == [] and self.mbc == []: + return + + for key in self.matches.keys(): + addr = toAddr(hex(key)) # type: ignore [name-defined] # noqa: F821 + func = getFunctionContaining(addr) # type: ignore [name-defined] # noqa: F821 + + # bookmark & tag MITRE ATT&CK tactics & MBC @ function scope + if func is not None: + func_addr = func.getEntryPoint() + + if self.attack != []: + for item in self.attack: + attack_txt = "" + for part in item.get("parts", {}): + attack_txt = attack_txt + part + Namespace.DELIMITER + attack_txt = attack_txt + item.get("id", {}) + add_bookmark(func_addr, attack_txt, "CapaExplorer::MITRE ATT&CK") + + if self.mbc != []: + for item in self.mbc: + mbc_txt = "" + for part in item.get("parts", {}): + mbc_txt = mbc_txt + part + Namespace.DELIMITER + mbc_txt = mbc_txt + item.get("id", {}) + add_bookmark(func_addr, mbc_txt, "CapaExplorer::MBC") + + def set_plate_comment(self, ghidra_addr): + """set plate comments at matched functions""" + comment = getPlateComment(ghidra_addr) # type: ignore [name-defined] # noqa: F821 + rule_path = self.namespace.replace(Namespace.DELIMITER, "/") + # 2 calls to avoid duplicate comments via subsequent script runs + if comment is None: + # first comment @ function + comment = rule_path + "\n" + setPlateComment(ghidra_addr, comment) # type: ignore [name-defined] # noqa: F821 + elif rule_path not in comment: + comment = comment + rule_path + "\n" + setPlateComment(ghidra_addr, comment) # type: ignore [name-defined] # noqa: F821 + else: + return + + def set_pre_comment(self, ghidra_addr, sub_type, description): + """set pre comments at subscoped matches of main rules""" + comment = getPreComment(ghidra_addr) # type: ignore [name-defined] # noqa: F821 + if comment is None: + comment = "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n' + setPreComment(ghidra_addr, comment) # type: ignore [name-defined] # noqa: F821 + elif self.capability not in comment: + comment = ( + comment + "capa: " + sub_type + "(" + description + ")" + ' matched in "' + self.capability + '"\n' + ) + setPreComment(ghidra_addr, comment) # type: ignore [name-defined] # noqa: F821 + else: + return + + def label_matches(self): + """label findings at function scopes and comment on subscope matches""" + capa_namespace = create_namespace(self.namespace) + symbol_table = currentProgram().getSymbolTable() # type: ignore [name-defined] # noqa: F821 + + # handle function main scope of matched rule + # these will typically contain further matches within + if self.scope == "function": + for addr in self.matches.keys(): + ghidra_addr = toAddr(hex(addr)) # type: ignore [name-defined] # noqa: F821 + + # classify new function label under capa-generated namespace + sym = symbol_table.getPrimarySymbol(ghidra_addr) + if sym is not None: + if sym.getSymbolType() == SymbolType.FUNCTION: + create_label(ghidra_addr, sym.getName(), capa_namespace) + self.set_plate_comment(ghidra_addr) + + # parse the corresponding nodes, and pre-comment subscope matched features + # under the encompassing function(s) + for sub_match in self.matches.get(addr): + for loc, node in sub_match.items(): + sub_ghidra_addr = toAddr(hex(loc)) # type: ignore [name-defined] # noqa: F821 + if sub_ghidra_addr == ghidra_addr: + # skip duplicates + continue + + # precomment subscope matches under the function + if node != {}: + for sub_type, description in parse_node(node): + self.set_pre_comment(sub_ghidra_addr, sub_type, description) + else: + # resolve the encompassing function for the capa namespace + # of non-function scoped main matches + for addr in self.matches.keys(): + ghidra_addr = toAddr(hex(addr)) # type: ignore [name-defined] # noqa: F821 + + # basic block / insn scoped main matches + # Ex. See "Create Process on Windows" Rule + func = getFunctionContaining(ghidra_addr) # type: ignore [name-defined] # noqa: F821 + if func is not None: + func_addr = func.getEntryPoint() + create_label(func_addr, func.getName(), capa_namespace) + self.set_plate_comment(func_addr) + + # create subscope match precomments + for sub_match in self.matches.get(addr): + for loc, node in sub_match.items(): + sub_ghidra_addr = toAddr(hex(loc)) # type: ignore [name-defined] # noqa: F821 + + if node != {}: + if func is not None: + # basic block/ insn scope under resolved function + for sub_type, description in parse_node(node): + self.set_pre_comment(sub_ghidra_addr, sub_type, description) + else: + # this would be a global/file scoped main match + # try to resolve the encompassing function via the subscope match, instead + # Ex. "run as service" rule + sub_func = getFunctionContaining(sub_ghidra_addr) # type: ignore [name-defined] # noqa: F821 + if sub_func is not None: + sub_func_addr = sub_func.getEntryPoint() + # place function in capa namespace & create the subscope match label in Ghidra's global namespace + create_label(sub_func_addr, sub_func.getName(), capa_namespace) + self.set_plate_comment(sub_func_addr) + for sub_type, description in parse_node(node): + self.set_pre_comment(sub_ghidra_addr, sub_type, description) + else: + # addr is in some other file section like .data + # represent this location with a label symbol under the capa namespace + # Ex. See "Reference Base64 String" rule + for sub_type, description in parse_node(node): + # in many cases, these will be ghidra-labeled data, so just add the existing + # label symbol to the capa namespace + for sym in symbol_table.getSymbols(sub_ghidra_addr): + if sym.getSymbolType() == SymbolType.LABEL: + sym.setNamespace(capa_namespace) + self.set_pre_comment(sub_ghidra_addr, sub_type, description) + + +def get_capabilities(): + rules_dir: str = "" + try: + selected_dir = askDirectory("Choose capa rules directory", "Ok") # type: ignore [name-defined] # noqa: F821 + if selected_dir: + rules_dir = selected_dir.getPath() + except RuntimeError: + # RuntimeError thrown when user selects "Cancel" + pass + + if not rules_dir: + logger.info("You must choose a capa rules directory before running capa.") + return "" # return empty str to avoid handling both int and str types + + rules_path: pathlib.Path = pathlib.Path(rules_dir) + logger.info("running capa using rules from %s", str(rules_path)) + + rules = capa.main.get_rules([rules_path]) + meta = capa.ghidra.helpers.collect_metadata([rules_path]) + extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() + + capabilities, counts = capa.main.find_capabilities(rules, extractor, True) + + if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): + popup("capa explorer encountered warnings during analysis. Please check the console output for more information.") # type: ignore [name-defined] # noqa: F821 + logger.info("capa encountered warnings during analysis") + + return capa.render.json.render(meta, rules, capabilities) + + +def get_locations(match_dict): + """recursively collect match addresses and associated nodes""" + + for loc in match_dict.get("locations", {}): + # either an rva (absolute) + # or an offset into a file (file) + if loc.get("type", "") in ("absolute", "file"): + yield loc.get("value"), match_dict.get("node") + + for child in match_dict.get("children", {}): + yield from get_locations(child) + + +def parse_node(node_data): + """pull match descriptions and sub features by parsing node dicts""" + + node = node_data.get(node_data.get("type")) + + if "description" in node: + yield "description", node.get("description") + + data = node.get(node.get("type")) + if isinstance(data, (str, int)): + feat_type = node.get("type") + if isinstance(data, int): + data = hex(data) + yield feat_type, data + + +def parse_json(capa_data): + """Parse json produced by capa""" + + for rule, capability in capa_data.get("rules", {}).items(): + # structure to contain rule match address & supporting feature data + # {rule match addr:[{feature addr:{node_data}}]} + rule_matches: Dict[Any, List[Any]] = {} + for i in range(len(capability.get("matches"))): + # grab rule match location + match_loc = capability.get("matches")[i][0].get("value") + if match_loc is None: + # Ex. See "Reference Base64 string" + # {'type':'no address'} + match_loc = i + rule_matches[match_loc] = [] + + # grab extracted feature locations & corresponding node data + # feature[0]: location + # feature[1]: node + features = capability.get("matches")[i][1] + feat_dict = {} + for feature in get_locations(features): + feat_dict[feature[0]] = feature[1] + rule_matches[match_loc].append(feat_dict) + + # dict data of currently matched rule + meta = capability["meta"] + + # get MITRE ATT&CK and MBC + attack = meta.get("attack") + if attack is None: + attack = [] + mbc = meta.get("mbc") + if mbc is None: + mbc = [] + + # scope match for the rule + scope = meta["scopes"].get("static") + + fmt_rule = Namespace.DELIMITER + rule.replace(" ", "-") + if "namespace" in meta: + # split into list to help define child namespaces + # this requires the correct delimiter used by Ghidra + # Ex. 'communication/named-pipe/create/create pipe' -> capa::communication::named-pipe::create::create-pipe + namespace_str = Namespace.DELIMITER.join(meta["namespace"].split("/")) + namespace = "capa" + Namespace.DELIMITER + namespace_str + fmt_rule + else: + # lib rules via the official rules repo will not contain data + # for the "namespaces" key, so format using rule itself + # Ex. 'contain loop' -> capa::lib::contain-loop + namespace = "capa" + Namespace.DELIMITER + "lib" + fmt_rule + + yield CapaMatchData(namespace, scope, rule, rule_matches, attack, mbc) + + +def main(): + logging.basicConfig(level=logging.INFO) + logging.getLogger().setLevel(logging.INFO) + + if isRunningHeadless(): # type: ignore [name-defined] # noqa: F821 + logger.error("unsupported Ghidra execution mode") + return capa.main.E_UNSUPPORTED_GHIDRA_EXECUTION_MODE + + if not capa.ghidra.helpers.is_supported_ghidra_version(): + logger.error("unsupported Ghidra version") + return capa.main.E_UNSUPPORTED_GHIDRA_VERSION + + if not capa.ghidra.helpers.is_supported_file_type(): + logger.error("unsupported file type") + return capa.main.E_INVALID_FILE_TYPE + + if not capa.ghidra.helpers.is_supported_arch_type(): + logger.error("unsupported file architecture") + return capa.main.E_INVALID_FILE_ARCH + + # capa_data will always contain {'meta':..., 'rules':...} + # if the 'rules' key contains no values, then there were no matches + capa_data = json.loads(get_capabilities()) + if capa_data.get("rules") is None: + logger.info("capa explorer found no matches") + popup("capa explorer found no matches.") # type: ignore [name-defined] # noqa: F821 + return capa.main.E_EMPTY_REPORT + + for item in parse_json(capa_data): + item.bookmark_functions() + item.label_matches() + logger.info("capa explorer analysis complete") + popup("capa explorer analysis complete.\nPlease see results in the Bookmarks Window and Namespaces section of the Symbol Tree Window.") # type: ignore [name-defined] # noqa: F821 + return 0 + + +if __name__ == "__main__": + if sys.version_info < (3, 8): + from capa.exceptions import UnsupportedRuntimeError + + raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+") + exit_code = main() + if exit_code != 0: + popup("capa explorer encountered errors during analysis. Please check the console output for more information.") # type: ignore [name-defined] # noqa: F821 + sys.exit(exit_code) diff --git a/capa/main.py b/capa/main.py index 2d85b8684..9d0b1af59 100644 --- a/capa/main.py +++ b/capa/main.py @@ -100,6 +100,7 @@ E_MISSING_CAPE_STATIC_ANALYSIS = 21 E_MISSING_CAPE_DYNAMIC_ANALYSIS = 22 E_EMPTY_REPORT = 23 +E_UNSUPPORTED_GHIDRA_EXECUTION_MODE = 24 logger = logging.getLogger("capa")