Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create secure custom PBA directory on Windows #1270

Merged
merged 5 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions monkey/monkey_island/cc/services/post_breach_files.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from pathlib import Path

from monkey_island.cc.server_utils.file_utils import create_secure_directory

logger = logging.getLogger(__name__)

Expand All @@ -15,7 +16,8 @@ class PostBreachFilesService:
@classmethod
def initialize(cls, data_dir):
cls.DATA_DIR = data_dir
Path(cls.get_custom_pba_directory()).mkdir(mode=0o0700, parents=True, exist_ok=True)
custom_pba_dir = cls.get_custom_pba_directory()
create_secure_directory(custom_pba_dir)

@staticmethod
def save_file(filename: str, file_contents: bytes):
Expand Down
35 changes: 35 additions & 0 deletions monkey/tests/monkey_island/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from monkey_island.cc.server_utils.file_utils import is_windows_os

if is_windows_os():
import win32api
import win32security

FULL_CONTROL = 2032127
ACE_ACCESS_MODE_GRANT_ACCESS = win32security.GRANT_ACCESS
ACE_INHERIT_OBJECT_AND_CONTAINER = 3


def _get_acl_and_sid_from_path(path: str):
sid, _, _ = win32security.LookupAccountName("", win32api.GetUserName())
security_descriptor = win32security.GetNamedSecurityInfo(
path, win32security.SE_FILE_OBJECT, win32security.DACL_SECURITY_INFORMATION
)
acl = security_descriptor.GetSecurityDescriptorDacl()
return acl, sid


def assert_windows_permissions(path: str):
acl, user_sid = _get_acl_and_sid_from_path(path)

assert acl.GetAceCount() == 1

ace = acl.GetExplicitEntriesFromAcl()[0]

ace_access_mode = ace["AccessMode"]
ace_permissions = ace["AccessPermissions"]
ace_inheritance = ace["Inheritance"]
ace_sid = ace["Trustee"]["Identifier"]

assert ace_sid == user_sid
assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS
assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import stat

import pytest
from tests.monkey_island.utils import assert_windows_permissions

from monkey_island.cc.server_utils.file_utils import (
create_secure_directory,
Expand All @@ -10,14 +11,6 @@
open_new_securely_permissioned_file,
)

if is_windows_os():
import win32api
import win32security

FULL_CONTROL = 2032127
ACE_ACCESS_MODE_GRANT_ACCESS = win32security.GRANT_ACCESS
ACE_INHERIT_OBJECT_AND_CONTAINER = 3


def test_expand_user(patched_home_env):
input_path = os.path.join("~", "test")
Expand Down Expand Up @@ -47,15 +40,6 @@ def test_path(tmpdir):
return path


def _get_acl_and_sid_from_path(path: str):
sid, _, _ = win32security.LookupAccountName("", win32api.GetUserName())
security_descriptor = win32security.GetNamedSecurityInfo(
path, win32security.SE_FILE_OBJECT, win32security.DACL_SECURITY_INFORMATION
)
acl = security_descriptor.GetSecurityDescriptorDacl()
return acl, sid


def test_create_secure_directory__already_exists(test_path):
os.mkdir(test_path)
assert os.path.isdir(test_path)
Expand All @@ -82,20 +66,7 @@ def test_create_secure_directory__perm_linux(test_path):
def test_create_secure_directory__perm_windows(test_path):
create_secure_directory(test_path)

acl, user_sid = _get_acl_and_sid_from_path(test_path)

assert acl.GetAceCount() == 1

ace = acl.GetExplicitEntriesFromAcl()[0]

ace_access_mode = ace["AccessMode"]
ace_permissions = ace["AccessPermissions"]
ace_inheritance = ace["Inheritance"]
ace_sid = ace["Trustee"]["Identifier"]

assert ace_sid == user_sid
assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS
assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER
assert_windows_permissions(test_path)


def test_open_new_securely_permissioned_file__already_exists(test_path):
Expand Down Expand Up @@ -131,20 +102,7 @@ def test_open_new_securely_permissioned_file__perm_windows(test_path):
with open_new_securely_permissioned_file(test_path):
pass

acl, user_sid = _get_acl_and_sid_from_path(test_path)

assert acl.GetAceCount() == 1

ace = acl.GetExplicitEntriesFromAcl()[0]

ace_access_mode = ace["AccessMode"]
ace_permissions = ace["AccessPermissions"]
ace_inheritance = ace["Inheritance"]
ace_sid = ace["Trustee"]["Identifier"]

assert ace_sid == user_sid
assert ace_permissions == FULL_CONTROL and ace_access_mode == ACE_ACCESS_MODE_GRANT_ACCESS
assert ace_inheritance == ACE_INHERIT_OBJECT_AND_CONTAINER
assert_windows_permissions(test_path)


def test_open_new_securely_permissioned_file__write(test_path):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os

import pytest
from tests.monkey_island.utils import assert_windows_permissions

from monkey_island.cc.server_utils.file_utils import is_windows_os
from monkey_island.cc.services.post_breach_files import PostBreachFilesService


Expand Down Expand Up @@ -32,13 +34,20 @@ def dir_is_empty(dir_path):
return len(dir_contents) == 0


@pytest.mark.skipif(os.name != "posix", reason="Tests Posix (not Windows) permissions.")
def test_custom_pba_dir_permissions():
@pytest.mark.skipif(is_windows_os(), reason="Tests Posix (not Windows) permissions.")
def test_custom_pba_dir_permissions_linux():
st = os.stat(PostBreachFilesService.get_custom_pba_directory())

assert st.st_mode == 0o40700


@pytest.mark.skipif(not is_windows_os(), reason="Tests Windows (not Posix) permissions.")
mssalvatore marked this conversation as resolved.
Show resolved Hide resolved
def test_custom_pba_dir_permissions_windows():
pba_dir = PostBreachFilesService.get_custom_pba_directory()

assert_windows_permissions(pba_dir)


def test_remove_failure(monkeypatch):
monkeypatch.setattr(os, "remove", lambda x: raise_(OSError("Permission denied")))

Expand Down