Skip to content

Commit

Permalink
explorer: fix UnboundLocal errors and improve render match by function (
Browse files Browse the repository at this point in the history
  • Loading branch information
mike-hunhoff authored Feb 2, 2023
1 parent faceca6 commit 7ea166f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
- fix: improve exception handling to prevent IDA from locking up when errors occur #1262 @mike-hunhoff
- verify rule metadata using Pydantic #1167 @mr-tz
- extractor: make read consistent with file object behavior #1254 @mr-tz
- fix: UnboundLocalError x2 #1302 @mike-hunhoff

### Development

Expand Down
59 changes: 31 additions & 28 deletions capa/ida/plugin/form.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ def __init__(self, name: str, option=Options.DEFAULT):

# caches used to speed up capa explorer analysis - these must be init to None
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
self.ruleset_cache: Optional[capa.rules.RuleSet] = None
self.program_analysis_ruleset_cache: Optional[capa.rules.RuleSet] = None
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
self.rulegen_current_function: Optional[FunctionHandle] = None

Expand Down Expand Up @@ -527,8 +528,10 @@ def ida_hook_rebase(self, meta, post=False):

def ensure_capa_settings_rule_path(self):
try:
path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")

# resolve rules directory - check self and settings first, then ask user
if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")):
if not os.path.exists(path):
# configure rules selection messagebox
rules_message = QtWidgets.QMessageBox()
rules_message.setIcon(QtWidgets.QMessageBox.Information)
Expand All @@ -550,6 +553,10 @@ def ensure_capa_settings_rule_path(self):
if not path:
raise UserCancelledError()

if not os.path.exists(path):
logger.error("rule path %s does not exist or cannot be accessed" % path)
return False

settings.user[CAPA_SETTINGS_RULE_PATH] = path
except UserCancelledError as e:
capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules")
Expand All @@ -566,10 +573,6 @@ def ensure_capa_settings_rule_path(self):
logger.info("User cancelled analysis.")
return False

if not os.path.exists(path):
logger.error("rule path %s does not exist or cannot be accessed" % path)
return False

return True

def load_capa_rules(self):
Expand All @@ -580,15 +583,15 @@ def load_capa_rules(self):
rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
try:

def on_load_rule(rule_path, i, total):
update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total))
def on_load_rule(_, i, total):
update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i + 1, total))
if ida_kernwin.user_cancelled():
raise UserCancelledError("user cancelled")

self.ruleset_cache = capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
return capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
except UserCancelledError:
logger.info("User cancelled analysis.")
return False
return None
except Exception as e:
capa.ida.helpers.inform_user_ida_ui(
"Failed to load capa rules from %s" % settings.user[CAPA_SETTINGS_RULE_PATH]
Expand All @@ -604,9 +607,9 @@ def on_load_rule(rule_path, i, total):
)

settings.user[CAPA_SETTINGS_RULE_PATH] = ""
return False
return None

return True
return None

def load_capa_results(self, use_cache=False):
"""run capa analysis and render results in UI
Expand Down Expand Up @@ -652,13 +655,13 @@ def slot_progress_feature_extraction(text):

update_wait_box("loading rules")

# function should handle exceptions and return False
if not self.load_capa_rules():
self.program_analysis_ruleset_cache = self.load_capa_rules()
if self.program_analysis_ruleset_cache is None:
return False
assert self.ruleset_cache is not None

# matching operations may update rule instances,
# so we'll work with a local copy of the ruleset.
ruleset = copy.deepcopy(self.ruleset_cache)
ruleset = copy.deepcopy(self.program_analysis_ruleset_cache)

if ida_kernwin.user_cancelled():
logger.info("User cancelled analysis.")
Expand Down Expand Up @@ -723,10 +726,12 @@ def slot_progress_feature_extraction(text):
# either the results are cached and the doc already exists,
# or the doc was just created above
assert self.resdoc_cache is not None
assert self.program_analysis_ruleset_cache is not None

self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
self.set_view_status_label(
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count)
"capa rules: %s (%d rules)"
% (settings.user[CAPA_SETTINGS_RULE_PATH], self.program_analysis_ruleset_cache.source_rule_count)
)
except Exception as e:
logger.error("Failed to render results (error: %s)", e, exc_info=True)
Expand Down Expand Up @@ -764,17 +769,18 @@ def analyze_program(self, use_cache=False):

def load_capa_function_results(self):
""" """
if self.ruleset_cache is None:
if self.rulegen_ruleset_cache is None:
# only reload rules if cache is empty
if not self.load_capa_rules():
return False
self.rulegen_ruleset_cache = self.load_capa_rules()
else:
logger.info('Using cached capa rules, click "Reset" to load rules from disk.')

assert self.ruleset_cache is not None
if self.rulegen_ruleset_cache is None:
return False

# matching operations may update rule instances,
# so we'll work with a local copy of the ruleset.
ruleset = copy.deepcopy(self.ruleset_cache)
ruleset = copy.deepcopy(self.rulegen_ruleset_cache)

# clear feature cache
if self.rulegen_feature_cache is not None:
Expand Down Expand Up @@ -919,9 +925,6 @@ def reset_program_analysis_views(self):
self.model_data.reset()
self.reset_view_tree()

self.rules_cache = None
self.ruleset_cache = None

logger.info("Reset completed.")

def reset_function_analysis_views(self, is_analyze=False):
Expand All @@ -940,7 +943,7 @@ def reset_function_analysis_views(self, is_analyze=False):

if not is_analyze:
# clear rules and ruleset cache only if user clicked "Reset"
self.ruleset_cache = None
self.rulegen_ruleset_cache = None
self.set_view_status_label("Click Analyze to get started...")

logger.info("Reset completed.")
Expand Down Expand Up @@ -979,7 +982,7 @@ def update_rule_status(self, rule_text: str):

try:
# we don't expect either cache to be empty at this point
assert self.ruleset_cache is not None
assert self.rulegen_ruleset_cache is not None
assert self.rulegen_feature_cache is not None
except Exception as e:
logger.error("Failed to access cache (error: %s)", e, exc_info=True)
Expand All @@ -1002,7 +1005,7 @@ def update_rule_status(self, rule_text: str):
# we must create a deep copy of rules because any rule matching operations modify the original rule
# the ruleset may derive subscope rules from the source rules loaded from disk.
# by ignoring them, we reconstruct the collection of rules provided by the user.
rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()])
rules = copy.deepcopy([r for r in self.rulegen_ruleset_cache.rules.values() if not r.is_subscope_rule()])
rules.append(rule)

try:
Expand Down
54 changes: 31 additions & 23 deletions capa/ida/plugin/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from typing import Set, Dict, List, Tuple
from typing import Set, Dict, List, Tuple, Optional
from collections import deque

import idc
Expand Down Expand Up @@ -444,37 +444,45 @@ def render_capa_doc_match(self, parent: CapaExplorerDataItem, match: rd.Match, d
self.render_capa_doc_match(parent2, child, doc)

def render_capa_doc_by_function(self, doc: rd.ResultDocument):
""" """
matches_by_function: Dict[int, Tuple[CapaExplorerFunctionItem, Set[str]]] = {}
"""render rule matches by function meaning each rule match is nested under function where it was found"""
matches_by_function: Dict[AbsoluteVirtualAddress, Tuple[CapaExplorerFunctionItem, Set[str]]] = {}
for rule in rutils.capability_rules(doc):
for location_, _ in rule.matches:
location = location_.to_capa()

if not isinstance(location, AbsoluteVirtualAddress):
# only handle matches with a VA
match_eas: List[int] = []

# initial pass of rule matches
for (addr_, _) in rule.matches:
addr: Address = addr_.to_capa()
if isinstance(addr, AbsoluteVirtualAddress):
match_eas.append(int(addr))

for ea in match_eas:
func_ea: Optional[int] = capa.ida.helpers.get_func_start_ea(ea)
if func_ea is None:
# rule match address is not located in a defined function
continue
ea = int(location)

ea = capa.ida.helpers.get_func_start_ea(ea)
if ea is None:
# file scope, skip rendering in this mode
continue
if not matches_by_function.get(ea, ()):
# new function root
matches_by_function[ea] = (
CapaExplorerFunctionItem(self.root_node, location, can_check=False),
func_address: AbsoluteVirtualAddress = AbsoluteVirtualAddress(func_ea)
if not matches_by_function.get(func_address, ()):
# create a new function root to nest its rule matches; Note: we must use the address of the
# function here so everything is displayed properly
matches_by_function[func_address] = (
CapaExplorerFunctionItem(self.root_node, func_address, can_check=False),
set(),
)
function_root, match_cache = matches_by_function[ea]
if rule.meta.name in match_cache:
# rule match already rendered for this function root, skip it

func_root, func_match_cache = matches_by_function[func_address]
if rule.meta.name in func_match_cache:
# only nest each rule once, so if found, skip
continue
match_cache.add(rule.meta.name)

# add matched rule to its function cache; create a new rule node whose parent is the matched
# function node
func_match_cache.add(rule.meta.name)
CapaExplorerRuleItem(
function_root,
func_root,
rule.meta.name,
rule.meta.namespace or "",
len(rule.matches),
len([ea for ea in match_eas if capa.ida.helpers.get_func_start_ea(ea) == func_ea]),
rule.source,
can_check=False,
)
Expand Down

0 comments on commit 7ea166f

Please sign in to comment.