Skip to content

Commit

Permalink
feat: start dotnet feature extraction (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
mike-hunhoff authored Apr 8, 2022
1 parent c8a772d commit 662be3a
Show file tree
Hide file tree
Showing 15 changed files with 525 additions and 48 deletions.
3 changes: 3 additions & 0 deletions .github/mypy/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,6 @@ ignore_missing_imports = True

[mypy-elftools.*]
ignore_missing_imports = True

[mypy-dncil.*]
ignore_missing_imports = True
Empty file.
70 changes: 70 additions & 0 deletions capa/features/extractors/dnfile/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Any, List, Tuple

if TYPE_CHECKING:
from capa.features.common import Feature

import dnfile

import capa.features.extractors
import capa.features.extractors.dnfile.file
import capa.features.extractors.dnfile.insn
from capa.features.extractors.base_extractor import FeatureExtractor
from capa.features.extractors.dnfile.helpers import get_dotnet_managed_method_bodies


class DnfileFeatureExtractor(FeatureExtractor):
def __init__(self, path: str):
super(DnfileFeatureExtractor, self).__init__()
self.pe: dnfile.dnPE = dnfile.dnPE(path)

# pre-compute these because we'll yield them at *every* scope.
self.global_features: List[Tuple[Feature, int]] = []
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_os(pe=self.pe))
self.global_features.extend(capa.features.extractors.dotnetfile.extract_file_arch(pe=self.pe))

def get_base_address(self):
return 0x0

def extract_global_features(self):
yield from self.global_features

def extract_file_features(self):
yield from capa.features.extractors.dnfile.file.extract_features(self.pe)

def get_functions(self):
# data structure shared across functions yielded here.
# useful for caching analysis relevant across a single workspace.
ctx = {}
ctx["pe"] = self.pe

for f in get_dotnet_managed_method_bodies(self.pe):
setattr(f, "ctx", ctx)
yield f

def extract_function_features(self, f):
# TODO
yield from []

def get_basic_blocks(self, f):
# each dotnet method is considered 1 basic block
yield f

def extract_basic_block_features(self, f, bb):
# we don't support basic block features
yield from []

def get_instructions(self, f, bb):
yield from f.instructions

def extract_insn_features(self, f, bb, insn):
yield from capa.features.extractors.dnfile.insn.extract_features(f, bb, insn)
40 changes: 40 additions & 0 deletions capa/features/extractors/dnfile/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Tuple, Iterator

if TYPE_CHECKING:
import dnfile
from capa.features.common import Feature, Format
from capa.features.file import Import

import capa.features.extractors


def extract_file_import_names(pe: dnfile.dnPE) -> Iterator[Tuple[Import, int]]:
yield from capa.features.extractors.dotnetfile.extract_file_import_names(pe)


def extract_file_format(pe: dnfile.dnPE) -> Iterator[Tuple[Format, int]]:
yield from capa.features.extractors.dotnetfile.extract_file_format(pe=pe)


def extract_features(pe: dnfile.dnPE) -> Iterator[Tuple[Feature, int]]:
for file_handler in FILE_HANDLERS:
for (feature, token) in file_handler(pe):
yield feature, token


FILE_HANDLERS = (
extract_file_import_names,
# TODO extract_file_strings,
# TODO extract_file_function_names,
extract_file_format,
)
169 changes: 169 additions & 0 deletions capa/features/extractors/dnfile/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

import logging
from typing import Any, Tuple, Iterator, Optional

import dnfile
from dncil.cil.body import CilMethodBody
from dncil.cil.error import MethodBodyFormatError
from dncil.clr.token import Token, StringToken, InvalidToken
from dncil.cil.body.reader import CilMethodBodyReaderBase

logger = logging.getLogger(__name__)

# key indexes to dotnet metadata tables
DOTNET_META_TABLES_BY_INDEX = {table.value: table.name for table in dnfile.enums.MetadataTables}


class DnfileMethodBodyReader(CilMethodBodyReaderBase):
def __init__(self, pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow):
self.pe: dnfile.dnPE = pe
self.offset: int = self.pe.get_offset_from_rva(row.Rva)

def read(self, n: int) -> bytes:
data: bytes = self.pe.get_data(self.pe.get_rva_from_offset(self.offset), n)
self.offset += n
return data

def tell(self) -> int:
return self.offset

def seek(self, offset: int) -> int:
self.offset = offset
return self.offset


def calculate_dotnet_token_value(table: int, rid: int) -> int:
return ((table & 0xFF) << Token.TABLE_SHIFT) | (rid & Token.RID_MASK)


def resolve_dotnet_token(pe: dnfile.dnPE, token: Token) -> Any:
"""map generic token to string or table row"""
if isinstance(token, StringToken):
user_string: Optional[str] = read_dotnet_user_string(pe, token)
if user_string is None:
return InvalidToken(token.value)
return user_string

table_name: str = DOTNET_META_TABLES_BY_INDEX.get(token.table, "")
if not table_name:
# table_index is not valid
return InvalidToken(token.value)

table: Any = getattr(pe.net.mdtables, table_name, None)
if table is None:
# table index is valid but table is not present
return InvalidToken(token.value)

try:
return table.rows[token.rid - 1]
except IndexError:
# table index is valid but row index is not valid
return InvalidToken(token.value)


def read_dotnet_method_body(pe: dnfile.dnPE, row: dnfile.mdtable.MethodDefRow) -> Optional[CilMethodBody]:
"""read dotnet method body"""
try:
return CilMethodBody(DnfileMethodBodyReader(pe, row))
except MethodBodyFormatError as e:
logger.warn("failed to parse managed method body @ 0x%08x (%s)" % (row.Rva, e))
return None


def read_dotnet_user_string(pe: dnfile.dnPE, token: StringToken) -> Optional[str]:
"""read user string from #US stream"""
try:
user_string: Optional[dnfile.stream.UserString] = pe.net.user_strings.get_us(token.rid)
except UnicodeDecodeError as e:
logger.warn("failed to decode #US stream index 0x%08x (%s)" % (token.rid, e))
return None
if user_string is None:
return None
return user_string.value


def get_dotnet_managed_imports(pe: dnfile.dnPE) -> Iterator[Tuple[int, str]]:
"""get managed imports from MemberRef table
see https://www.ntcore.com/files/dotnetformat.htm
10 - MemberRef Table
Each row represents an imported method
Class (index into the TypeRef, ModuleRef, MethodDef, TypeSpec or TypeDef tables)
Name (index into String heap)
01 - TypeRef Table
Each row represents an imported class, its namespace and the assembly which contains it
TypeName (index into String heap)
TypeNamespace (index into String heap)
"""
if not hasattr(pe.net.mdtables, "MemberRef"):
return

for (rid, row) in enumerate(pe.net.mdtables.MemberRef):
if not isinstance(row.Class.row, (dnfile.mdtable.TypeRefRow,)):
continue

token: int = calculate_dotnet_token_value(dnfile.enums.MetadataTables.MemberRef.value, rid + 1)
# like System.IO.File::OpenRead
imp: str = f"{row.Class.row.TypeNamespace}.{row.Class.row.TypeName}::{row.Name}"

yield token, imp


def get_dotnet_unmanaged_imports(pe: dnfile.dnPE) -> Iterator[Tuple[int, str]]:
"""get unmanaged imports from ImplMap table
see https://www.ntcore.com/files/dotnetformat.htm
28 - ImplMap Table
ImplMap table holds information about unmanaged methods that can be reached from managed code, using PInvoke dispatch
MemberForwarded (index into the Field or MethodDef table; more precisely, a MemberForwarded coded index)
ImportName (index into the String heap)
ImportScope (index into the ModuleRef table)
"""
if not hasattr(pe.net.mdtables, "ImplMap"):
return

for row in pe.net.mdtables.ImplMap:
dll: str = row.ImportScope.row.Name
symbol: str = row.ImportName

# ECMA says "Each row of the ImplMap table associates a row in the MethodDef table (MemberForwarded) with the
# name of a routine (ImportName) in some unmanaged DLL (ImportScope)"; so we calculate and map the MemberForwarded
# MethodDef table token to help us later record native import method calls made from CIL
token: int = calculate_dotnet_token_value(row.MemberForwarded.table.number, row.MemberForwarded.row_index)

# like Kernel32.dll
if dll and "." in dll:
dll = dll.split(".")[0]

# like kernel32.CreateFileA
imp: str = f"{dll}.{symbol}"

yield token, imp


def get_dotnet_managed_method_bodies(pe: dnfile.dnPE) -> Iterator[CilMethodBody]:
"""get managed methods from MethodDef table"""
if not hasattr(pe.net.mdtables, "MethodDef"):
return

for row in pe.net.mdtables.MethodDef:
if not row.ImplFlags.miIL or any((row.Flags.mdAbstract, row.Flags.mdPinvokeImpl)):
# skip methods that do not have a method body
continue

body: Optional[CilMethodBody] = read_dotnet_method_body(pe, row)
if body is None:
continue

yield body
96 changes: 96 additions & 0 deletions capa/features/extractors/dnfile/insn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Dict, Tuple, Iterator, Optional
from itertools import chain

if TYPE_CHECKING:
from dncil.cil.instruction import Instruction
from dncil.cil.body import CilMethodBody
from capa.features.common import Feature

from dncil.clr.token import StringToken
from dncil.cil.opcode import OpCodes

import capa.features.extractors.helpers
from capa.features.insn import API, Number
from capa.features.common import String
from capa.features.extractors.dnfile.helpers import (
read_dotnet_user_string,
get_dotnet_managed_imports,
get_dotnet_unmanaged_imports,
)


def get_imports(ctx: Dict) -> Dict:
if "imports_cache" not in ctx:
ctx["imports_cache"] = {
token: imp
for (token, imp) in chain(get_dotnet_managed_imports(ctx["pe"]), get_dotnet_unmanaged_imports(ctx["pe"]))
}
return ctx["imports_cache"]


def extract_insn_api_features(f: CilMethodBody, bb: CilMethodBody, insn: Instruction) -> Iterator[Tuple[API, int]]:
"""parse instruction API features"""
if insn.opcode not in (OpCodes.Call, OpCodes.Callvirt, OpCodes.Jmp, OpCodes.Calli):
return

name: str = get_imports(f.ctx).get(insn.operand.value, "")
if not name:
return

if "::" in name:
# like System.IO.File::OpenRead
yield API(name), insn.offset
else:
# like kernel32.CreateFileA
dll, _, symbol = name.rpartition(".")
for name_variant in capa.features.extractors.helpers.generate_symbols(dll, symbol):
yield API(name_variant), insn.offset


def extract_insn_number_features(
f: CilMethodBody, bb: CilMethodBody, insn: Instruction
) -> Iterator[Tuple[Number, int]]:
"""parse instruction number features"""
if insn.is_ldc():
yield Number(insn.get_ldc()), insn.offset


def extract_insn_string_features(
f: CilMethodBody, bb: CilMethodBody, insn: Instruction
) -> Iterator[Tuple[String, int]]:
"""parse instruction string features"""
if not insn.is_ldstr():
return

if not isinstance(insn.operand, StringToken):
return

user_string: Optional[str] = read_dotnet_user_string(f.ctx["pe"], insn.operand)
if user_string is None:
return

yield String(user_string), insn.offset


def extract_features(f: CilMethodBody, bb: CilMethodBody, insn: Instruction) -> Iterator[Tuple[Feature, int]]:
"""extract instruction features"""
for inst_handler in INSTRUCTION_HANDLERS:
for (feature, offset) in inst_handler(f, bb, insn):
yield feature, offset


INSTRUCTION_HANDLERS = (
extract_insn_api_features,
extract_insn_number_features,
extract_insn_string_features,
)
Loading

0 comments on commit 662be3a

Please sign in to comment.