diff --git a/CHANGELOG.md b/CHANGELOG.md index bc3711bf6..7621df00e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,7 @@ - fix: improve exception handling to prevent IDA from locking up when errors occur #1262 @mike-hunhoff - verify rule metadata using Pydantic #1167 @mr-tz - extractor: make read consistent with file object behavior #1254 @mr-tz +- fix: UnboundLocalError x2 #1302 @mike-hunhoff ### Development diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index ee1faa3e0..146cf3b92 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -160,7 +160,8 @@ def __init__(self, name: str, option=Options.DEFAULT): # caches used to speed up capa explorer analysis - these must be init to None self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None - self.ruleset_cache: Optional[capa.rules.RuleSet] = None + self.program_analysis_ruleset_cache: Optional[capa.rules.RuleSet] = None + self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None self.rulegen_current_function: Optional[FunctionHandle] = None @@ -527,8 +528,10 @@ def ida_hook_rebase(self, meta, post=False): def ensure_capa_settings_rule_path(self): try: + path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") + # resolve rules directory - check self and settings first, then ask user - if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")): + if not os.path.exists(path): # configure rules selection messagebox rules_message = QtWidgets.QMessageBox() rules_message.setIcon(QtWidgets.QMessageBox.Information) @@ -550,6 +553,10 @@ def ensure_capa_settings_rule_path(self): if not path: raise UserCancelledError() + if not os.path.exists(path): + logger.error("rule path %s does not exist or cannot be accessed" % path) + return False + settings.user[CAPA_SETTINGS_RULE_PATH] = path except UserCancelledError as e: capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules") @@ -566,10 +573,6 @@ def ensure_capa_settings_rule_path(self): logger.info("User cancelled analysis.") return False - if not os.path.exists(path): - logger.error("rule path %s does not exist or cannot be accessed" % path) - return False - return True def load_capa_rules(self): @@ -580,15 +583,15 @@ def load_capa_rules(self): rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") try: - def on_load_rule(rule_path, i, total): - update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total)) + def on_load_rule(_, i, total): + update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i + 1, total)) if ida_kernwin.user_cancelled(): raise UserCancelledError("user cancelled") - self.ruleset_cache = capa.main.get_rules([rule_path], on_load_rule=on_load_rule) + return capa.main.get_rules([rule_path], on_load_rule=on_load_rule) except UserCancelledError: logger.info("User cancelled analysis.") - return False + return None except Exception as e: capa.ida.helpers.inform_user_ida_ui( "Failed to load capa rules from %s" % settings.user[CAPA_SETTINGS_RULE_PATH] @@ -604,9 +607,9 @@ def on_load_rule(rule_path, i, total): ) settings.user[CAPA_SETTINGS_RULE_PATH] = "" - return False + return None - return True + return None def load_capa_results(self, use_cache=False): """run capa analysis and render results in UI @@ -652,13 +655,13 @@ def slot_progress_feature_extraction(text): update_wait_box("loading rules") - # function should handle exceptions and return False - if not self.load_capa_rules(): + self.program_analysis_ruleset_cache = self.load_capa_rules() + if self.program_analysis_ruleset_cache is None: return False - assert self.ruleset_cache is not None + # matching operations may update rule instances, # so we'll work with a local copy of the ruleset. - ruleset = copy.deepcopy(self.ruleset_cache) + ruleset = copy.deepcopy(self.program_analysis_ruleset_cache) if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") @@ -723,10 +726,12 @@ def slot_progress_feature_extraction(text): # either the results are cached and the doc already exists, # or the doc was just created above assert self.resdoc_cache is not None + assert self.program_analysis_ruleset_cache is not None self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked()) self.set_view_status_label( - "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count) + "capa rules: %s (%d rules)" + % (settings.user[CAPA_SETTINGS_RULE_PATH], self.program_analysis_ruleset_cache.source_rule_count) ) except Exception as e: logger.error("Failed to render results (error: %s)", e, exc_info=True) @@ -764,17 +769,18 @@ def analyze_program(self, use_cache=False): def load_capa_function_results(self): """ """ - if self.ruleset_cache is None: + if self.rulegen_ruleset_cache is None: # only reload rules if cache is empty - if not self.load_capa_rules(): - return False + self.rulegen_ruleset_cache = self.load_capa_rules() else: logger.info('Using cached capa rules, click "Reset" to load rules from disk.') - assert self.ruleset_cache is not None + if self.rulegen_ruleset_cache is None: + return False + # matching operations may update rule instances, # so we'll work with a local copy of the ruleset. - ruleset = copy.deepcopy(self.ruleset_cache) + ruleset = copy.deepcopy(self.rulegen_ruleset_cache) # clear feature cache if self.rulegen_feature_cache is not None: @@ -919,9 +925,6 @@ def reset_program_analysis_views(self): self.model_data.reset() self.reset_view_tree() - self.rules_cache = None - self.ruleset_cache = None - logger.info("Reset completed.") def reset_function_analysis_views(self, is_analyze=False): @@ -940,7 +943,7 @@ def reset_function_analysis_views(self, is_analyze=False): if not is_analyze: # clear rules and ruleset cache only if user clicked "Reset" - self.ruleset_cache = None + self.rulegen_ruleset_cache = None self.set_view_status_label("Click Analyze to get started...") logger.info("Reset completed.") @@ -979,7 +982,7 @@ def update_rule_status(self, rule_text: str): try: # we don't expect either cache to be empty at this point - assert self.ruleset_cache is not None + assert self.rulegen_ruleset_cache is not None assert self.rulegen_feature_cache is not None except Exception as e: logger.error("Failed to access cache (error: %s)", e, exc_info=True) @@ -1002,7 +1005,7 @@ def update_rule_status(self, rule_text: str): # we must create a deep copy of rules because any rule matching operations modify the original rule # the ruleset may derive subscope rules from the source rules loaded from disk. # by ignoring them, we reconstruct the collection of rules provided by the user. - rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()]) + rules = copy.deepcopy([r for r in self.rulegen_ruleset_cache.rules.values() if not r.is_subscope_rule()]) rules.append(rule) try: diff --git a/capa/ida/plugin/model.py b/capa/ida/plugin/model.py index d0a6a8579..216a8e34d 100644 --- a/capa/ida/plugin/model.py +++ b/capa/ida/plugin/model.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from typing import Set, Dict, List, Tuple +from typing import Set, Dict, List, Tuple, Optional from collections import deque import idc @@ -444,37 +444,45 @@ def render_capa_doc_match(self, parent: CapaExplorerDataItem, match: rd.Match, d self.render_capa_doc_match(parent2, child, doc) def render_capa_doc_by_function(self, doc: rd.ResultDocument): - """ """ - matches_by_function: Dict[int, Tuple[CapaExplorerFunctionItem, Set[str]]] = {} + """render rule matches by function meaning each rule match is nested under function where it was found""" + matches_by_function: Dict[AbsoluteVirtualAddress, Tuple[CapaExplorerFunctionItem, Set[str]]] = {} for rule in rutils.capability_rules(doc): - for location_, _ in rule.matches: - location = location_.to_capa() - - if not isinstance(location, AbsoluteVirtualAddress): - # only handle matches with a VA + match_eas: List[int] = [] + + # initial pass of rule matches + for (addr_, _) in rule.matches: + addr: Address = addr_.to_capa() + if isinstance(addr, AbsoluteVirtualAddress): + match_eas.append(int(addr)) + + for ea in match_eas: + func_ea: Optional[int] = capa.ida.helpers.get_func_start_ea(ea) + if func_ea is None: + # rule match address is not located in a defined function continue - ea = int(location) - ea = capa.ida.helpers.get_func_start_ea(ea) - if ea is None: - # file scope, skip rendering in this mode - continue - if not matches_by_function.get(ea, ()): - # new function root - matches_by_function[ea] = ( - CapaExplorerFunctionItem(self.root_node, location, can_check=False), + func_address: AbsoluteVirtualAddress = AbsoluteVirtualAddress(func_ea) + if not matches_by_function.get(func_address, ()): + # create a new function root to nest its rule matches; Note: we must use the address of the + # function here so everything is displayed properly + matches_by_function[func_address] = ( + CapaExplorerFunctionItem(self.root_node, func_address, can_check=False), set(), ) - function_root, match_cache = matches_by_function[ea] - if rule.meta.name in match_cache: - # rule match already rendered for this function root, skip it + + func_root, func_match_cache = matches_by_function[func_address] + if rule.meta.name in func_match_cache: + # only nest each rule once, so if found, skip continue - match_cache.add(rule.meta.name) + + # add matched rule to its function cache; create a new rule node whose parent is the matched + # function node + func_match_cache.add(rule.meta.name) CapaExplorerRuleItem( - function_root, + func_root, rule.meta.name, rule.meta.namespace or "", - len(rule.matches), + len([ea for ea in match_eas if capa.ida.helpers.get_func_start_ea(ea) == func_ea]), rule.source, can_check=False, )