Skip to content

Commit

Permalink
Ianhelle/entity updates 2023 09 01 (#718)
Browse files Browse the repository at this point in the history
* Updates to entity definitions:

- Added additional attributes to Host and Account
- Added entities: mailbox_configuration.py, oauth_application.py service_principal.py
- Added alias entities for Ip (IpAddress), Machine (Host) and User (Account) in entities/__init__.py
- Fixed pivot.remove_pivot_funcs to handle aliased entities without throwing exception
- Added some property descriptors for shared or derived attribute values
- Added a couple common functions for listing entity names and classes to entities/__init__.py

* Flake8 suppressions for account.py

* Minor updates to entity properties

* Suppressing pylint warning in mordor_driver
  • Loading branch information
ianhelle authored Oct 6, 2023
1 parent 8314145 commit df22c8b
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 41 deletions.
96 changes: 96 additions & 0 deletions msticpy/common/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,99 @@ def _get_dot_attrib(obj, elem_path: str) -> Any:
if cur_node is None:
raise KeyError(f"{elem} value of {elem_path} is not a valid path")
return cur_node


# Descriptors


class SharedProperty:
"""Descriptor to share property between instances of a class."""

def __init__(self, instance_name: str):
"""
Initialize the descriptor.
Parameters
----------
instance_name : str
Name of the instance attribute to use to store the value.
"""
self.instance_name = instance_name

def __get__(self, instance, owner):
"""Return the value of the instance attribute."""
return getattr(instance, self.instance_name, None)

def __set__(self, instance, value):
"""Set the value of the instance attribute."""
setattr(instance, self.instance_name, value)


class FallbackProperty:
"""Descriptor for aliased property with fallback property."""

def __init__(self, instance_name: str, default_name: str):
"""
Initialize the descriptor.
Parameters
----------
instance_name : str
Name of the instance attribute to use to store the value.
default_name : str
Name of the instance attribute to use as a fallback value.
"""
self.instance_name = instance_name
self.default_name = default_name

def __get__(self, instance, owner):
"""Return the value of the instance (or default) attribute."""
return getattr(
instance, self.instance_name, getattr(instance, self.default_name, None)
)

def __set__(self, instance, value):
"""Set the value of the instance attribute."""
setattr(instance, self.instance_name, value)


class SplitProperty:
"""Descriptor for property that is stored as two delimited attributes."""

def __init__(self, inst_left_name: str, inst_right_name: str, split_char: str):
"""
Initialize the descriptor.
Parameters
----------
inst_left_name : str
Name of the instance attribute to use to store the left value.
inst_right_name : str
Name of the instance attribute to use to store the right value.
split_char : str
Character to use to split the value.
"""
self.inst_left_name = inst_left_name
self.inst_right_name = inst_right_name
self.split_char = split_char

def __get__(self, instance, owner):
"""Return the value of the instance attribute."""
left_part = getattr(instance, self.inst_left_name, None)
right_part = getattr(instance, self.inst_right_name, None)
if left_part and right_part:
return f"{left_part}{self.split_char}{right_part}"
return None

def __set__(self, instance, value):
"""Set the value of the instance attribute."""
if value is None:
return
if self.split_char not in value:
setattr(instance, self.inst_left_name, value)
return
left, right = value.split(self.split_char, maxsplit=1)
setattr(instance, self.inst_left_name, left)
setattr(instance, self.inst_right_name, right)
94 changes: 64 additions & 30 deletions msticpy/datamodel/entities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# --------------------------------------------------------------------------
"""Entity sub-package."""
import difflib
from typing import List

from .account import Account
from .alert import Alert
Expand All @@ -29,92 +30,125 @@
from .mail_cluster import MailCluster
from .mail_message import MailMessage
from .mailbox import Mailbox
from .mailbox_configuration import MailboxConfiguration
from .malware import Malware
from .network_connection import NetworkConnection
from .oauth_application import OAuthApplication
from .process import Process
from .registry_key import RegistryKey
from .registry_value import RegistryValue
from .security_group import SecurityGroup
from .service_principal import ServicePrincipal
from .submission_mail import SubmissionMail
from .threat_intelligence import Threatintelligence
from .unknown_entity import UnknownEntity
from .url import Url

from ..soc.incident import Incident # isort: skip


# Defender class equivalents
class User(Account):
"""Alias for Account."""


class Ip(IpAddress):
"""Alias for IpAddress."""


class Machine(Host):
"""Alias for Host."""


# Dictionary to map text names of types to the class.
Entity.ENTITY_NAME_MAP.update(
{
"account": Account,
"azureresource": AzureResource,
"alert": Alert,
"alerts": Alert,
"azure-resource": AzureResource,
"host": Host,
"process": Process,
"file": File,
"cloudapplication": CloudApplication,
"azureresource": AzureResource,
"cloud-application": CloudApplication,
"cloud-logon-session": CloudLogonSession,
"cloudapplication": CloudApplication,
"cloudlogonsession": CloudLogonSession,
"dns": Dns,
"dnsresolve": Dns,
"ipaddress": IpAddress,
"file": File,
"filehash": FileHash,
"geolocation": GeoLocation,
"host-logon-session": HostLogonSession,
"host": Host,
"hostlogonsession": HostLogonSession,
"incident": Incident,
"iotdevice": IoTDevice,
"ip": IpAddress,
"networkconnection": NetworkConnection,
"network-connection": NetworkConnection,
"mailbox": Mailbox,
"mail-message": MailMessage,
"mailmessage": MailMessage,
"ipaddress": IpAddress,
"location": GeoLocation,
"machine": Machine,
"mail-cluster": MailCluster,
"mail-message": MailMessage,
"mailbox": Mailbox,
"mailcluster": MailCluster,
"mailboxconfiguration": MailboxConfiguration,
"mailmessage": MailMessage,
"malware": Malware,
"network-connection": NetworkConnection,
"networkconnection": NetworkConnection,
"oauthapplication": OAuthApplication,
"process": Process,
"registry-key": RegistryKey,
"registrykey": RegistryKey,
"registry-value": RegistryValue,
"registrykey": RegistryKey,
"registryvalue": RegistryValue,
"host-logon-session": HostLogonSession,
"hostlogonsession": HostLogonSession,
"filehash": FileHash,
"security-group": SecurityGroup,
"securitygroup": SecurityGroup,
"ServicePrincipal": ServicePrincipal,
"SubmissionMail": SubmissionMail,
"alerts": Alert,
"alert": Alert,
"threatintelligence": Threatintelligence,
"url": Url,
"unknown": UnknownEntity,
"geolocation": GeoLocation,
"location": GeoLocation,
"incident": Incident,
"cloud-logon-session": CloudLogonSession,
"url": Url,
"user": User,
}
)


def find_entity(entity):
"""Find entity name."""
entity_cf = entity.casefold()
entity_classes = {
entity_cls_dict = {
cls.__name__.casefold(): cls for cls in Entity.ENTITY_NAME_MAP.values()
}
if entity_cf in Entity.ENTITY_NAME_MAP:
print(f"Match found '{Entity.ENTITY_NAME_MAP[entity].__name__}'")
return Entity.ENTITY_NAME_MAP[entity]
if entity_cf in entity_classes:
print(f"Match found '{entity_classes[entity_cf].__name__}'")
return entity_classes[entity_cf]
if entity_cf in entity_cls_dict:
print(f"Match found '{entity_cls_dict[entity_cf].__name__}'")
return entity_cls_dict[entity_cf]
# Try to find the closest matches
closest = difflib.get_close_matches(entity, entity_classes.keys(), cutoff=0.4)
closest = difflib.get_close_matches(entity, entity_cls_dict.keys(), cutoff=0.4)
mssg = [f"No exact match found for '{entity}'. "]
if len(closest) == 1:
mssg.append(f"Closest match is '{entity_classes[closest[0]].__name__}'")
mssg.append(f"Closest match is '{entity_cls_dict[closest[0]].__name__}'")
elif closest:
match_list = [f"'{entity_classes[mtch].__name__}'" for mtch in closest]
match_list = [f"'{entity_cls_dict[match].__name__}'" for match in closest]
mssg.append(f"Closest matches are {', '.join(match_list)}")
else:
mssg.extend(
[
"No close match found. Entities available:",
*(cls.__name__ for cls in entity_classes.values()),
*(cls.__name__ for cls in entity_cls_dict.values()),
]
)
print("\n".join(mssg))
return None


def list_entities() -> List[str]:
"""List entities."""
return sorted([cls.__name__ for cls in set(Entity.ENTITY_NAME_MAP.values())])


def entity_classes() -> List[type]:
"""Return a list of all entity classes."""
return list(set(Entity.ENTITY_NAME_MAP.values()))
27 changes: 25 additions & 2 deletions msticpy/datamodel/entities/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Mapping, Optional

from ..._version import VERSION
from ...common.data_types import SplitProperty
from ...common.utility import export
from .entity import Entity
from .host import Host
Expand Down Expand Up @@ -49,10 +50,14 @@ class Account(Entity):
Account DisplayName
ObjectGuid : str
The object ID of the user account
Upn : str
The User principal name of the account
"""

ID_PROPERTIES = ["QualifiedName", "Sid", "AadUserId", "PUID", "ObjectGuid"]
Upn = SplitProperty("Name", "UPNSuffix", "@")
UPN = Upn

def __init__(
self,
Expand Down Expand Up @@ -92,12 +97,15 @@ def __init__(
self.LogonId: Optional[str] = None
self.Sid: Optional[str] = None
self.AadTenantId: Optional[str] = None
self.AadUserId: Optional[str] = None
self._AadUserId: Optional[str] = None
self.PUID: Optional[str] = None
self.IsDomainJoined: bool = False
self.DisplayName: Optional[str] = None
self.ObjectGuid: Optional[str] = None

if "Upn" in kwargs:
self.Upn = kwargs.pop("Upn")
if "AadUserId" in kwargs:
self.AadUserId = kwargs.pop("AadUserId")
super().__init__(src_entity=src_entity, **kwargs)
if src_event is not None:
self._create_from_event(src_event, role)
Expand All @@ -112,6 +120,16 @@ def name_str(self) -> str:
"""Return Entity Name."""
return self.Name or self.DisplayName or "Unknown Account"

@property
def AadUserId(self) -> Optional[str]: # noqa: N802
"""Return the Azure AD user ID or the ObjectGuid."""
return self._AadUserId or self.ObjectGuid

@AadUserId.setter
def AadUserId(self, value: str): # noqa: N802
"""Set the Azure AD user ID."""
self._AadUserId = value

@property
def qualified_name(self) -> str:
"""Windows qualified account name."""
Expand Down Expand Up @@ -176,6 +194,10 @@ def _create_from_event(self, src_event, role):
self.UPNSuffix = src_event["UpnSuffix"]
else:
self.UPNSuffix = None
if "Upn" in src_event:
self.Upn = src_event["Upn"]
if "UPN" in src_event:
self.Upn = src_event["UPN"]

_entity_schema = {
# Name (type System.String)
Expand All @@ -202,6 +224,7 @@ def _create_from_event(self, src_event, role):
# DisplayName (type System.String)
"DisplayName": None,
"ObjectGuid": None,
"Upn": None,
"TimeGenerated": None,
"StartTime": None,
"EndTime": None,
Expand Down
2 changes: 1 addition & 1 deletion msticpy/datamodel/entities/cloud_logon_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class CloudLogonSession(Entity):
Attributes
----------
SessionId : str
The loggon session ID
The logon session ID
Account : str
The Account
UserAgent : str
Expand Down
8 changes: 5 additions & 3 deletions msticpy/datamodel/entities/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ def __contains__(self, key: str):

def __getattr__(self, name: str):
"""Return the value of the named property 'name'."""
props = ["name_str", "description_str"]
if name in self._entity_schema or name in props:
if name in self._entity_schema or name in {"name_str", "description_str"}:
return None
raise AttributeError(f"{name} is not a valid attribute.")

Expand Down Expand Up @@ -308,7 +307,9 @@ def __hash__(self) -> int:
"""Return the hash of the entity based on non-empty property values."""
return hash(
" ".join(
f"{prop}:{val}" for prop, val in self.properties.items() if str(val)
f"{prop}:{val}"
for prop, val in self.properties.items()
if str(val) and prop not in ("edges", "TimeGenerated")
)
)

Expand Down Expand Up @@ -344,6 +345,7 @@ def is_equivalent(self, other: Any) -> bool:
and self.__dict__[prop]
and other.__dict__[prop]
for prop in self.__dict__ # pylint: disable=consider-using-dict-items
if prop not in ("edges", "TimeGenerated") and not prop.startswith("_")
)

def merge(self, other: Any) -> "Entity":
Expand Down
2 changes: 2 additions & 0 deletions msticpy/datamodel/entities/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, List, Mapping, Optional

from ..._version import VERSION
from ...common.data_types import SharedProperty
from ...common.utility import export
from .entity import Entity
from .entity_enums import Algorithm, OSFamily
Expand Down Expand Up @@ -49,6 +50,7 @@ class File(Entity):
"""

ID_PROPERTIES = ["FullPath", "Sha1", "Sha256", "Sha256ac", "Md5"]
FolderPath = SharedProperty("Directory")

def __init__(
self,
Expand Down
Loading

0 comments on commit df22c8b

Please sign in to comment.