-
Notifications
You must be signed in to change notification settings - Fork 786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SSH exploiter plugin.py #3458
Merged
Merged
SSH exploiter plugin.py #3458
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import logging | ||
from functools import partial | ||
from pprint import pformat | ||
from typing import Any, Dict, Sequence | ||
|
||
# common imports | ||
from common.credentials import Password, SSHKeypair, Username | ||
from common.event_queue import IAgentEventPublisher | ||
from common.types import AgentID, Event, NetworkPort | ||
from common.utils.code_utils import del_key | ||
|
||
# dependencies to get rid of or internalize | ||
from infection_monkey.exploit import IAgentBinaryRepository, IAgentOTPProvider | ||
from infection_monkey.exploit.tools import ( | ||
BruteForceCredentialsProvider, | ||
BruteForceExploiter, | ||
all_tcp_ports_are_closed, | ||
generate_brute_force_credentials, | ||
identity_type_filter, | ||
secret_type_filter, | ||
) | ||
from infection_monkey.exploit.tools.helpers import get_agent_dst_path | ||
from infection_monkey.i_puppet import ExploiterResultData, TargetHost | ||
from infection_monkey.propagation_credentials_repository import IPropagationCredentialsRepository | ||
|
||
from .ssh_command_builder import build_ssh_command | ||
from .ssh_options import SSHOptions | ||
|
||
# from .ssh_remote_access_client import SSH_PORTS | ||
# from .ssh_remote_access_client_factory import SSHRemoteAccessClientFactory | ||
|
||
# TODO: Remove this and import SSH_PORTS from .ssh_remote_access_client | ||
SSH_PORTS = [NetworkPort(22)] | ||
|
||
|
||
# TODO: Remove this and import SSH_PORTS from .ssh_remote_access_client_factory | ||
class SSHRemoteAccessClientFactory: | ||
def __init__(self, *args, **kwargs): | ||
pass | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def should_attempt_exploit(host: TargetHost) -> bool: | ||
return not all_tcp_ports_are_closed(host, SSH_PORTS) | ||
|
||
|
||
class Plugin: | ||
def __init__( | ||
self, | ||
*, | ||
plugin_name: str, | ||
agent_id: AgentID, | ||
agent_event_publisher: IAgentEventPublisher, | ||
agent_binary_repository: IAgentBinaryRepository, | ||
propagation_credentials_repository: IPropagationCredentialsRepository, | ||
otp_provider: IAgentOTPProvider, | ||
**kwargs, | ||
): | ||
self._plugin_name = plugin_name | ||
self._agent_id = agent_id | ||
self._agent_event_publisher = agent_event_publisher | ||
self._agent_binary_repository = agent_binary_repository | ||
credentials_generator = partial( | ||
generate_brute_force_credentials, | ||
identity_filter=identity_type_filter([Username]), | ||
secret_filter=secret_type_filter([Password, SSHKeypair]), | ||
) | ||
self._credentials_provider = BruteForceCredentialsProvider( | ||
credentials_repository=propagation_credentials_repository, | ||
generate_brute_force_credentials=credentials_generator, | ||
) | ||
self._otp_provider = otp_provider | ||
|
||
def run( | ||
self, | ||
*, | ||
host: TargetHost, | ||
servers: Sequence[str], | ||
current_depth: int, | ||
options: Dict[str, Any], | ||
interrupt: Event, | ||
**kwargs, | ||
) -> ExploiterResultData: | ||
# HTTP ports options are hack because they are needed in fingerprinters | ||
del_key(options, "http_ports") | ||
|
||
try: | ||
logger.debug(f"Parsing options: {pformat(options)}") | ||
ssh_options = SSHOptions(**options) | ||
except Exception as err: | ||
msg = f"Failed to parse SSH options: {err}" | ||
logger.exception(msg) | ||
return ExploiterResultData(error_message=msg) | ||
|
||
if not should_attempt_exploit(host): | ||
msg = f"Host {host.ip} has no open SSH ports" | ||
logger.debug(msg) | ||
return ExploiterResultData( | ||
exploitation_success=False, propagation_success=False, error_message=msg | ||
) | ||
|
||
command_builder = partial( | ||
build_ssh_command, | ||
agent_id=self._agent_id, | ||
target_host=host, | ||
servers=servers, | ||
current_depth=current_depth, | ||
otp_provider=self._otp_provider, | ||
) | ||
ssh_exploit_client_factory = SSHRemoteAccessClientFactory( | ||
host=host, options=ssh_options, command_builder=command_builder | ||
) | ||
|
||
brute_force_exploiter = BruteForceExploiter( | ||
exploiter_name=self._plugin_name, | ||
agent_id=self._agent_id, | ||
destination_path=get_agent_dst_path(host), | ||
exploit_client_factory=ssh_exploit_client_factory, | ||
get_credentials=self._credentials_provider, | ||
agent_binary_repository=self._agent_binary_repository, | ||
agent_event_publisher=self._agent_event_publisher, | ||
tags={"ssh-exploiter"}, | ||
) | ||
|
||
try: | ||
logger.debug(f"Running SSH exploiter on host {host.ip}") | ||
return brute_force_exploiter.exploit_host(host, interrupt) | ||
except Exception as err: | ||
msg = f"An unexpected exception occurred while attempting to exploit host: {err}" | ||
logger.exception(msg) | ||
return ExploiterResultData(error_message=msg) |
200 changes: 200 additions & 0 deletions
200
monkey/tests/unit_tests/agent_plugins/exploiters/ssh/test_ssh_plugin.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
from ipaddress import IPv4Address | ||
from threading import Event | ||
from unittest.mock import MagicMock | ||
from uuid import UUID | ||
|
||
import pytest | ||
from agent_plugins.exploiters.ssh.src.plugin import SSH_PORTS, Plugin | ||
|
||
from common import OperatingSystem | ||
from common.types import PortStatus | ||
from infection_monkey.exploit.tools import BruteForceExploiter | ||
from infection_monkey.i_puppet import ( | ||
ExploiterResultData, | ||
PortScanData, | ||
PortScanDataDict, | ||
TargetHost, | ||
TargetHostPorts, | ||
) | ||
from infection_monkey.propagation_credentials_repository import IPropagationCredentialsRepository | ||
|
||
AGENT_ID = UUID("5c145d4e-ec61-44f7-998e-17477112f50f") | ||
BAD_SSH_OPTIONS_DICT = {"blah": "blah"} | ||
TARGET_IP = IPv4Address("127.0.0.1") | ||
OPEN_SSH_PORTS = TargetHostPorts( | ||
tcp_ports=PortScanDataDict({p: PortScanData(port=p, status=PortStatus.OPEN) for p in SSH_PORTS}) | ||
) | ||
EMPTY_TARGET_HOST_PORTS = TargetHostPorts() | ||
SERVERS = ["192.168.1.1"] | ||
EXPLOITER_RESULT_DATA = ExploiterResultData(True, False, error_message="Test error") | ||
|
||
|
||
@pytest.fixture | ||
def target_host() -> TargetHost: | ||
return TargetHost( | ||
ip=TARGET_IP, | ||
operating_system=OperatingSystem.WINDOWS, | ||
ports_status=OPEN_SSH_PORTS, | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def propagation_credentials_repository(): | ||
return MagicMock(spec=IPropagationCredentialsRepository) | ||
|
||
|
||
class ErrorRaisingMockSSHExploiter(BruteForceExploiter): | ||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
def exploit_host(self, *args, **kwargs) -> ExploiterResultData: | ||
raise Exception("Test error") | ||
|
||
|
||
@pytest.fixture | ||
def mock_ssh_exploiter(): | ||
exploiter = MagicMock(spec=BruteForceExploiter) | ||
exploiter.exploit_host.return_value = EXPLOITER_RESULT_DATA | ||
return exploiter | ||
|
||
|
||
@pytest.fixture | ||
def plugin( | ||
monkeypatch, | ||
propagation_credentials_repository: IPropagationCredentialsRepository, | ||
mock_ssh_exploiter: BruteForceExploiter, | ||
) -> Plugin: | ||
monkeypatch.setattr( | ||
"agent_plugins.exploiters.ssh.src.plugin.BruteForceExploiter", | ||
lambda *args, **kwargs: mock_ssh_exploiter, | ||
) | ||
|
||
return Plugin( | ||
plugin_name="SSH", | ||
agent_id=AGENT_ID, | ||
agent_event_publisher=MagicMock(), | ||
agent_binary_repository=MagicMock(), | ||
propagation_credentials_repository=propagation_credentials_repository, | ||
otp_provider=MagicMock(), | ||
) | ||
|
||
|
||
def test_run__fails_on_bad_options(plugin: Plugin, target_host: TargetHost): | ||
result = plugin.run( | ||
host=target_host, | ||
servers=SERVERS, | ||
current_depth=1, | ||
options=BAD_SSH_OPTIONS_DICT, | ||
interrupt=Event(), | ||
) | ||
|
||
assert not result.exploitation_success | ||
assert not result.propagation_success | ||
|
||
|
||
def test_run__attempts_exploit_if_port_status_unknown( | ||
plugin: Plugin, | ||
mock_ssh_exploiter: BruteForceExploiter, | ||
target_host: TargetHost, | ||
): | ||
host = target_host | ||
host.ports_status.tcp_ports = PortScanDataDict({}) | ||
result = plugin.run( | ||
host=host, | ||
servers=SERVERS, | ||
current_depth=1, | ||
options={}, | ||
interrupt=Event(), | ||
) | ||
|
||
mock_ssh_exploiter.exploit_host.assert_called_once() # type: ignore [attr-defined] | ||
assert result == EXPLOITER_RESULT_DATA | ||
|
||
|
||
def test_run__attempts_exploit_if_port_status_open( | ||
plugin: Plugin, | ||
mock_ssh_exploiter: BruteForceExploiter, | ||
target_host: TargetHost, | ||
): | ||
host = target_host | ||
host.ports_status.tcp_ports = PortScanDataDict( | ||
{SSH_PORTS[0]: PortScanData(port=SSH_PORTS[0], status=PortStatus.OPEN)} | ||
) | ||
result = plugin.run( | ||
host=host, | ||
servers=SERVERS, | ||
current_depth=1, | ||
options={}, | ||
interrupt=Event(), | ||
) | ||
|
||
mock_ssh_exploiter.exploit_host.assert_called_once() # type: ignore [attr-defined] | ||
assert result == EXPLOITER_RESULT_DATA | ||
|
||
|
||
def test_run__skips_exploit_if_port_status_closed( | ||
plugin: Plugin, | ||
mock_ssh_exploiter: BruteForceExploiter, | ||
target_host: TargetHost, | ||
): | ||
host = target_host | ||
host.ports_status.tcp_ports = PortScanDataDict( | ||
{ | ||
SSH_PORTS[0]: PortScanData(port=SSH_PORTS[0], status=PortStatus.CLOSED), | ||
} | ||
) | ||
|
||
result = plugin.run( | ||
host=host, | ||
servers=SERVERS, | ||
current_depth=1, | ||
options={}, | ||
interrupt=Event(), | ||
) | ||
|
||
mock_ssh_exploiter.exploit_host.assert_not_called() # type: ignore [attr-defined] | ||
assert result.exploitation_success is False | ||
assert result.propagation_success is False | ||
|
||
|
||
def test_run__returns_exploiter_result_data(plugin: Plugin, target_host: TargetHost): | ||
result = plugin.run( | ||
host=target_host, | ||
servers=SERVERS, | ||
current_depth=1, | ||
options={}, | ||
interrupt=Event(), | ||
) | ||
|
||
assert result == EXPLOITER_RESULT_DATA | ||
|
||
|
||
def test_run__exploit_host_raises_exception( | ||
monkeypatch, | ||
plugin: Plugin, | ||
propagation_credentials_repository: IPropagationCredentialsRepository, | ||
target_host: TargetHost, | ||
): | ||
monkeypatch.setattr( | ||
"agent_plugins.exploiters.ssh.src.plugin.BruteForceExploiter", | ||
ErrorRaisingMockSSHExploiter, | ||
) | ||
|
||
plugin = Plugin( | ||
plugin_name="SSH", | ||
agent_id=AGENT_ID, | ||
agent_event_publisher=MagicMock(), | ||
agent_binary_repository=MagicMock(), | ||
propagation_credentials_repository=propagation_credentials_repository, | ||
otp_provider=MagicMock(), | ||
) | ||
result = plugin.run( | ||
host=target_host, | ||
servers=SERVERS, | ||
current_depth=1, | ||
options={}, | ||
interrupt=Event(), | ||
) | ||
|
||
assert not result.exploitation_success | ||
assert not result.propagation_success |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Make "Test error" a variable, since it's also used in
EXPLOITER_RESULT_DATA
, which is used for assertions