Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PowerShell plugin.py #3374

Merged
merged 3 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions monkey/agent_plugins/exploiters/powershell/src/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import logging
from functools import partial
from pprint import pformat
from typing import Any, Dict, Sequence

# common imports
from common.event_queue import IAgentEventPublisher
from common.types import AgentID, Event, NetworkPort
from common.utils.code_utils import del_key
from common.utils.environment import is_windows_os

# dependencies to get rid of or internalize
from infection_monkey.exploit import IAgentBinaryRepository, IAgentOTPProvider
from infection_monkey.exploit.tools import (
BruteForceCredentialsProvider,
BruteForceExploiter,
all_tcp_ports_are_closed,
)
from infection_monkey.exploit.tools.helpers import get_agent_dst_path
from infection_monkey.i_puppet import ExploiterResultData, TargetHost
from infection_monkey.propagation_credentials_repository import IPropagationCredentialsRepository

from .credentials_generator import generate_powershell_credentials
from .powershell_command_builder import build_powershell_command
from .powershell_options import PowerShellOptions

logger = logging.getLogger(__name__)


POWERSHELL_NO_SSL_PORT = NetworkPort(5985)
POWERSHELL_SSL_PORT = NetworkPort(5986)


def should_attempt_exploit(host: TargetHost) -> bool:
return not all_tcp_ports_are_closed(host, [POWERSHELL_NO_SSL_PORT, POWERSHELL_SSL_PORT])


class StubPowerShellRemoteAccessClientFactory:
def __init__(*args, **kwargs):
pass


class Plugin:
def __init__(
self,
*,
plugin_name: str,
agent_id: AgentID,
agent_event_publisher: IAgentEventPublisher,
agent_binary_repository: IAgentBinaryRepository,
propagation_credentials_repository: IPropagationCredentialsRepository,
otp_provider: IAgentOTPProvider,
**kwargs,
):
self._plugin_name = plugin_name
self._agent_id = agent_id
self._agent_event_publisher = agent_event_publisher
self._agent_binary_repository = agent_binary_repository
credentials_generator = partial(
generate_powershell_credentials,
running_from_windows=is_windows_os(),
)
self._credentials_provider = BruteForceCredentialsProvider(
propagation_credentials_repository, credentials_generator
)
self._otp_provider = otp_provider

def run(
self,
*,
host: TargetHost,
servers: Sequence[str],
current_depth: int,
options: Dict[str, Any],
interrupt: Event,
**kwargs,
) -> ExploiterResultData:
# HTTP ports options are hack because they are needed in fingerprinters
del_key(options, "http_ports")

try:
logger.debug(f"Parsing options: {pformat(options)}")
powershell_options = PowerShellOptions(**options)
except Exception as err:
msg = f"Failed to parse PowerShell options: {err}"
logger.exception(msg)
return ExploiterResultData(error_message=msg)

if not should_attempt_exploit(host):
msg = f"Host {host.ip} has no open PowerShell remoting ports"
logger.debug(msg)
return ExploiterResultData(
exploitation_success=False, propagation_success=False, error_message=msg
)

command_builder = partial(
build_powershell_command,
self._agent_id,
servers,
current_depth,
remote_agent_binary_destination_path=get_agent_dst_path(host),
otp_provider=self._otp_provider,
)
powershell_exploit_client_factory = StubPowerShellRemoteAccessClientFactory(
host, powershell_options, command_builder
)

brute_force_exploiter = BruteForceExploiter(
self._plugin_name,
self._agent_id,
get_agent_dst_path(host),
powershell_exploit_client_factory,
self._credentials_provider,
self._agent_binary_repository,
self._agent_event_publisher,
{"powershell-exploiter"},
)

try:
logger.debug(f"Running PowerShell exploiter on host {host.ip}")
return brute_force_exploiter.exploit_host(host, interrupt)
except Exception as err:
msg = f"An unexpected exception occurred while attempting to exploit host: {err}"
logger.exception(msg)
return ExploiterResultData(error_message=msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
from ipaddress import IPv4Address
from threading import Event
from unittest.mock import MagicMock
from uuid import UUID

import pytest
from agent_plugins.exploiters.powershell.src.plugin import (
POWERSHELL_NO_SSL_PORT,
POWERSHELL_SSL_PORT,
Plugin,
)

from common import OperatingSystem
from common.types import PortStatus
from infection_monkey.exploit.tools import BruteForceExploiter
from infection_monkey.i_puppet import (
ExploiterResultData,
PortScanData,
PortScanDataDict,
TargetHost,
TargetHostPorts,
)
from infection_monkey.propagation_credentials_repository import IPropagationCredentialsRepository

POWERSHELL_PORTS = [POWERSHELL_NO_SSL_PORT, POWERSHELL_SSL_PORT]

AGENT_ID = UUID("5c145d4e-ec61-44f7-998e-17477112f50f")
BAD_POWERSHELL_OPTIONS_DICT = {"blah": "blah"}
TARGET_IP = IPv4Address("1.1.1.1")
OPEN_POWERSHELL_PORTS = TargetHostPorts(
tcp_ports=PortScanDataDict(
{p: PortScanData(port=p, status=PortStatus.OPEN) for p in POWERSHELL_PORTS}
)
)
EMPTY_TARGET_HOST_PORTS = TargetHostPorts()
SERVERS = ["10.10.10.10"]
EXPLOITER_RESULT_DATA = ExploiterResultData(True, False, error_message="Test error")


@pytest.fixture
def target_host() -> TargetHost:
return TargetHost(
ip=TARGET_IP,
operating_system=OperatingSystem.WINDOWS,
ports_status=OPEN_POWERSHELL_PORTS,
)


@pytest.fixture
def propagation_credentials_repository():
return MagicMock(spec=IPropagationCredentialsRepository)


class ErrorRaisingMockPowerShellExploiter(BruteForceExploiter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def exploit_host(self, *args, **kwargs) -> ExploiterResultData:
raise Exception("Test error")


@pytest.fixture
def mock_powershell_exploiter():
exploiter = MagicMock(spec=BruteForceExploiter)
exploiter.exploit_host.return_value = EXPLOITER_RESULT_DATA
return exploiter


@pytest.fixture
def plugin(
monkeypatch,
propagation_credentials_repository: IPropagationCredentialsRepository,
mock_powershell_exploiter: BruteForceExploiter,
) -> Plugin:
monkeypatch.setattr(
"agent_plugins.exploiters.powershell.src.plugin.BruteForceExploiter",
lambda *args, **kwargs: mock_powershell_exploiter,
)

return Plugin(
plugin_name="PowerShell",
agent_id=AGENT_ID,
agent_event_publisher=MagicMock(),
agent_binary_repository=MagicMock(),
propagation_credentials_repository=propagation_credentials_repository,
otp_provider=MagicMock(),
)


def test_run__fails_on_bad_options(plugin: Plugin, target_host: TargetHost):
result = plugin.run(
host=target_host,
servers=SERVERS,
current_depth=1,
options=BAD_POWERSHELL_OPTIONS_DICT,
interrupt=Event(),
)

assert not result.exploitation_success
assert not result.propagation_success


@pytest.mark.parametrize(
"tcp_port_status",
(
PortScanDataDict(
{POWERSHELL_PORTS[0]: PortScanData(port=POWERSHELL_PORTS[0], status=PortStatus.CLOSED)}
),
PortScanDataDict(
{POWERSHELL_PORTS[1]: PortScanData(port=POWERSHELL_PORTS[1], status=PortStatus.CLOSED)}
),
PortScanDataDict({}),
),
)
def test_run__attempts_exploit_if_port_status_unknown(
plugin: Plugin,
mock_powershell_exploiter: BruteForceExploiter,
target_host: TargetHost,
tcp_port_status: PortScanDataDict,
):
host = target_host
host.ports_status.tcp_ports = tcp_port_status
result = plugin.run(
host=host,
servers=SERVERS,
current_depth=1,
options={},
interrupt=Event(),
)

mock_powershell_exploiter.exploit_host.assert_called_once()
assert result == EXPLOITER_RESULT_DATA


@pytest.mark.parametrize(
"tcp_port_status",
(
PortScanDataDict(
{POWERSHELL_PORTS[0]: PortScanData(port=POWERSHELL_PORTS[0], status=PortStatus.OPEN)}
),
PortScanDataDict(
{POWERSHELL_PORTS[1]: PortScanData(port=POWERSHELL_PORTS[1], status=PortStatus.OPEN)}
),
PortScanDataDict(
{
POWERSHELL_PORTS[0]: PortScanData(
port=POWERSHELL_PORTS[0], status=PortStatus.CLOSED
),
POWERSHELL_PORTS[1]: PortScanData(port=POWERSHELL_PORTS[1], status=PortStatus.OPEN),
}
),
PortScanDataDict(
{
POWERSHELL_PORTS[0]: PortScanData(port=POWERSHELL_PORTS[0], status=PortStatus.OPEN),
POWERSHELL_PORTS[1]: PortScanData(
port=POWERSHELL_PORTS[1], status=PortStatus.CLOSED
),
}
),
PortScanDataDict(
{
POWERSHELL_PORTS[0]: PortScanData(port=POWERSHELL_PORTS[0], status=PortStatus.OPEN),
POWERSHELL_PORTS[1]: PortScanData(port=POWERSHELL_PORTS[1], status=PortStatus.OPEN),
}
),
),
)
def test_run__attempts_exploit_if_port_status_open(
plugin: Plugin,
mock_powershell_exploiter: BruteForceExploiter,
target_host: TargetHost,
tcp_port_status: PortScanDataDict,
):
host = target_host
host.ports_status.tcp_ports = tcp_port_status
result = plugin.run(
host=host,
servers=SERVERS,
current_depth=1,
options={},
interrupt=Event(),
)

mock_powershell_exploiter.exploit_host.assert_called_once()
assert result == EXPLOITER_RESULT_DATA


def test_run__skips_exploit_if_port_status_closed(
plugin: Plugin,
mock_powershell_exploiter: BruteForceExploiter,
target_host: TargetHost,
):
host = target_host
host.ports_status.tcp_ports = PortScanDataDict(
{
POWERSHELL_PORTS[0]: PortScanData(port=POWERSHELL_PORTS[0], status=PortStatus.CLOSED),
POWERSHELL_PORTS[1]: PortScanData(port=POWERSHELL_PORTS[1], status=PortStatus.CLOSED),
}
)

result = plugin.run(
host=host,
servers=SERVERS,
current_depth=1,
options={},
interrupt=Event(),
)

mock_powershell_exploiter.exploit_host.assert_not_called()
assert result.exploitation_success is False
assert result.propagation_success is False


def test_run__returns_exploiter_result_data(plugin: Plugin, target_host: TargetHost):
result = plugin.run(
host=target_host,
servers=SERVERS,
current_depth=1,
options={},
interrupt=Event(),
)

assert result == EXPLOITER_RESULT_DATA


def test_run__exploit_host_raises_exception(
monkeypatch,
plugin: Plugin,
propagation_credentials_repository: IPropagationCredentialsRepository,
target_host: TargetHost,
):
monkeypatch.setattr(
"agent_plugins.exploiters.powershell.src.plugin.BruteForceExploiter",
ErrorRaisingMockPowerShellExploiter,
)

plugin = Plugin(
plugin_name="POWERSHELL",
agent_id=AGENT_ID,
agent_event_publisher=MagicMock(),
agent_binary_repository=MagicMock(),
propagation_credentials_repository=propagation_credentials_repository,
otp_provider=MagicMock(),
)
result = plugin.run(
host=target_host,
servers=SERVERS,
current_depth=1,
options={},
interrupt=Event(),
)

assert not result.exploitation_success
assert not result.propagation_success