From 3c1cd67f60afaed9d47565f056d76031cadecaec Mon Sep 17 00:00:00 2001 From: Mike Hunhoff Date: Fri, 9 Sep 2022 12:09:41 -0600 Subject: [PATCH] dotnet: support property feature extraction (#1168) --- CHANGELOG.md | 1 + capa/features/common.py | 31 +++- capa/features/extractors/dnfile/helpers.py | 150 +++++++++++------ capa/features/extractors/dnfile/insn.py | 179 +++++++++++++++++---- capa/features/extractors/dotnetfile.py | 8 +- capa/features/freeze/features.py | 14 ++ capa/features/insn.py | 30 +++- capa/render/vverbose.py | 7 +- capa/rules.py | 17 ++ tests/fixtures.py | 115 +++++++++++++ tests/test_freeze.py | 4 + tests/test_main.py | 27 +++- tests/test_match.py | 41 +++++ tests/test_render.py | 7 + tests/test_rules.py | 41 ++++- 15 files changed, 580 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bec9c6147..a9f36ac94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### New Features - verify rule metadata format on load #1160 @mr-tz +- extract property features from .NET PE files #1168 @anushkavirgaonkar ### Breaking Changes diff --git a/capa/features/common.py b/capa/features/common.py index 30a4c0b25..67c9ed0d9 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -29,6 +29,14 @@ THUNK_CHAIN_DEPTH_DELTA = 5 +class FeatureAccess: + READ = "read" + WRITE = "write" + + +VALID_FEATURE_ACCESS = (FeatureAccess.READ, FeatureAccess.WRITE) + + def bytes_to_str(b: bytes) -> str: return str(codecs.encode(b, "hex").decode("utf-8")) @@ -92,15 +100,19 @@ def __nonzero__(self): class Feature(abc.ABC): - def __init__(self, value: Union[str, int, float, bytes], description=None): + def __init__( + self, + value: Union[str, int, float, bytes], + description: Optional[str] = None, + ): """ Args: value (any): the value of the feature, such as the number or string. description (str): a human-readable description that explains the feature value. """ super(Feature, self).__init__() - self.name = self.__class__.__name__.lower() + self.name = self.__class__.__name__.lower() self.value = value self.description = description @@ -119,23 +131,28 @@ def __lt__(self, other): < capa.features.freeze.features.feature_from_capa(other).json() ) + def get_name_str(self) -> str: + """ + render the name of this feature, for use by `__str__` and friends. + subclasses should override to customize the rendering. + """ + return self.name + def get_value_str(self) -> str: """ render the value of this feature, for use by `__str__` and friends. subclasses should override to customize the rendering. - - Returns: any """ return str(self.value) def __str__(self): if self.value is not None: if self.description: - return "%s(%s = %s)" % (self.name, self.get_value_str(), self.description) + return "%s(%s = %s)" % (self.get_name_str(), self.get_value_str(), self.description) else: - return "%s(%s)" % (self.name, self.get_value_str()) + return "%s(%s)" % (self.get_name_str(), self.get_value_str()) else: - return "%s" % self.name + return "%s" % self.get_name_str() def __repr__(self): return str(self) diff --git a/capa/features/extractors/dnfile/helpers.py b/capa/features/extractors/dnfile/helpers.py index 2b65cc52b..988038732 100644 --- a/capa/features/extractors/dnfile/helpers.py +++ b/capa/features/extractors/dnfile/helpers.py @@ -9,6 +9,7 @@ from __future__ import annotations import logging +from enum import Enum from typing import Any, Tuple, Iterator, Optional import dnfile @@ -17,6 +18,8 @@ from dncil.clr.token import Token, StringToken, InvalidToken from dncil.cil.body.reader import CilMethodBodyReaderBase +from capa.features.common import FeatureAccess + logger = logging.getLogger(__name__) # key indexes to dotnet metadata tables @@ -41,45 +44,36 @@ def seek(self, offset: int) -> int: return self.offset -class DnClass(object): - def __init__(self, token: int, namespace: str, classname: str): - self.token: int = token - self.namespace: str = namespace - self.classname: str = classname +class DnType(object): + def __init__(self, token: int, class_: str, namespace: str = "", member: str = "", access: Optional[str] = None): + self.token = token + self.access = access + self.namespace = namespace + self.class_ = class_ + self.member = member def __hash__(self): - return hash((self.token,)) + return hash((self.token, self.access, self.namespace, self.class_, self.member)) def __eq__(self, other): - return self.token == other.token + return ( + self.token == other.token + and self.access == other.access + and self.namespace == other.namespace + and self.class_ == other.class_ + and self.member == other.member + ) def __str__(self): - return DnClass.format_name(self.namespace, self.classname) + return DnType.format_name(self.class_, namespace=self.namespace, member=self.member) def __repr__(self): return str(self) @staticmethod - def format_name(namespace: str, classname: str): - name: str = classname - if namespace: - # like System.IO.File::OpenRead - name = f"{namespace}.{name}" - return name - - -class DnMethod(DnClass): - def __init__(self, token: int, namespace: str, classname: str, methodname: str): - super(DnMethod, self).__init__(token, namespace, classname) - self.methodname: str = methodname - - def __str__(self): - return DnMethod.format_name(self.namespace, self.classname, self.methodname) - - @staticmethod - def format_name(namespace: str, classname: str, methodname: str): # type: ignore + def format_name(class_: str, namespace: str = "", member: str = ""): # like File::OpenRead - name: str = f"{classname}::{methodname}" + name: str = f"{class_}::{member}" if member else class_ if namespace: # like System.IO.File::OpenRead name = f"{namespace}.{name}" @@ -87,26 +81,26 @@ def format_name(namespace: str, classname: str, methodname: str): # type: ignor class DnUnmanagedMethod: - def __init__(self, token: int, modulename: str, methodname: str): + def __init__(self, token: int, module: str, method: str): self.token: int = token - self.modulename: str = modulename - self.methodname: str = methodname + self.module: str = module + self.method: str = method def __hash__(self): - return hash((self.token,)) + return hash((self.token, self.module, self.method)) def __eq__(self, other): - return self.token == other.token + return self.token == other.token and self.module == other.module and self.method == other.method def __str__(self): - return DnUnmanagedMethod.format_name(self.modulename, self.methodname) + return DnUnmanagedMethod.format_name(self.module, self.method) def __repr__(self): return str(self) @staticmethod - def format_name(modulename, methodname): - return f"{modulename}.{methodname}" + def format_name(module, method): + return f"{module}.{method}" def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Any: @@ -139,7 +133,7 @@ def read_dotnet_method_body(pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow) - try: return CilMethodBody(DnfileMethodBodyReader(pe, row)) except MethodBodyFormatError as e: - logger.warn("failed to parse managed method body @ 0x%08x (%s)" % (row.Rva, e)) + logger.warning("failed to parse managed method body @ 0x%08x (%s)" % (row.Rva, e)) return None @@ -148,7 +142,7 @@ def read_dotnet_user_string(pe: dnfile.dnPE, token: StringToken) -> Optional[str try: user_string: Optional[dnfile.stream.UserString] = pe.net.user_strings.get_us(token.rid) except UnicodeDecodeError as e: - logger.warn("failed to decode #US stream index 0x%08x (%s)" % (token.rid, e)) + logger.warning("failed to decode #US stream index 0x%08x (%s)" % (token.rid, e)) return None if user_string is None: @@ -157,7 +151,7 @@ def read_dotnet_user_string(pe: dnfile.dnPE, token: StringToken) -> Optional[str return user_string.value -def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[DnMethod]: +def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[DnType]: """get managed imports from MemberRef table see https://www.ntcore.com/files/dotnetformat.htm @@ -174,12 +168,11 @@ def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[DnMethod]: for (rid, row) in enumerate(iter_dotnet_table(pe, "MemberRef")): if not isinstance(row.Class.row, dnfile.mdtable.TypeRefRow): continue - token: int = calculate_dotnet_token_value(pe.net.mdtables.MemberRef.number, rid + 1) - yield DnMethod(token, row.Class.row.TypeNamespace, row.Class.row.TypeName, row.Name) + yield DnType(token, row.Class.row.TypeName, namespace=row.Class.row.TypeNamespace, member=row.Name) -def get_dotnet_managed_methods(pe: dnfile.dnPE) -> Iterator[DnMethod]: +def get_dotnet_managed_methods(pe: dnfile.dnPE) -> Iterator[DnType]: """get managed method names from TypeDef table see https://www.ntcore.com/files/dotnetformat.htm @@ -193,7 +186,70 @@ def get_dotnet_managed_methods(pe: dnfile.dnPE) -> Iterator[DnMethod]: for row in iter_dotnet_table(pe, "TypeDef"): for index in row.MethodList: token = calculate_dotnet_token_value(index.table.number, index.row_index) - yield DnMethod(token, row.TypeNamespace, row.TypeName, index.row.Name) + yield DnType(token, row.TypeName, namespace=row.TypeNamespace, member=index.row.Name) + + +def get_dotnet_fields(pe: dnfile.dnPE) -> Iterator[DnType]: + """get fields from TypeDef table""" + for row in iter_dotnet_table(pe, "TypeDef"): + for index in row.FieldList: + token = calculate_dotnet_token_value(index.table.number, index.row_index) + yield DnType(token, row.TypeName, namespace=row.TypeNamespace, member=index.row.Name) + + +def get_dotnet_property_map( + pe: dnfile.dnPE, property_row: dnfile.mdtable.PropertyRow +) -> Optional[dnfile.mdtable.TypeDefRow]: + """get property map from PropertyMap table + + see https://www.ntcore.com/files/dotnetformat.htm + + 21 - PropertyMap Table + List of Properties owned by a specific class. + Parent (index into the TypeDef table) + PropertyList (index into Property table). It marks the first of a contiguous run of Properties owned by Parent. The run continues to the smaller of: + the last row of the Property table + the next run of Properties, found by inspecting the PropertyList of the next row in this PropertyMap table + """ + for row in iter_dotnet_table(pe, "PropertyMap"): + for index in row.PropertyList: + if index.row.Name == property_row.Name: + return row.Parent.row + return None + + +def get_dotnet_properties(pe: dnfile.dnPE) -> Iterator[DnType]: + """get property from MethodSemantics table + + see https://www.ntcore.com/files/dotnetformat.htm + + 24 - MethodSemantics Table + Links Events and Properties to specific methods. For example one Event can be associated to more methods. A property uses this table to associate get/set methods. + Semantics (a 2-byte bitmask of type MethodSemanticsAttributes) + Method (index into the MethodDef table) + Association (index into the Event or Property table; more precisely, a HasSemantics coded index) + """ + for row in iter_dotnet_table(pe, "MethodSemantics"): + typedef_row = get_dotnet_property_map(pe, row.Association.row) + if typedef_row is None: + continue + + token = calculate_dotnet_token_value(row.Method.table.number, row.Method.row_index) + + if row.Semantics.msSetter: + access = FeatureAccess.WRITE + elif row.Semantics.msGetter: + access = FeatureAccess.READ + else: + access = None + + yield DnType( + token, + typedef_row.TypeName, + access=access, + namespace=typedef_row.TypeNamespace, + member=row.Association.row.Name, + ) def get_dotnet_managed_method_bodies(pe: dnfile.dnPE) -> Iterator[Tuple[int, CilMethodBody]]: @@ -226,8 +282,8 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod] ImportScope (index into the ModuleRef table) """ for row in iter_dotnet_table(pe, "ImplMap"): - modulename: str = row.ImportScope.row.Name - methodname: str = row.ImportName + module: str = row.ImportScope.row.Name + method: str = row.ImportName # ECMA says "Each row of the ImplMap table associates a row in the MethodDef table (MemberForwarded) with the # name of a routine (ImportName) in some unmanaged DLL (ImportScope)"; so we calculate and map the MemberForwarded @@ -235,11 +291,11 @@ def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[DnUnmanagedMethod] token: int = calculate_dotnet_token_value(row.MemberForwarded.table.number, row.MemberForwarded.row_index) # like Kernel32.dll - if modulename and "." in modulename: - modulename = modulename.split(".")[0] + if module and "." in module: + module = module.split(".")[0] # like kernel32.CreateFileA - yield DnUnmanagedMethod(token, modulename, methodname) + yield DnUnmanagedMethod(token, module, method) def calculate_dotnet_token_value(table: int, rid: int) -> int: diff --git a/capa/features/extractors/dnfile/insn.py b/capa/features/extractors/dnfile/insn.py index cc2f51bb4..d99dcd58d 100644 --- a/capa/features/extractors/dnfile/insn.py +++ b/capa/features/extractors/dnfile/insn.py @@ -17,21 +17,26 @@ from dncil.cil.instruction import Instruction import capa.features.extractors.helpers -from capa.features.insn import API, Number -from capa.features.common import Class, String, Feature, Namespace, Characteristic +from capa.features.insn import API, Number, Property +from capa.features.common import Class, String, Feature, Namespace, FeatureAccess, Characteristic from capa.features.address import Address from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle from capa.features.extractors.dnfile.helpers import ( - DnClass, - DnMethod, + DnType, DnUnmanagedMethod, + get_dotnet_fields, resolve_dotnet_token, + get_dotnet_properties, read_dotnet_user_string, get_dotnet_managed_imports, get_dotnet_managed_methods, get_dotnet_unmanaged_imports, ) +METHODDEF_TABLE = dnfile.mdtable.MethodDef.number +MEMBERREF_TABLE = dnfile.mdtable.MemberRef.number +FIELD_TABLE = dnfile.mdtable.Field.number + def get_managed_imports(ctx: Dict) -> Dict: if "managed_imports_cache" not in ctx: @@ -57,18 +62,34 @@ def get_methods(ctx: Dict) -> Dict: return ctx["methods_cache"] -def get_callee(ctx: Dict, token: int) -> Union[DnMethod, DnUnmanagedMethod, None]: +def get_callee(ctx: Dict, token: int) -> Union[DnType, DnUnmanagedMethod, None]: """map dotnet token to un/managed method""" - callee: Union[DnMethod, DnUnmanagedMethod, None] = get_managed_imports(ctx).get(token, None) - if not callee: + callee: Union[DnType, DnUnmanagedMethod, None] = get_managed_imports(ctx).get(token, None) + if callee is None: # we must check unmanaged imports before managed methods because we map forwarded managed methods # to their unmanaged imports; we prefer a forwarded managed method be mapped to its unmanaged import for analysis callee = get_unmanaged_imports(ctx).get(token, None) - if not callee: + if callee is None: callee = get_methods(ctx).get(token, None) return callee +def get_properties(ctx: Dict) -> Dict: + if "properties_cache" not in ctx: + ctx["properties_cache"] = {} + for prop in get_dotnet_properties(ctx["pe"]): + ctx["properties_cache"][prop.token] = prop + return ctx["properties_cache"] + + +def get_fields(ctx: Dict) -> Dict: + if "fields_cache" not in ctx: + ctx["fields_cache"] = {} + for field in get_dotnet_fields(ctx["pe"]): + ctx["fields_cache"][field.token] = field + return ctx["fields_cache"] + + def extract_insn_api_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: """parse instruction API features""" insn: Instruction = ih.inner @@ -76,49 +97,148 @@ def extract_insn_api_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterato if insn.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli): return - callee: Union[DnMethod, DnUnmanagedMethod, None] = get_callee(fh.ctx, insn.operand.value) + callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, insn.operand.value) if callee is None: return - if isinstance(callee, DnUnmanagedMethod): - # like kernel32.CreateFileA - for name in capa.features.extractors.helpers.generate_symbols(callee.modulename, callee.methodname): - yield API(name), ih.address - else: + if isinstance(callee, DnType): + if callee.member.startswith(("get_", "set_")): + if insn.operand.table == METHODDEF_TABLE: + # check if the method belongs to the MethodDef table and whether it is used to access a property + if get_properties(fh.ctx).get(insn.operand.value, None) is not None: + return + elif insn.operand.table == MEMBERREF_TABLE: + # if the method belongs to the MemberRef table, we assume it is used to access a property + return + # like System.IO.File::Delete yield API(str(callee)), ih.address + else: + # like kernel32.CreateFileA + for name in capa.features.extractors.helpers.generate_symbols(callee.module, callee.method): + yield API(name), ih.address + + +def extract_insn_property_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: + """parse instruction property features""" + insn: Instruction = ih.inner + + name: Optional[str] = None + access: Optional[str] = None + + if insn.opcode in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli): + if insn.operand.table == METHODDEF_TABLE: + # check if the method belongs to the MethodDef table and whether it is used to access a property + prop = get_properties(fh.ctx).get(insn.operand.value, None) + if prop is not None: + name = str(prop) + access = prop.access + + elif insn.operand.table == MEMBERREF_TABLE: + # if the method belongs to the MemberRef table, we assume it is used to access a property + row: Any = resolve_dotnet_token(fh.ctx["pe"], insn.operand) + if row is None: + return + if not isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow, dnfile.mdtable.TypeDefRow)): + return + if not row.Name.startswith(("get_", "set_")): + return + + name = DnType.format_name( + row.Class.row.TypeName, namespace=row.Class.row.TypeNamespace, member=row.Name[4:] + ) + if row.Name.startswith("get_"): + access = FeatureAccess.READ + elif row.Name.startswith("set_"): + access = FeatureAccess.WRITE + + elif insn.opcode in (OpCodes.Ldfld, OpCodes.Ldflda, OpCodes.Ldsfld, OpCodes.Ldsflda): + if insn.operand.table == FIELD_TABLE: + # determine whether the operand is a field by checking if it belongs to the Field table + read_field: Optional[DnType] = get_fields(fh.ctx).get(insn.operand.value, None) + if read_field: + name = str(read_field) + access = FeatureAccess.READ + + elif insn.opcode in (OpCodes.Stfld, OpCodes.Stsfld): + if insn.operand.table == FIELD_TABLE: + # determine whether the operand is a field by checking if it belongs to the Field table + write_field: Optional[DnType] = get_fields(fh.ctx).get(insn.operand.value, None) + if write_field: + name = str(write_field) + access = FeatureAccess.WRITE + + if name is not None: + if access is not None: + yield Property(name, access=access), ih.address + yield Property(name), ih.address + def extract_insn_class_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Class, Address]]: """parse instruction class features""" - if ih.inner.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli): + if ih.inner.opcode not in ( + OpCodes.Call, + OpCodes.Callvirt, + OpCodes.Jmp, + OpCodes.Calli, + OpCodes.Ldfld, + OpCodes.Ldflda, + OpCodes.Ldsfld, + OpCodes.Ldsflda, + OpCodes.Stfld, + OpCodes.Stsfld, + ): return - row: Any = resolve_dotnet_token(fh.ctx["pe"], Token(ih.inner.operand.value)) + row: Any = resolve_dotnet_token(fh.ctx["pe"], ih.inner.operand) + if isinstance(row, dnfile.mdtable.MemberRefRow): + if isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow, dnfile.mdtable.TypeDefRow)): + yield Class(DnType.format_name(row.Class.row.TypeName, namespace=row.Class.row.TypeNamespace)), ih.address - if not isinstance(row, dnfile.mdtable.MemberRefRow): - return - if not isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow, dnfile.mdtable.TypeDefRow)): - return + elif isinstance(row, dnfile.mdtable.MethodDefRow): + callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand.value) + if isinstance(callee, DnType): + yield Class(DnType.format_name(callee.class_, namespace=callee.namespace)), ih.address - yield Class(DnClass.format_name(row.Class.row.TypeNamespace, row.Class.row.TypeName)), ih.address + elif isinstance(row, dnfile.mdtable.FieldRow): + field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None) + if field is not None: + yield Class(DnType.format_name(field.class_, namespace=field.namespace)), ih.address def extract_insn_namespace_features(fh: FunctionHandle, bh, ih: InsnHandle) -> Iterator[Tuple[Namespace, Address]]: """parse instruction namespace features""" - if ih.inner.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli): + if ih.inner.opcode not in ( + OpCodes.Call, + OpCodes.Callvirt, + OpCodes.Jmp, + OpCodes.Calli, + OpCodes.Ldfld, + OpCodes.Ldflda, + OpCodes.Ldsfld, + OpCodes.Ldsflda, + OpCodes.Stfld, + OpCodes.Stsfld, + ): return row: Any = resolve_dotnet_token(fh.ctx["pe"], Token(ih.inner.operand.value)) - if not isinstance(row, dnfile.mdtable.MemberRefRow): - return - if not isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow, dnfile.mdtable.TypeDefRow)): - return - if not row.Class.row.TypeNamespace: - return + if isinstance(row, dnfile.mdtable.MemberRefRow): + if isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow, dnfile.mdtable.TypeDefRow)): + if row.Class.row.TypeNamespace: + yield Namespace(row.Class.row.TypeNamespace), ih.address + + elif isinstance(row, dnfile.mdtable.MethodDefRow): + callee: Union[DnType, DnUnmanagedMethod, None] = get_callee(fh.ctx, ih.inner.operand.value) + if isinstance(callee, DnType) and callee.namespace is not None: + yield Namespace(callee.namespace), ih.address - yield Namespace(row.Class.row.TypeNamespace), ih.address + elif isinstance(row, dnfile.mdtable.FieldRow): + field: Optional[DnType] = get_fields(fh.ctx).get(ih.inner.operand.value, None) + if field is not None: + yield Namespace(field.namespace), ih.address def extract_insn_number_features(fh, bh, ih: InsnHandle) -> Iterator[Tuple[Feature, Address]]: @@ -174,6 +294,7 @@ def extract_features(fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle) -> Itera INSTRUCTION_HANDLERS = ( extract_insn_api_features, + extract_insn_property_features, extract_insn_number_features, extract_insn_string_features, extract_insn_namespace_features, diff --git a/capa/features/extractors/dotnetfile.py b/capa/features/extractors/dotnetfile.py index 076bc6006..b9c9f00a6 100644 --- a/capa/features/extractors/dotnetfile.py +++ b/capa/features/extractors/dotnetfile.py @@ -24,7 +24,7 @@ from capa.features.address import NO_ADDRESS, Address, DNTokenAddress from capa.features.extractors.base_extractor import FeatureExtractor from capa.features.extractors.dnfile.helpers import ( - DnClass, + DnType, iter_dotnet_table, is_dotnet_mixed_mode, get_dotnet_managed_imports, @@ -47,7 +47,7 @@ def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Impor for imp in get_dotnet_unmanaged_imports(pe): # like kernel32.CreateFileA - for name in capa.features.extractors.helpers.generate_symbols(imp.modulename, imp.methodname): + for name in capa.features.extractors.helpers.generate_symbols(imp.module, imp.method): yield Import(name), DNTokenAddress(imp.token) @@ -80,11 +80,11 @@ def extract_file_class_features(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Cla """emit class features from TypeRef and TypeDef tables""" for (rid, row) in enumerate(iter_dotnet_table(pe, "TypeDef")): token = calculate_dotnet_token_value(pe.net.mdtables.TypeDef.number, rid + 1) - yield Class(DnClass.format_name(row.TypeNamespace, row.TypeName)), DNTokenAddress(token) + yield Class(DnType.format_name(row.TypeName, namespace=row.TypeNamespace)), DNTokenAddress(token) for (rid, row) in enumerate(iter_dotnet_table(pe, "TypeRef")): token = calculate_dotnet_token_value(pe.net.mdtables.TypeRef.number, rid + 1) - yield Class(DnClass.format_name(row.TypeNamespace, row.TypeName)), DNTokenAddress(token) + yield Class(DnType.format_name(row.TypeName, namespace=row.TypeNamespace)), DNTokenAddress(token) def extract_file_os(**kwargs) -> Iterator[Tuple[OS, Address]]: diff --git a/capa/features/freeze/features.py b/capa/features/freeze/features.py index 8f8665ca5..9182f8779 100644 --- a/capa/features/freeze/features.py +++ b/capa/features/freeze/features.py @@ -66,6 +66,9 @@ def to_capa(self) -> capa.features.common.Feature: elif isinstance(self, APIFeature): return capa.features.insn.API(self.api, description=self.description) + elif isinstance(self, PropertyFeature): + return capa.features.insn.Property(self.property, access=self.access, description=self.description) + elif isinstance(self, NumberFeature): return capa.features.insn.Number(self.number, description=self.description) @@ -147,6 +150,9 @@ def feature_from_capa(f: capa.features.common.Feature) -> "Feature": elif isinstance(f, capa.features.insn.API): return APIFeature(api=f.value, description=f.description) + elif isinstance(f, capa.features.insn.Property): + return PropertyFeature(property=f.value, access=f.access, description=f.description) + elif isinstance(f, capa.features.insn.Number): return NumberFeature(number=f.value, description=f.description) @@ -266,6 +272,13 @@ class APIFeature(FeatureModel): description: Optional[str] +class PropertyFeature(FeatureModel): + type: str = "property" + access: Optional[str] + property: str + description: Optional[str] + + class NumberFeature(FeatureModel): type: str = "number" number: Union[int, float] @@ -320,6 +333,7 @@ class OperandOffsetFeature(FeatureModel): ClassFeature, NamespaceFeature, APIFeature, + PropertyFeature, NumberFeature, BytesFeature, OffsetFeature, diff --git a/capa/features/insn.py b/capa/features/insn.py index c62d3ddf3..50dd6133f 100644 --- a/capa/features/insn.py +++ b/capa/features/insn.py @@ -6,9 +6,9 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import abc -from typing import Union +from typing import Union, Optional -from capa.features.common import Feature +from capa.features.common import VALID_FEATURE_ACCESS, Feature def hex(n: int) -> str: @@ -24,6 +24,32 @@ def __init__(self, name: str, description=None): super(API, self).__init__(name, description=description) +class _AccessFeature(Feature, abc.ABC): + # superclass: don't use directly + def __init__(self, value: str, access: Optional[str] = None, description: Optional[str] = None): + super(_AccessFeature, self).__init__(value, description=description) + if access is not None: + if access not in VALID_FEATURE_ACCESS: + raise ValueError("%s access type %s not valid" % (self.name, access)) + self.access = access + + def __hash__(self): + return hash((self.name, self.value, self.access)) + + def __eq__(self, other): + return super().__eq__(other) and self.access == other.access + + def get_name_str(self) -> str: + if self.access is not None: + return f"{self.name}/{self.access}" + return self.name + + +class Property(_AccessFeature): + def __init__(self, value: str, access: Optional[str] = None, description=None): + super(Property, self).__init__(value, access=access, description=description) + + class Number(Feature): def __init__(self, value: Union[int, float], description=None): super(Number, self).__init__(value, description=description) diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index fbc54eb4a..76a836797 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -131,7 +131,7 @@ def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0): if isinstance(feature, frzf.ImportFeature): # fixup access to Python reserved name value = feature.import_ - if isinstance(feature, frzf.ClassFeature): + elif isinstance(feature, frzf.ClassFeature): value = feature.class_ else: # convert attributes to dictionary using aliased names, if applicable @@ -151,6 +151,11 @@ def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0): value = hex(value) ostream.write(key) + + if isinstance(feature, frzf.PropertyFeature): + if feature.access is not None: + ostream.write("/" + feature.access) + ostream.write(": ") if value: diff --git a/capa/rules.py b/capa/rules.py index d72ff1f4c..5da1f3128 100644 --- a/capa/rules.py +++ b/capa/rules.py @@ -123,6 +123,7 @@ class Scope(str, Enum): INSTRUCTION_SCOPE: { capa.features.common.MatchedRule, capa.features.insn.API, + capa.features.insn.Property, capa.features.insn.Number, capa.features.common.String, capa.features.common.Bytes, @@ -291,6 +292,8 @@ def parse_feature(key: str): return capa.features.common.Class elif key == "namespace": return capa.features.common.Namespace + elif key == "property": + return capa.features.insn.Property else: raise InvalidRule("unexpected statement: %s" % key) @@ -568,6 +571,20 @@ def build_statements(d, scope: str): or (key == "arch" and d[key] not in capa.features.common.VALID_ARCH) ): raise InvalidRule("unexpected %s value %s" % (key, d[key])) + + elif key.startswith("property/"): + access = key[len("property/") :] + if access not in capa.features.common.VALID_FEATURE_ACCESS: + raise InvalidRule("unexpected %s access %s" % (key, access)) + + value, description = parse_description(d[key], key, d.get("description")) + try: + feature = capa.features.insn.Property(value, access=access, description=description) + except ValueError as e: + raise InvalidRule(str(e)) + ensure_feature_valid_for_scope(scope, feature) + return feature + else: Feature = parse_feature(key) value, description = parse_description(d[key], key, d.get("description")) diff --git a/tests/fixtures.py b/tests/fixtures.py index 058df8a69..b3045c937 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -36,6 +36,7 @@ Arch, Format, Feature, + FeatureAccess, ) from capa.features.address import Address from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle @@ -279,6 +280,10 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "dotnet", "1c444ebeba24dcba8628b7dfe5fec7c6.exe_") elif name.startswith("_692f"): return os.path.join(CD, "data", "dotnet", "692f7fd6d198e804d6af98eb9e390d61.exe_") + elif name.startswith("_0953c"): + return os.path.join(CD, "data", "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_") + elif name.startswith("_039a6"): + return os.path.join(CD, "data", "039a6336d0802a2255669e6867a5679c7eb83313dbc61fb1c7232147379bd304.exe_") else: raise ValueError("unexpected sample fixture: %s" % name) @@ -758,6 +763,106 @@ def parametrize(params, values, **kwargs): True, ), ("_1c444", "function=0x1F68, bb=0x1F68, insn=0x1FF9", capa.features.insn.API("FromHbitmap"), False), + ( + "_1c444", + "token=0x600002B", + capa.features.insn.Property("System.IO.FileInfo::Length", access=FeatureAccess.READ), + True, + ), # MemberRef method + ( + "_1c444", + "token=0x600002B", + capa.features.insn.Property("System.IO.FileInfo::Length"), + True, + ), # MemberRef method + ( + "_1c444", + "token=0x6000081", + capa.features.insn.API("System.Diagnostics.Process::Start"), + True, + ), # MemberRef method + ( + "_1c444", + "token=0x6000081", + capa.features.insn.Property( + "System.Diagnostics.ProcessStartInfo::UseShellExecute", access=FeatureAccess.WRITE + ), # MemberRef method + True, + ), + ( + "_1c444", + "token=0x6000081", + capa.features.insn.Property( + "System.Diagnostics.ProcessStartInfo::WorkingDirectory", access=FeatureAccess.WRITE + ), # MemberRef method + True, + ), + ( + "_1c444", + "token=0x6000081", + capa.features.insn.Property( + "System.Diagnostics.ProcessStartInfo::FileName", access=FeatureAccess.WRITE + ), # MemberRef method + True, + ), + ( + "_1c444", + "token=0x6000087", + capa.features.insn.Property("Sockets.MySocket::reConnectionDelay", access=FeatureAccess.WRITE), # Field + True, + ), + ( + "_1c444", + "token=0x600008A", + capa.features.insn.Property("Sockets.MySocket::isConnected", access=FeatureAccess.WRITE), # Field + True, + ), + ( + "_1c444", + "token=0x600008A", + capa.features.insn.Property("Sockets.MySocket::onConnected", access=FeatureAccess.READ), # Field + True, + ), + ( + "_0953c", + "token=0x6000004", + capa.features.insn.Property("System.Diagnostics.Debugger::IsAttached", access=FeatureAccess.READ), + True, + ), # MemberRef method + ( + "_692f", + "token=0x6000006", + capa.features.insn.Property( + "System.Management.Automation.PowerShell::Streams", access=FeatureAccess.READ + ), # MemberRef method + False, + ), + ( + "_039a6", + "token=0x6000007", + capa.features.insn.API("System.Reflection.Assembly::Load"), + True, + ), + ( + "_039a6", + "token=0x600001D", + capa.features.insn.Property("StagelessHollow.Arac::Marka", access=FeatureAccess.READ), # MethodDef method + True, + ), + ( + "_039a6", + "token=0x600001C", + capa.features.insn.Property("StagelessHollow.Arac::Marka", access=FeatureAccess.READ), # MethodDef method + False, + ), + ( + "_039a6", + "token=0x6000023", + capa.features.insn.Property( + "System.Runtime.CompilerServices.AsyncTaskMethodBuilder::Task", access=FeatureAccess.READ + ), # MemberRef method + False, + ), ], # order tests by (file, item) # so that our LRU cache is most effective. @@ -904,3 +1009,13 @@ def _1c444_dotnetfile_extractor(): @pytest.fixture def _692f_dotnetfile_extractor(): return get_dnfile_extractor(get_data_path_by_name("_692f")) + + +@pytest.fixture +def _0953c_dotnetfile_extractor(): + return get_dnfile_extractor(get_data_path_by_name("_0953c")) + + +@pytest.fixture +def _039a6_dotnetfile_extractor(): + return get_dnfile_extractor(get_data_path_by_name("_039a6")) diff --git a/tests/test_freeze.py b/tests/test_freeze.py index d81ce303b..664afd44b 100644 --- a/tests/test_freeze.py +++ b/tests/test_freeze.py @@ -147,6 +147,10 @@ def test_serialize_features(): roundtrip_feature(capa.features.file.Import("kernel32.IsWow64Process")) roundtrip_feature(capa.features.file.Import("#11")) roundtrip_feature(capa.features.insn.OperandOffset(0, 0x8)) + roundtrip_feature( + capa.features.insn.Property("System.IO.FileInfo::Length", access=capa.features.common.FeatureAccess.READ) + ) + roundtrip_feature(capa.features.insn.Property("System.IO.FileInfo::Length")) def test_freeze_sample(tmpdir, z9324d_extractor): diff --git a/tests/test_main.py b/tests/test_main.py index b6aa72a85..5c29accb0 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -11,7 +11,12 @@ import fixtures from fixtures import * -from fixtures import _692f_dotnetfile_extractor, _1c444_dotnetfile_extractor +from fixtures import ( + _692f_dotnetfile_extractor, + _1c444_dotnetfile_extractor, + _039a6_dotnetfile_extractor, + _0953c_dotnetfile_extractor, +) import capa.main import capa.rules @@ -469,3 +474,23 @@ def test_main_dotnet2(_692f_dotnetfile_extractor): assert capa.main.main([path, "-j"]) == 0 assert capa.main.main([path, "-q"]) == 0 assert capa.main.main([path]) == 0 + + +def test_main_dotnet3(_0953c_dotnetfile_extractor): + # tests rules can be loaded successfully and all output modes + path = _0953c_dotnetfile_extractor.path + assert capa.main.main([path, "-vv"]) == 0 + assert capa.main.main([path, "-v"]) == 0 + assert capa.main.main([path, "-j"]) == 0 + assert capa.main.main([path, "-q"]) == 0 + assert capa.main.main([path]) == 0 + + +def test_main_dotnet4(_039a6_dotnetfile_extractor): + # tests rules can be loaded successfully and all output modes + path = _039a6_dotnetfile_extractor.path + assert capa.main.main([path, "-vv"]) == 0 + assert capa.main.main([path, "-v"]) == 0 + assert capa.main.main([path, "-j"]) == 0 + assert capa.main.main([path, "-q"]) == 0 + assert capa.main.main([path]) == 0 diff --git a/tests/test_match.py b/tests/test_match.py index e3ec17e54..6fb319cd2 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -585,3 +585,44 @@ def test_match_operand_offset(): # mismatching value _, matches = match([r], {capa.features.insn.OperandOffset(0, 0x11): {1, 2}}, 0x0) assert "test rule" not in matches + + +def test_match_property_access(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + features: + - and: + - property/read: System.IO.FileInfo::Length + """ + ) + r = capa.rules.Rule.from_yaml(rule) + + assert capa.features.insn.Property("System.IO.FileInfo::Length", capa.features.common.FeatureAccess.READ) in { + capa.features.insn.Property("System.IO.FileInfo::Length", capa.features.common.FeatureAccess.READ) + } + + _, matches = match( + [r], + {capa.features.insn.Property("System.IO.FileInfo::Length", capa.features.common.FeatureAccess.READ): {1, 2}}, + 0x0, + ) + assert "test rule" in matches + + # mismatching access + _, matches = match( + [r], + {capa.features.insn.Property("System.IO.FileInfo::Length", capa.features.common.FeatureAccess.WRITE): {1, 2}}, + 0x0, + ) + assert "test rule" not in matches + + # mismatching value + _, matches = match( + [r], + {capa.features.insn.Property("System.IO.FileInfo::Size", capa.features.common.FeatureAccess.READ): {1, 2}}, + 0x0, + ) + assert "test rule" not in matches diff --git a/tests/test_render.py b/tests/test_render.py index cc5691773..fff14a95e 100644 --- a/tests/test_render.py +++ b/tests/test_render.py @@ -15,6 +15,13 @@ def test_render_offset(): assert str(capa.features.insn.Offset(1)) == "offset(0x1)" +def test_render_property(): + assert ( + str(capa.features.insn.Property("System.IO.FileInfo::Length", access=capa.features.common.FeatureAccess.READ)) + == "property/read(System.IO.FileInfo::Length)" + ) + + def test_render_meta_attack(): # Persistence::Boot or Logon Autostart Execution::Registry Run Keys / Startup Folder [T1547.001] id = "T1543.003" diff --git a/tests/test_rules.py b/tests/test_rules.py index 1f221e45f..61bef1116 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -14,7 +14,7 @@ import capa.engine import capa.features.common from capa.features.file import FunctionName -from capa.features.insn import Number, Offset +from capa.features.insn import Number, Offset, Property from capa.features.common import ( OS, OS_LINUX, @@ -27,6 +27,7 @@ Format, String, Substring, + FeatureAccess, ) @@ -951,3 +952,41 @@ def test_arch_features(): children = list(r.statement.get_children()) assert (Arch(ARCH_AMD64) in children) == True assert (Arch(ARCH_I386) not in children) == True + + +def test_property_access(): + r = capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + features: + - property/read: System.IO.FileInfo::Length + """ + ) + ) + assert r.evaluate({Property("System.IO.FileInfo::Length", access=FeatureAccess.READ): {1}}) == True + + assert r.evaluate({Property("System.IO.FileInfo::Length"): {1}}) == False + assert r.evaluate({Property("System.IO.FileInfo::Length", access=FeatureAccess.WRITE): {1}}) == False + + +def test_property_access_symbol(): + r = capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + features: + - property/read: System.IO.FileInfo::Length = some property + """ + ) + ) + assert ( + r.evaluate( + {Property("System.IO.FileInfo::Length", access=FeatureAccess.READ, description="some property"): {1}} + ) + == True + )