Skip to content

Commit

Permalink
dotnet feature extractor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mike-hunhoff committed Apr 6, 2022
1 parent 0499f9e commit 656776f
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 116 deletions.
29 changes: 21 additions & 8 deletions capa/features/extractors/dnfile_.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,36 @@
import dnfile
import pefile

import capa.features.extractors.helpers
from capa.features.file import Import
from capa.features.common import OS, OS_ANY, ARCH_ANY, ARCH_I386, ARCH_AMD64, FORMAT_DOTNET, Arch, Format, Feature
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.dotnet.helpers import get_dotnet_imports

logger = logging.getLogger(__name__)


def extract_file_format(**kwargs):
def extract_file_format(**kwargs) -> Iterator[Tuple[Format, int]]:
yield Format(FORMAT_DOTNET), 0x0


def extract_file_os(**kwargs):
def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Import, int]]:
for (token, imp) in get_dotnet_imports(pe).items():
if "::" in imp:
# like System.IO.File::OpenRead
yield Import(imp), token
else:
# like kernel32.CreateFileA
dll, _, symbol = imp.rpartition(".")
for symbol_variant in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield Import(symbol_variant), token


def extract_file_os(**kwargs) -> Iterator[Tuple[OS, int]]:
yield OS(OS_ANY), 0x0


def extract_file_arch(pe, **kwargs):
def extract_file_arch(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Arch, int]]:
# to distinguish in more detail, see https://stackoverflow.com/a/23614024/10548020
# .NET 4.5 added option: any CPU, 32-bit preferred
if pe.net.Flags.CLR_32BITREQUIRED and pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE:
Expand All @@ -36,11 +51,9 @@ def extract_file_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:


FILE_HANDLERS = (
# extract_file_export_names,
# extract_file_import_names,
# extract_file_section_names,
# extract_file_strings,
# extract_file_function_names,
extract_file_import_names,
# TODO extract_file_strings,
# TODO extract_file_function_names,
extract_file_format,
)

Expand Down
42 changes: 22 additions & 20 deletions capa/features/extractors/dotnet/extractor.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,61 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, List, Tuple

if TYPE_CHECKING:
from dnfile import dnPE
from capa.features.common import Feature

import dncil
import dnfile

import capa.features.extractors
import capa.features.extractors.dotnet.file
import capa.features.extractors.dotnet.insn
import capa.features.extractors.dotnet.function

from capa.features.extractors.dotnet import get_dotnet_methods
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.dotnet.helpers import get_dotnet_methods


class DnfileFeatureExtractor(FeatureExtractor):
def __init__(self, path: str):
super(DnfileFeatureExtractor, self).__init__()
self.global_features = []
self.pe: dnfile.dnPE = dnfile.dnPE(path)

self.pe: dnPE = dnfile.dnPE(path)
# pre-compute these because we'll yield them at *every* scope.
self.global_features: List[Tuple[Feature, int]] = []
self.global_features.extend(capa.features.extractors.dnfile_.extract_file_os(pe=self.pe))
self.global_features.extend(capa.features.extractors.dnfile_.extract_file_arch(pe=self.pe))

def get_base_address(self):
raise NotImplementedError()
return 0x0

def extract_global_features(self):
yield from self.global_features

def extract_file_features(self):
raise NotImplementedError()
yield from capa.features.extractors.dotnet.file.extract_features(self.pe)

def get_functions(self):
ctx = {}
ctx["pe"] = self.pe

for method in get_dotnet_methods(self.pe):
setattr(method, "ctx", ctx)
yield method
for f in get_dotnet_methods(self.pe):
setattr(f, "ctx", ctx)
yield f

def extract_function_features(self, f):
raise NotImplementedError()
# TODO
yield from []

def get_basic_blocks(self, f):
# we don't support basic blocks for dotnet and treat each method as one large basic block
return f
# each dotnet method is considered 1 basic block
yield f

def extract_basic_block_features(self, f, bb):
# we don't support basic block features for dotnet
return
# we don't support basic block features
yield from []

def get_instructions(self, f, bb):
# we don't support basic blocks for dotnet and treat each method as one large basic block
# each dotnet method is considered 1 basic block
yield from f.instructions

def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.dotnet.insn.extract_features(f, bb, insn)
yield from capa.features.extractors.dotnet.insn.extract_features(f, bb, insn)
43 changes: 43 additions & 0 deletions capa/features/extractors/dotnet/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

from typing import TYPE_CHECKING, List, Tuple, Iterator

if TYPE_CHECKING:
import dnfile
from capa.features.common import Feature

import capa.features.extractors.helpers
from capa.features.file import Import
from capa.features.common import FORMAT_DOTNET, Format
from capa.features.extractors.dotnet.helpers import get_dotnet_imports


def extract_file_import_names(pe: dnfile.dnPE) -> Iterator[Tuple[Import, int]]:
    """yield an Import feature, paired with its token, for every import found by get_dotnet_imports

    names containing "::" are emitted unchanged; all other names are split into
    a DLL part and a symbol part and emitted once per variant returned by generate_symbols
    """
    imports = get_dotnet_imports(pe)

    for token in imports:
        name = imports[token]

        if "::" in name:
            # class import, like System.IO.File::OpenRead
            yield Import(name), token
            continue

        # native import, like kernel32.CreateFileA; split on the last "." so the symbol is the final component
        dll, _, symbol = name.rpartition(".")
        variants = capa.features.extractors.helpers.generate_symbols(dll, symbol)
        for variant in variants:
            yield Import(variant), token


def extract_file_format(pe: dnfile.dnPE) -> Iterator[Tuple[Format, int]]:
    """yield the single dotnet Format feature at offset 0

    `pe` is not inspected; it is accepted so this function can be called like the other file handlers
    """
    dotnet_format = Format(FORMAT_DOTNET)
    yield dotnet_format, 0x0


def extract_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
    """run each handler in FILE_HANDLERS against `pe` and yield the (feature, token) pairs they produce, in order"""
    for handler in FILE_HANDLERS:
        yield from handler(pe)


# file-scope feature handlers, invoked in this order by extract_features;
# each one is called with the dnfile.dnPE instance and yields (feature, token) pairs.
# entries marked TODO are not implemented yet and are therefore left commented out.
FILE_HANDLERS = (
extract_file_import_names,
# TODO extract_file_strings,
# TODO extract_file_function_names,
extract_file_format,
)
76 changes: 35 additions & 41 deletions capa/features/extractors/dotnet/helpers.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,42 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Tuple, Generator
from typing import Any, Dict, Tuple, Iterator
from itertools import chain

if TYPE_CHECKING:
from dnfile.mdtable import MemberRefRow
from dnfile.mdtable import MethodDefRow
from dnfile import dnPE

import dnfile
from dnfile.enums import MetadataTables
from dncil.cil.body import CilMethodBody
from dncil.cil.error import MethodBodyFormatError
from dncil.clr.token import Token, StringToken, InvalidToken
from dncil.cil.body.reader import CilMethodBodyReaderBase

# key indexes to dotnet metadata tables
DOTNET_META_TABLES_BY_INDEX = {table.value: table.name for table in MetadataTables}
DOTNET_META_TABLES_BY_INDEX = {table.value: table.name for table in dnfile.enums.MetadataTables}


class DnfileMethodBodyReader(CilMethodBodyReaderBase):
def __init__(self, pe: dnPE, row: MethodDefRow):
""" """
self.pe: dnPE = pe
def __init__(self, pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow):
self.pe: dnfile.dnPE = pe
self.offset: int = self.pe.get_offset_from_rva(row.Rva)

def read(self, n: int) -> bytes:
""" """
data: bytes = self.pe.get_data(self.pe.get_rva_from_offset(self.offset), n)
self.offset += n
return data

def tell(self) -> int:
""" """
return self.offset

def seek(self, offset: int) -> int:
""" """
self.offset = offset
return self.offset


def generate_dotnet_token(table: int, rid: int) -> int:
""" """
return ((table & 0xFF) << Token.TABLE_SHIFT) | (rid & Token.RID_MASK)


def resolve_dotnet_token(pe: dnPE, token: Token) -> Any:
""" """
def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Any:
"""map generic token to string or table row"""
if isinstance(token, StringToken):
return pe.net.user_strings.get_us(token.rid).value

Expand All @@ -67,18 +57,21 @@ def resolve_dotnet_token(pe: dnPE, token: Token) -> Any:
return InvalidToken(token.value)


def read_dotnet_method_body(pe: dnPE, row: MethodDefRow) -> CilMethodBody:
""" """
def read_dotnet_method_body(pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow) -> CilMethodBody:
"""read dotnet method body"""
return CilMethodBody(DnfileMethodBodyReader(pe, row))


def get_class_import_name(row: MemberRefRow) -> str:
""" """
def get_class_import_name(row: dnfile.mdtable.MemberRefRow) -> str:
"""get class import name from TypeRef table"""
if not isinstance(row.Class.row, dnfile.mdtable.TypeRefRow):
return ""
# like System.IO.File
return f"{row.Class.row.TypeNamespace}.{row.Class.row.TypeName}"


def get_class_imports(pe: dnPE) -> Generator[Tuple[int, str], None, None]:
"""parse class imports
def get_class_imports(pe: dnfile.dnPE) -> Iterator[Tuple[int, str]]:
"""get class imports from MemberRef table
see https://www.ntcore.com/files/dotnetformat.htm
Expand All @@ -98,14 +91,15 @@ def get_class_imports(pe: dnPE) -> Generator[Tuple[int, str], None, None]:
if not isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow,)):
continue

class_imp = f"{get_class_import_name(row)}::{row.Name}"
token = generate_dotnet_token(MetadataTables.MemberRef.value, rid + 1)
token = generate_dotnet_token(dnfile.enums.MetadataTables.MemberRef.value, rid + 1)
# like System.IO.File::OpenRead
imp = f"{get_class_import_name(row)}::{row.Name}"

yield token, class_imp
yield token, imp


def get_native_imports(pe: dnPE) -> Generator[Tuple[int, str], None, None]:
"""parse native imports
def get_native_imports(pe: dnfile.dnPE) -> Iterator[Tuple[int, str]]:
"""get native p/invoke calls from ImplMap table
see https://www.ntcore.com/files/dotnetformat.htm
Expand All @@ -122,23 +116,23 @@ def get_native_imports(pe: dnPE) -> Generator[Tuple[int, str], None, None]:
dll: str = row.ImportScope.row.Name
symbol: str = row.ImportName

# ECMA says "Each row of the ImplMap table associates a row in the MethodDef table (MemberForwarded) with the
# name of a routine (ImportName) in some unmanaged DLL (ImportScope)"; so we calculate and map the MemberForwarded
# MethodDef table token to help us later record native import method calls made from CIL
token: int = generate_dotnet_token(row.MemberForwarded.table.number, row.MemberForwarded.row_index)

# like Kernel32.dll
if dll and "." in dll:
dll = dll.split(".")[0].lower()
dll = dll.split(".")[0]

# like kernel32.CreateFileA
native_imp: str = f"{dll}.{symbol}"

# ECMA says "Each row of the ImplMap table associates a row in the MethodDef table (MemberForwarded) with the
# name of a routine (ImportName) in some unmanaged DLL (ImportScope)"; so we calculate and map the MemberForwarded
# MethodDef table token to help us later record native import method calls made from CIL
member_forwarded_token = generate_dotnet_token(row.MemberForwarded.table.number, row.MemberForwarded.row_index)
imp: str = f"{dll}.{symbol}"

yield member_forwarded_token, native_imp
yield token, imp


def get_dotnet_imports(pe: dnPE) -> Dict[int, str]:
""" """
def get_dotnet_imports(pe: dnfile.dnPE) -> Dict[int, str]:
"""get class imports and native p/invoke calls"""
imps: Dict[int, str] = {}

for (token, imp) in chain(get_class_imports(pe), get_native_imports(pe)):
Expand All @@ -147,8 +141,8 @@ def get_dotnet_imports(pe: dnPE) -> Dict[int, str]:
return imps


def get_dotnet_methods(pe: dnPE) -> Generator[CilMethodBody, None, None]:
"""read managed methods from MethodDef table"""
def get_dotnet_methods(pe: dnfile.dnPE) -> Iterator[CilMethodBody]:
"""get managed methods from MethodDef table"""
if not hasattr(pe.net.mdtables, "MethodDef"):
return

Expand All @@ -160,7 +154,7 @@ def get_dotnet_methods(pe: dnPE) -> Generator[CilMethodBody, None, None]:
try:
body: CilMethodBody = read_dotnet_method_body(pe, row)
except MethodBodyFormatError:
# TODO: logging?
# TODO
continue

yield body
Loading

0 comments on commit 656776f

Please sign in to comment.