diff --git a/docs/source/getting_started/Installing.rst b/docs/source/getting_started/Installing.rst index c87478afa..60b192df2 100644 --- a/docs/source/getting_started/Installing.rst +++ b/docs/source/getting_started/Installing.rst @@ -331,11 +331,12 @@ Windows Source to Isolated Linux Environment `__ script. Example: + .. code-block:: powershell python \path\to\python\file --python-version "3.8.5" --module-name "msticpy[sentinel]" --module-version "2.7.0" --directory \path\to\destination -3. Zip and copy the directory folder to the isolated environment. +3. Copy the directory folder to the isolated environment. 4. From the isolated environment, unzip if needed and then you will need to run the following for each .whl file: diff --git a/msticpy/_version.py b/msticpy/_version.py index 28e81634a..8e8859a39 100644 --- a/msticpy/_version.py +++ b/msticpy/_version.py @@ -1,2 +1,2 @@ """Version file.""" -VERSION = "2.8.0.pre1" +VERSION = "2.8.0" diff --git a/msticpy/common/data_types.py b/msticpy/common/data_types.py index 1c162e690..d1a5b042a 100644 --- a/msticpy/common/data_types.py +++ b/msticpy/common/data_types.py @@ -76,3 +76,99 @@ def _get_dot_attrib(obj, elem_path: str) -> Any: if cur_node is None: raise KeyError(f"{elem} value of {elem_path} is not a valid path") return cur_node + + +# Descriptors + + +class SharedProperty: + """Descriptor to share property between instances of a class.""" + + def __init__(self, instance_name: str): + """ + Initialize the descriptor. + + Parameters + ---------- + instance_name : str + Name of the instance attribute to use to store the value. + + """ + self.instance_name = instance_name + + def __get__(self, instance, owner): + """Return the value of the instance attribute.""" + return getattr(instance, self.instance_name, None) + + def __set__(self, instance, value): + """Set the value of the instance attribute.""" + setattr(instance, self.instance_name, value) + + +class FallbackProperty: + """Descriptor for aliased property with fallback property.""" + + def __init__(self, instance_name: str, default_name: str): + """ + Initialize the descriptor. + + Parameters + ---------- + instance_name : str + Name of the instance attribute to use to store the value. + default_name : str + Name of the instance attribute to use as a fallback value. + + """ + self.instance_name = instance_name + self.default_name = default_name + + def __get__(self, instance, owner): + """Return the value of the instance (or default) attribute.""" + return getattr( + instance, self.instance_name, getattr(instance, self.default_name, None) + ) + + def __set__(self, instance, value): + """Set the value of the instance attribute.""" + setattr(instance, self.instance_name, value) + + +class SplitProperty: + """Descriptor for property that is stored as two delimited attributes.""" + + def __init__(self, inst_left_name: str, inst_right_name: str, split_char: str): + """ + Initialize the descriptor. + + Parameters + ---------- + inst_left_name : str + Name of the instance attribute to use to store the left value. + inst_right_name : str + Name of the instance attribute to use to store the right value. + split_char : str + Character to use to split the value. + """ + self.inst_left_name = inst_left_name + self.inst_right_name = inst_right_name + self.split_char = split_char + + def __get__(self, instance, owner): + """Return the value of the instance attribute.""" + left_part = getattr(instance, self.inst_left_name, None) + right_part = getattr(instance, self.inst_right_name, None) + if left_part and right_part: + return f"{left_part}{self.split_char}{right_part}" + return None + + def __set__(self, instance, value): + """Set the value of the instance attribute.""" + if value is None: + return + if self.split_char not in value: + setattr(instance, self.inst_left_name, value) + return + left, right = value.split(self.split_char, maxsplit=1) + setattr(instance, self.inst_left_name, left) + setattr(instance, self.inst_right_name, right) diff --git a/msticpy/common/wsconfig.py b/msticpy/common/wsconfig.py index 3bf3b28a8..f678b50b9 100644 --- a/msticpy/common/wsconfig.py +++ b/msticpy/common/wsconfig.py @@ -248,17 +248,34 @@ def from_settings(cls, settings: Dict[str, Any]) -> "WorkspaceConfig": @classmethod def from_connection_string(cls, connection_str: str) -> "WorkspaceConfig": """Create a WorkstationConfig from a connection string.""" - tenant_regex = r".*tenant\(\s?['\"](?P[\w]+)['\"].*" - workspace_regex = r".*workspace\(\s?['\"](?P[\w]+)['\"].*" - tenant_id = workspace_id = None - if match := re.match(tenant_regex, connection_str): + tenant_regex = r""" + .*tenant\s?=\s?['\"]\{? + (?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) + \}?['\"].*""" + workspace_regex = r""" + .*workspace\s?=\s?['\"]\{? + (?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) + \}?['\"].*""" + ws_name_regex = r".*alias\s?=\s?['\"]\{?(?P\w+)['\"].*" + + tenant_id = workspace_id = workspace_name = None + if match := re.match(tenant_regex, connection_str, re.IGNORECASE | re.VERBOSE): tenant_id = match.groupdict()["tenant_id"] - if match := re.match(workspace_regex, connection_str): + else: + raise ValueError("Could not find tenant ID in connection string.") + if match := re.match( + workspace_regex, connection_str, re.IGNORECASE | re.VERBOSE + ): workspace_id = match.groupdict()["workspace_id"] + else: + raise ValueError("Could not find workspace ID in connection string.") + if match := re.match(ws_name_regex, connection_str, re.IGNORECASE | re.VERBOSE): + workspace_name = match.groupdict()["workspace_name"] return cls( config={ cls.CONF_WS_ID_KEY: workspace_id, # type: ignore[dict-item] cls.CONF_TENANT_ID_KEY: tenant_id, # type: ignore[dict-item] + cls.CONF_WS_NAME_KEY: workspace_name, # type: ignore[dict-item] } ) diff --git a/msticpy/data/drivers/azure_monitor_driver.py b/msticpy/data/drivers/azure_monitor_driver.py index 7475aa3af..a5f6fe3b2 100644 --- a/msticpy/data/drivers/azure_monitor_driver.py +++ b/msticpy/data/drivers/azure_monitor_driver.py @@ -14,6 +14,7 @@ azure/monitor-query-readme?view=azure-python """ +import contextlib import logging from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast @@ -409,10 +410,11 @@ def _get_workspaces(self, connection_str: Optional[str] = None, **kwargs): ) elif isinstance(connection_str, str): self._def_connection_str = connection_str - ws_config = WorkspaceConfig.from_connection_string(connection_str) - logger.info( - "WorkspaceConfig created from connection_str %s", connection_str - ) + with contextlib.suppress(ValueError): + ws_config = WorkspaceConfig.from_connection_string(connection_str) + logger.info( + "WorkspaceConfig created from connection_str %s", connection_str + ) elif isinstance(connection_str, WorkspaceConfig): logger.info("WorkspaceConfig as parameter %s", connection_str.workspace_id) ws_config = connection_str diff --git a/msticpy/data/drivers/mordor_driver.py b/msticpy/data/drivers/mordor_driver.py index 77956654d..ab44812eb 100644 --- a/msticpy/data/drivers/mordor_driver.py +++ b/msticpy/data/drivers/mordor_driver.py @@ -441,7 +441,7 @@ def _to_datetime(date_val) -> datetime: DS_PREFIX = "https://raw.githubusercontent.com/OTRF/Security-Datasets/master/datasets/" -# pylint: disable=not-an-iterable, no-member +# pylint: disable=not-an-iterable, no-member, too-many-instance-attributes @attr.s(auto_attribs=True) diff --git a/msticpy/datamodel/entities/__init__.py b/msticpy/datamodel/entities/__init__.py index 2d734dd71..079988265 100644 --- a/msticpy/datamodel/entities/__init__.py +++ b/msticpy/datamodel/entities/__init__.py @@ -5,6 +5,7 @@ # -------------------------------------------------------------------------- """Entity sub-package.""" import difflib +from typing import List from .account import Account from .alert import Alert @@ -29,12 +30,15 @@ from .mail_cluster import MailCluster from .mail_message import MailMessage from .mailbox import Mailbox +from .mailbox_configuration import MailboxConfiguration from .malware import Malware from .network_connection import NetworkConnection +from .oauth_application import OAuthApplication from .process import Process from .registry_key import RegistryKey from .registry_value import RegistryValue from .security_group import SecurityGroup +from .service_principal import ServicePrincipal from .submission_mail import SubmissionMail from .threat_intelligence import Threatintelligence from .unknown_entity import UnknownEntity @@ -42,49 +46,69 @@ from ..soc.incident import Incident # isort: skip + +# Defender class equivalents +class User(Account): + """Alias for Account.""" + + +class Ip(IpAddress): + """Alias for IpAddress.""" + + +class Machine(Host): + """Alias for Host.""" + + # Dictionary to map text names of types to the class. Entity.ENTITY_NAME_MAP.update( { "account": Account, - "azureresource": AzureResource, + "alert": Alert, + "alerts": Alert, "azure-resource": AzureResource, - "host": Host, - "process": Process, - "file": File, - "cloudapplication": CloudApplication, + "azureresource": AzureResource, "cloud-application": CloudApplication, + "cloud-logon-session": CloudLogonSession, + "cloudapplication": CloudApplication, + "cloudlogonsession": CloudLogonSession, "dns": Dns, "dnsresolve": Dns, - "ipaddress": IpAddress, + "file": File, + "filehash": FileHash, + "geolocation": GeoLocation, + "host-logon-session": HostLogonSession, + "host": Host, + "hostlogonsession": HostLogonSession, + "incident": Incident, "iotdevice": IoTDevice, "ip": IpAddress, - "networkconnection": NetworkConnection, - "network-connection": NetworkConnection, - "mailbox": Mailbox, - "mail-message": MailMessage, - "mailmessage": MailMessage, + "ipaddress": IpAddress, + "location": GeoLocation, + "machine": Machine, "mail-cluster": MailCluster, + "mail-message": MailMessage, + "mailbox": Mailbox, "mailcluster": MailCluster, + "mailboxconfiguration": MailboxConfiguration, + "mailmessage": MailMessage, "malware": Malware, + "network-connection": NetworkConnection, + "networkconnection": NetworkConnection, + "oauthapplication": OAuthApplication, + "process": Process, "registry-key": RegistryKey, - "registrykey": RegistryKey, "registry-value": RegistryValue, + "registrykey": RegistryKey, "registryvalue": RegistryValue, - "host-logon-session": HostLogonSession, - "hostlogonsession": HostLogonSession, - "filehash": FileHash, "security-group": SecurityGroup, "securitygroup": SecurityGroup, + "ServicePrincipal": ServicePrincipal, "SubmissionMail": SubmissionMail, - "alerts": Alert, - "alert": Alert, "threatintelligence": Threatintelligence, - "url": Url, "unknown": UnknownEntity, - "geolocation": GeoLocation, - "location": GeoLocation, - "incident": Incident, - "cloud-logon-session": CloudLogonSession, + "url": Url, + "user": User, } ) @@ -92,29 +116,39 @@ def find_entity(entity): """Find entity name.""" entity_cf = entity.casefold() - entity_classes = { + entity_cls_dict = { cls.__name__.casefold(): cls for cls in Entity.ENTITY_NAME_MAP.values() } if entity_cf in Entity.ENTITY_NAME_MAP: print(f"Match found '{Entity.ENTITY_NAME_MAP[entity].__name__}'") return Entity.ENTITY_NAME_MAP[entity] - if entity_cf in entity_classes: - print(f"Match found '{entity_classes[entity_cf].__name__}'") - return entity_classes[entity_cf] + if entity_cf in entity_cls_dict: + print(f"Match found '{entity_cls_dict[entity_cf].__name__}'") + return entity_cls_dict[entity_cf] # Try to find the closest matches - closest = difflib.get_close_matches(entity, entity_classes.keys(), cutoff=0.4) + closest = difflib.get_close_matches(entity, entity_cls_dict.keys(), cutoff=0.4) mssg = [f"No exact match found for '{entity}'. "] if len(closest) == 1: - mssg.append(f"Closest match is '{entity_classes[closest[0]].__name__}'") + mssg.append(f"Closest match is '{entity_cls_dict[closest[0]].__name__}'") elif closest: - match_list = [f"'{entity_classes[mtch].__name__}'" for mtch in closest] + match_list = [f"'{entity_cls_dict[match].__name__}'" for match in closest] mssg.append(f"Closest matches are {', '.join(match_list)}") else: mssg.extend( [ "No close match found. Entities available:", - *(cls.__name__ for cls in entity_classes.values()), + *(cls.__name__ for cls in entity_cls_dict.values()), ] ) print("\n".join(mssg)) return None + + +def list_entities() -> List[str]: + """List entities.""" + return sorted([cls.__name__ for cls in set(Entity.ENTITY_NAME_MAP.values())]) + + +def entity_classes() -> List[type]: + """Return a list of all entity classes.""" + return list(set(Entity.ENTITY_NAME_MAP.values())) diff --git a/msticpy/datamodel/entities/account.py b/msticpy/datamodel/entities/account.py index 8a0870205..c98f25b8a 100644 --- a/msticpy/datamodel/entities/account.py +++ b/msticpy/datamodel/entities/account.py @@ -7,6 +7,7 @@ from typing import Any, Mapping, Optional from ..._version import VERSION +from ...common.data_types import SplitProperty from ...common.utility import export from .entity import Entity from .host import Host @@ -49,10 +50,14 @@ class Account(Entity): Account DisplayName ObjectGuid : str The object ID of the user account + Upn : str + The User principal name of the account """ ID_PROPERTIES = ["QualifiedName", "Sid", "AadUserId", "PUID", "ObjectGuid"] + Upn = SplitProperty("Name", "UPNSuffix", "@") + UPN = Upn def __init__( self, @@ -92,12 +97,15 @@ def __init__( self.LogonId: Optional[str] = None self.Sid: Optional[str] = None self.AadTenantId: Optional[str] = None - self.AadUserId: Optional[str] = None + self._AadUserId: Optional[str] = None self.PUID: Optional[str] = None self.IsDomainJoined: bool = False self.DisplayName: Optional[str] = None self.ObjectGuid: Optional[str] = None - + if "Upn" in kwargs: + self.Upn = kwargs.pop("Upn") + if "AadUserId" in kwargs: + self.AadUserId = kwargs.pop("AadUserId") super().__init__(src_entity=src_entity, **kwargs) if src_event is not None: self._create_from_event(src_event, role) @@ -112,6 +120,16 @@ def name_str(self) -> str: """Return Entity Name.""" return self.Name or self.DisplayName or "Unknown Account" + @property + def AadUserId(self) -> Optional[str]: # noqa: N802 + """Return the Azure AD user ID or the ObjectGuid.""" + return self._AadUserId or self.ObjectGuid + + @AadUserId.setter + def AadUserId(self, value: str): # noqa: N802 + """Set the Azure AD user ID.""" + self._AadUserId = value + @property def qualified_name(self) -> str: """Windows qualified account name.""" @@ -176,6 +194,10 @@ def _create_from_event(self, src_event, role): self.UPNSuffix = src_event["UpnSuffix"] else: self.UPNSuffix = None + if "Upn" in src_event: + self.Upn = src_event["Upn"] + if "UPN" in src_event: + self.Upn = src_event["UPN"] _entity_schema = { # Name (type System.String) @@ -202,6 +224,7 @@ def _create_from_event(self, src_event, role): # DisplayName (type System.String) "DisplayName": None, "ObjectGuid": None, + "Upn": None, "TimeGenerated": None, "StartTime": None, "EndTime": None, diff --git a/msticpy/datamodel/entities/cloud_logon_session.py b/msticpy/datamodel/entities/cloud_logon_session.py index 1f86e7b55..5e238f12b 100644 --- a/msticpy/datamodel/entities/cloud_logon_session.py +++ b/msticpy/datamodel/entities/cloud_logon_session.py @@ -25,7 +25,7 @@ class CloudLogonSession(Entity): Attributes ---------- SessionId : str - The loggon session ID + The logon session ID Account : str The Account UserAgent : str diff --git a/msticpy/datamodel/entities/entity.py b/msticpy/datamodel/entities/entity.py index 2e73749e2..fd3860501 100644 --- a/msticpy/datamodel/entities/entity.py +++ b/msticpy/datamodel/entities/entity.py @@ -215,8 +215,7 @@ def __contains__(self, key: str): def __getattr__(self, name: str): """Return the value of the named property 'name'.""" - props = ["name_str", "description_str"] - if name in self._entity_schema or name in props: + if name in self._entity_schema or name in {"name_str", "description_str"}: return None raise AttributeError(f"{name} is not a valid attribute.") @@ -308,7 +307,9 @@ def __hash__(self) -> int: """Return the hash of the entity based on non-empty property values.""" return hash( " ".join( - f"{prop}:{val}" for prop, val in self.properties.items() if str(val) + f"{prop}:{val}" + for prop, val in self.properties.items() + if str(val) and prop not in ("edges", "TimeGenerated") ) ) @@ -344,6 +345,7 @@ def is_equivalent(self, other: Any) -> bool: and self.__dict__[prop] and other.__dict__[prop] for prop in self.__dict__ # pylint: disable=consider-using-dict-items + if prop not in ("edges", "TimeGenerated") and not prop.startswith("_") ) def merge(self, other: Any) -> "Entity": diff --git a/msticpy/datamodel/entities/file.py b/msticpy/datamodel/entities/file.py index 1b327e40d..3acf6eae6 100644 --- a/msticpy/datamodel/entities/file.py +++ b/msticpy/datamodel/entities/file.py @@ -7,6 +7,7 @@ from typing import Any, List, Mapping, Optional from ..._version import VERSION +from ...common.data_types import SharedProperty from ...common.utility import export from .entity import Entity from .entity_enums import Algorithm, OSFamily @@ -49,6 +50,7 @@ class File(Entity): """ ID_PROPERTIES = ["FullPath", "Sha1", "Sha256", "Sha256ac", "Md5"] + FolderPath = SharedProperty("Directory") def __init__( self, diff --git a/msticpy/datamodel/entities/host.py b/msticpy/datamodel/entities/host.py index 6e5c69d89..5f1f85bef 100644 --- a/msticpy/datamodel/entities/host.py +++ b/msticpy/datamodel/entities/host.py @@ -7,6 +7,7 @@ from typing import Any, Mapping, Optional from ..._version import VERSION +from ...common.data_types import SplitProperty from ...common.utility import export from .entity import Entity from .entity_enums import OSFamily @@ -46,7 +47,8 @@ class Host(Entity): """ - ID_PROPERTIES = ["fqdn", "AzureID", "OMSAgentID"] + ID_PROPERTIES = ["fqdn", "AzureID", "OMSAgentID", "DeviceId"] + DeviceName = SplitProperty("HostName", "DnsDomain", ".") def __init__( self, @@ -83,6 +85,7 @@ def __init__( self.OSFamily: OSFamily = OSFamily.Windows self.OSVersion: Optional[str] = None self.IsDomainJoined: bool = False + self.DeviceId: Optional[str] = None super().__init__(src_entity=src_entity, **kwargs) self._computer = None @@ -133,6 +136,9 @@ def _create_from_event(self, src_event): if "DnsDomain" in src_event: self.DnsDomain = src_event["DnsDomain"] self.NetBiosName = self.HostName + self.AzureID = src_event.get("AzureID") + self.DeviceId = src_event.get("DeviceId") + self.DeviceName = src_event.get("DeviceName") _entity_schema = { # DnsDomain (type System.String) @@ -152,6 +158,8 @@ def _create_from_event(self, src_event): "OSFamily": "OSFamily", # IsDomainJoined (type System.Nullable`1[System.Boolean]) "IsDomainJoined": None, + "DeviceId": None, + "DeviceName": None, "TimeGenerated": None, "StartTime": None, "EndTime": None, diff --git a/msticpy/datamodel/entities/mailbox.py b/msticpy/datamodel/entities/mailbox.py index a12b0fc06..582504073 100644 --- a/msticpy/datamodel/entities/mailbox.py +++ b/msticpy/datamodel/entities/mailbox.py @@ -79,6 +79,8 @@ def _create_from_event(self, src_event): self.MailboxPrimaryAddress = src_event.get("MailboxPrimaryAddress") self.Upn = src_event.get("Upn") self.DisplayName = src_event.get("DisplayName") + self.ExternalDirectoryObjectId = src_event.get("ExternalDirectoryObjectId") + self.RiskLevel = src_event.get("RiskLevel") @property def description_str(self): diff --git a/msticpy/datamodel/entities/mailbox_configuration.py b/msticpy/datamodel/entities/mailbox_configuration.py new file mode 100644 index 000000000..de71e3006 --- /dev/null +++ b/msticpy/datamodel/entities/mailbox_configuration.py @@ -0,0 +1,113 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +"""MailboxConfiguration Entity class.""" +from typing import Any, Mapping, Optional + +from ..._version import VERSION +from ...common.utility import export +from .entity import Entity + +__version__ = VERSION +__author__ = "Ian Hellen" + + +# pylint: disable=invalid-name + + +@export +class MailboxConfiguration(Entity): + """ + MailboxConfiguration Entity class. + + Attributes + ---------- + ConfigType : str + The MailboxConfigurationType represented by this entity. + Could be of type: MailForwardingRule, OwaSettings, EWSSettings MailDelegation + or UserInboxRule + MailboxPrimaryAddress : str + PrimaryAddress of the Mailbox + DisplayName : str + DisplayName of the Mailbox + Upn : str + Upn of the Mailbox + ConfigId : str + A mailbox can have more than one configuration entity of same configurationType. + This unique id equivalent to URN. + ExternalDirectoryObjectId : str + ExternalDirectoryObjectId of the Mailbox + + """ + + ID_PROPERTIES = ["ConfigId"] + + def __init__( + self, + src_entity: Mapping[str, Any] = None, + src_event: Mapping[str, Any] = None, + **kwargs, + ): + """ + Create a new instance of the entity type. + + Parameters + ---------- + src_entity : Mapping[str, Any], optional + Create entity from existing entity or + other mapping object that implements entity properties. + (the default is None) + src_event : Mapping[str, Any], optional + Create entity from event properties + (the default is None) + + Other Parameters + ---------------- + kwargs : Dict[str, Any] + Supply the entity properties as a set of + kw arguments. + + """ + self.MailboxPrimaryAddress: Optional[str] = None + self.DisplayName: Optional[str] = None + self.Upn: Optional[str] = None + self.ExternalDirectoryObjectId: Optional[str] = None + self.ConfigType: Optional[str] = None + self.ConfigId: Optional[str] = None + + super().__init__(src_entity=src_entity, **kwargs) + if src_event: + self._create_from_event(src_event) + + def _create_from_event(self, src_event): + self.MailboxPrimaryAddress = src_event.get("MailboxPrimaryAddress") + self.Upn = src_event.get("Upn") + self.DisplayName = src_event.get("DisplayName") + self.ConfigId = src_event.get("ConfigId") + self.ExternalDirectoryObjectId = src_event.get("ExternalDirectoryObjectId") + self.ConfigType = src_event.get("ConfigType") + + @property + def description_str(self): + """Return Entity Description.""" + return ( + f"{self.MailboxPrimaryAddress} - {self.ConfigId}" or self.__class__.__name__ + ) + + @property + def name_str(self) -> str: + """Return Entity Name.""" + return self.MailboxPrimaryAddress or self.__class__.__name__ + + _entity_schema = { + "MailboxPrimaryAddress": None, + "DisplayName": None, + "Upn": None, + "ExternalDirectoryObjectId": None, + "RiskLevel": None, + "TimeGenerated": None, + "StartTime": None, + "EndTime": None, + } diff --git a/msticpy/datamodel/entities/oauth_application.py b/msticpy/datamodel/entities/oauth_application.py new file mode 100644 index 000000000..28d9bd8c3 --- /dev/null +++ b/msticpy/datamodel/entities/oauth_application.py @@ -0,0 +1,105 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +"""OAuthApplication Entity class.""" +from typing import Any, List, Mapping, Optional + +from ..._version import VERSION +from ...common.utility import export +from .entity import Entity + +__version__ = VERSION +__author__ = "Ian Hellen" + + +# pylint: disable=invalid-name + + +@export +class OAuthApplication(Entity): + """ + OAuthApplication Entity class. + + Attributes + ---------- + OAuthAppId : str + Global ID of the application (in AAD, "Application ID" of the application object) + OAuthObjectId : str + Object ID of the service principal in AAD + Name : str + Name of the app + TenantId : str + AAD tenant ID in which the app was installed + PublisherName : str + The publisher name of the app + Risk: str + The app risk - like Low, Medium, High or Unknown + Permissions: List[str] + List of permissions that were requested by the application, and their severities + RedirectURLs : List[str] + List of redirect urls + AuthorizedBy : int + How many users consented the app + + """ + + ID_PROPERTIES = ["OAuthAppId", "OAuthObjectId"] + + def __init__(self, src_entity: Mapping[str, Any] = None, **kwargs): + """ + Create a new instance of the entity type. + + Parameters + ---------- + src_entity : Mapping[str, Any], optional + Create entity from existing entity or + other mapping object that implements entity properties. + (the default is None) + + Other Parameters + ---------------- + kwargs : Dict[str, Any] + Supply the entity properties as a set of + kw arguments. + + """ + self.OAuthAppId: Optional[str] = None + self.OAuthObjectId: Optional[str] = None + self.Name: Optional[str] = None + self.TenantId: Optional[str] = None + self.PublisherName: Optional[str] = None + self.Risk: Optional[str] = None + self.Permissions: List[str] = [] + self.RedirectURLs: List[str] = [] + self.AuthorizedBy: int = 0 + + super().__init__(src_entity=src_entity, **kwargs) + + @property + def description_str(self): + """Return Entity Description.""" + return self.Name or self.OAuthAppId or self.__class__.__name__ + + @property + def name_str(self) -> str: + """Return Entity Name.""" + return self.Name or self.OAuthAppId or self.__class__.__name__ + + _entity_schema = { + # OAuthAppId (type System.String) + "OAuthAppId": None, + # OAuthObjectId (type System.String) + "OAuthObjectId": None, + # TenantId (type System.String) + "TenantId": None, + "PublisherName": None, + "Risk": None, + "Permissions": None, + "RedirectURLs": None, + "AuthorizedBy": None, + "TimeGenerated": None, + "StartTime": None, + "EndTime": None, + } diff --git a/msticpy/datamodel/entities/service_principal.py b/msticpy/datamodel/entities/service_principal.py new file mode 100644 index 000000000..ad10fac3f --- /dev/null +++ b/msticpy/datamodel/entities/service_principal.py @@ -0,0 +1,96 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- +"""ServicePrincipal Entity class.""" +from typing import Any, Mapping, Optional + +from ..._version import VERSION +from ...common.utility import export +from .entity import Entity + +__version__ = VERSION +__author__ = "Ian Hellen" + + +# pylint: disable=invalid-name + + +@export +class ServicePrincipal(Entity): + """ + ServicePrincipal Entity class. + + Attributes + ---------- + ServicePrincipalName : str + ServicePrincipal DistinguishedName + ServicePrincipalObjectId : str + ServicePrincipal UUID + AppId : str + SecurityGroup ObjectGuid + AppOwnerTenantId : str + The tenant id where the application is registered. + This is applicable only to service principals backed by applications. + TenantId : str + The AAD tenant id of Service Principal + ServicePrincipalType : str + The type of the service principal: 'Unknown', 'Application', + 'ManagedIdentity', 'Legacy' + + """ + + ID_PROPERTIES = ["DistinguishedName", "SID", "ObjectGuid"] + + def __init__(self, src_entity: Mapping[str, Any] = None, **kwargs): + """ + Create a new instance of the entity type. + + Parameters + ---------- + src_entity : Mapping[str, Any], optional + Create entity from existing entity or + other mapping object that implements entity properties. + (the default is None) + + Other Parameters + ---------------- + kwargs : Dict[str, Any] + Supply the entity properties as a set of + kw arguments. + + """ + self.ServicePrincipalName: Optional[str] = None + self.ServicePrincipalObjectId: Optional[str] = None + self.AppId: Optional[str] = None + self.AppOwnerTenantId: Optional[str] = None + self.TenantId: Optional[str] = None + self.ServicePrincipalType: Optional[str] = None + + super().__init__(src_entity=src_entity, **kwargs) + + @property + def description_str(self): + """Return Entity Description.""" + return self.DistinguishedName or self.__class__.__name__ + + @property + def name_str(self) -> str: + """Return Entity Name.""" + return self.__class__.__name__ + + _entity_schema = { + # ServicePrincipalName (type System.String) + "ServicePrincipalName": None, + # ServicePrincipalObjectId (type System.String) + "ServicePrincipalObjectId": None, + # AppId (type System.String) + "AppId": None, + "AppOwnerTenantId": None, + "TenantId": None, + "ServicePrincipalType": None, + "TimeGenerated": None, + "StartTime": None, + "EndTime": None, + } diff --git a/msticpy/init/pivot.py b/msticpy/init/pivot.py index 2863b463c..c5e537744 100644 --- a/msticpy/init/pivot.py +++ b/msticpy/init/pivot.py @@ -426,9 +426,11 @@ def remove_pivot_funcs(entity: str): for attr in dir(entity_cls): attr_obj = getattr(entity_cls, attr) if type(attr_obj).__name__ == "PivotContainer": - delattr(entity_cls, attr) + with contextlib.suppress(AttributeError): + delattr(entity_cls, attr) if callable(attr_obj) and hasattr(attr_obj, "pivot_properties"): - delattr(entity_cls, attr) + with contextlib.suppress(AttributeError): + delattr(entity_cls, attr) @staticmethod def browse(): diff --git a/tests/common/test_wsconfig.py b/tests/common/test_wsconfig.py index df35d5715..500afe501 100644 --- a/tests/common/test_wsconfig.py +++ b/tests/common/test_wsconfig.py @@ -157,3 +157,45 @@ def test_wsconfig_single_ws(): and _NAMED_WS["WorkspaceId"] in wstest_config.code_connect_str and _NAMED_WS["TenantId"] in wstest_config.code_connect_str ) + + +_TENANT = "d8d9d2f2-5d2d-4d7e-9c5c-5d6d9d1d8d9d" +_WS_ID = "f8d9d2f2-5d2d-4d7e-9c5c-5d6d9d1d8d9e" +_CLI_ID = "18d9d2f2-5d2d-4d7e-9c5c-5d6d9d1d8d9f" +_WS_NAME = "Workspace" +_CONFIG_STR_TEST_CASES = ( + ( + f"loganalytics://code;workspace='{_WS_ID}';alias='{_WS_NAME}';tenant='{_TENANT}'", + True, + ), + ( + f"loganalytics://tenant='{_TENANT}';clientid='{_CLI_ID}';clientsecret='[PLACEHOLDER]';workspace='{_WS_ID}';alias='{_WS_NAME}'", + True, + ), + ( + f"loganalytics://username='User';password='[PLACEHOLDER]';workspace='{_WS_ID}';alias='{_WS_NAME}';tenant='{_TENANT}'", + True, + ), + ( + f"loganalytics://anonymous;workspace='{_WS_ID}';alias='{_WS_NAME}';tenant='{_TENANT}'", + True, + ), + (f"loganalytics://code;workspace='{_WS_ID}';alias='{_WS_NAME}'", False), + (f"loganalytics://code;alias='{_WS_NAME}';tenant='{_TENANT}'", False), +) + + +@pytest.mark.parametrize("config_str, is_valid", _CONFIG_STR_TEST_CASES) +def test_wsconfig_config_str(config_str, is_valid): + """Test capture of config from connections strings.""" + if is_valid: + ws = WorkspaceConfig.from_connection_string(config_str) + if "workspace" in config_str: + check.equal(ws["workspace_id"], _WS_ID) + if "tenant" in config_str: + check.equal(ws["tenant_id"], _TENANT) + if "alias" in config_str: + check.equal(ws["workspace_name"], _WS_NAME) + else: + with pytest.raises(ValueError): + WorkspaceConfig.from_connection_string(config_str) diff --git a/tests/data/drivers/test_azure_monitor_driver.py b/tests/data/drivers/test_azure_monitor_driver.py index 50a6586dc..a4df6150e 100644 --- a/tests/data/drivers/test_azure_monitor_driver.py +++ b/tests/data/drivers/test_azure_monitor_driver.py @@ -103,6 +103,11 @@ def get_token(self, *args, **kwargs): "a927809c-8142-43e1-96b3-4ad87cfe95a4", ] +_VALID_CONN_STR = ( + f"loganalytics://tenant='{_WS_IDS[0]}';workspace='{_WS_IDS[1]}'" + f";alias='wksp';clientid='{_WS_IDS[0]}';client_secret='{_WS_IDS[1]}'" +) + _TEST_CONNECT_PARAMS = ( ( {"auth_types": ["cli", "environment", "msi"]}, @@ -110,6 +115,7 @@ def get_token(self, *args, **kwargs): ), ({"auth_types": "cli"}, [("_connect_auth_types", ["cli"])]), ({"tenant_id": "test"}, [("_az_tenant_id", "test")]), + ({"connection_str": _VALID_CONN_STR}, [("_workspace_id", _WS_IDS[1])]), ({"connection_str": "test"}, [(None, MsticpyKqlConnectionError)]), ( {"mp_az_auth": ["cli", "environment", "msi"]}, diff --git a/tests/unit_test_lib.py b/tests/unit_test_lib.py index a37ccc122..d0dc6aaa0 100644 --- a/tests/unit_test_lib.py +++ b/tests/unit_test_lib.py @@ -15,7 +15,6 @@ import nbformat import yaml from filelock import FileLock -import yaml from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor from msticpy.common import pkg_config @@ -30,7 +29,9 @@ def get_test_data_path(): def get_queries_schema(): """Get queries schema.""" - queries_schema_path = Path(__file__).parent.parent.joinpath(".schemas").joinpath("queries.json") + queries_schema_path = ( + Path(__file__).parent.parent.joinpath(".schemas").joinpath("queries.json") + ) with queries_schema_path.open(mode="r", encoding="utf-8") as queries_schema: return yaml.safe_load(queries_schema)