diff --git a/securedrop/source_app/session_manager.py b/securedrop/source_app/session_manager.py index 45ef08a7bb2..7459a58ab60 100644 --- a/securedrop/source_app/session_manager.py +++ b/securedrop/source_app/session_manager.py @@ -4,7 +4,6 @@ import sqlalchemy from flask import session -from sdconfig import config from source_user import SourceUser, authenticate_source_user, InvalidPassphraseError if TYPE_CHECKING: @@ -40,6 +39,9 @@ def log_user_in( db_session: sqlalchemy.orm.Session, supplied_passphrase: "DicewarePassphrase" ) -> SourceUser: + # Late import so the module can be used without a config.py in the parent folder + from sdconfig import config + # Validate the passphrase; will raise an exception if it is not valid source_user = authenticate_source_user( db_session=db_session, supplied_passphrase=supplied_passphrase diff --git a/securedrop/store.py b/securedrop/store.py index 5e37f1786a2..7e1297bea8d 100644 --- a/securedrop/store.py +++ b/securedrop/store.py @@ -106,8 +106,7 @@ def __init__(self, storage_path: str, temp_dir: str) -> None: # where files and directories are sent to be securely deleted self.__shredder_path = os.path.abspath(os.path.join(self.__storage_path, "../shredder")) - if not os.path.exists(self.__shredder_path): - os.makedirs(self.__shredder_path, mode=0o700) + os.makedirs(self.__shredder_path, mode=0o700, exist_ok=True) # crash if we don't have a way to securely remove files if not rm.check_secure_delete_capability(): diff --git a/securedrop/tests/conftest.py b/securedrop/tests/conftest.py index 37de4616ed6..a4cbb332c36 100644 --- a/securedrop/tests/conftest.py +++ b/securedrop/tests/conftest.py @@ -124,7 +124,7 @@ def setup_journalist_key_and_gpg_folder() -> Generator[Tuple[str, Path], None, N # This path matches the GPG_KEY_DIR defined in the config.py used for the tests # If they don't match, it can make the tests flaky and very hard to debug tmp_gpg_dir = Path("/tmp") / "securedrop" / "keys" - tmp_gpg_dir.mkdir(parents=True, exist_ok=True) + tmp_gpg_dir.mkdir(parents=True, exist_ok=True, mode=0o0700) try: # GPG 2.1+ requires gpg-agent, see #4013 diff --git a/securedrop/tests/functional/app_navigators.py b/securedrop/tests/functional/app_navigators.py new file mode 100644 index 00000000000..eabcb43f1a8 --- /dev/null +++ b/securedrop/tests/functional/app_navigators.py @@ -0,0 +1,385 @@ +from contextlib import contextmanager +import logging +import os +from pathlib import Path +from enum import Enum +import time +from datetime import datetime +from os.path import abspath +from os.path import dirname +from os.path import expanduser +from os.path import join +from os.path import realpath +from typing import Generator, Optional + +from selenium.webdriver.firefox.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement + +import tbselenium.common as cm +from selenium import webdriver +from selenium.common.exceptions import NoAlertPresentException +from selenium.common.exceptions import WebDriverException +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.remote_connection import LOGGER +from selenium.webdriver.support import expected_conditions +from selenium.webdriver.support.ui import WebDriverWait +from tbselenium.tbdriver import TorBrowserDriver + + +_LOGFILE_PATH = abspath(join(dirname(realpath(__file__)), "../log/driver.log")) +_FIREFOX_PATH = "/usr/bin/firefox/firefox" + +_TBB_PATH = abspath(expanduser("~/.local/tbb/tor-browser_en-US/")) +os.environ["TBB_PATH"] = _TBB_PATH + +LOGGER.setLevel(logging.WARNING) + +# width & height of the browser window. If the top of screenshots is cropped, +# increase the height of the window so the the whole page fits in the window. +_BROWSER_SIZE = (1024, 1400) + + +class WebDriverTypeEnum(Enum): + TOR_BROWSER = 1 + FIREFOX = 2 + + +_DRIVER_RETRY_COUNT = 3 +_DRIVER_RETRY_INTERNVAL = 5 + + +def _create_torbrowser_driver( + accept_languages: Optional[str] = None, +) -> TorBrowserDriver: + logging.info("Creating TorBrowserDriver") + log_file = open(_LOGFILE_PATH, "a") + log_file.write("\n\n[%s] Running Functional Tests\n" % str(datetime.now())) + log_file.flush() + + # Don't use Tor when reading from localhost, and turn off private + # browsing. We need to turn off private browsing because we won't be + # able to access the browser's cookies in private browsing mode. Since + # we use session cookies in SD anyway (in private browsing mode all + # cookies are set as session cookies), this should not affect session + # lifetime. + pref_dict = { + "network.proxy.no_proxies_on": "127.0.0.1", + "browser.privatebrowsing.autostart": False, + } + if accept_languages is not None: + pref_dict["intl.accept_languages"] = accept_languages + + Path(_TBB_PATH).mkdir(parents=True, exist_ok=True) + torbrowser_driver = None + for i in range(_DRIVER_RETRY_COUNT): + try: + torbrowser_driver = TorBrowserDriver( + _TBB_PATH, + tor_cfg=cm.USE_RUNNING_TOR, + pref_dict=pref_dict, + tbb_logfile_path=_LOGFILE_PATH, + ) + logging.info("Created Tor Browser web driver") + torbrowser_driver.set_window_position(0, 0) + torbrowser_driver.set_window_size(*_BROWSER_SIZE) + break + except Exception as e: + logging.error("Error creating Tor Browser web driver: %s", e) + if i < _DRIVER_RETRY_COUNT: + time.sleep(_DRIVER_RETRY_INTERNVAL) + + if not torbrowser_driver: + raise Exception("Could not create Tor Browser web driver") + + return torbrowser_driver + + +def _create_firefox_driver( + accept_languages: Optional[str] = None, +) -> webdriver.Firefox: + logging.info("Creating Firefox web driver") + + profile = webdriver.FirefoxProfile() + if accept_languages is not None: + profile.set_preference("intl.accept_languages", accept_languages) + profile.update_preferences() + + firefox_driver = None + for i in range(_DRIVER_RETRY_COUNT): + try: + firefox_driver = webdriver.Firefox(firefox_binary=_FIREFOX_PATH, firefox_profile=profile) + firefox_driver.set_window_position(0, 0) + firefox_driver.set_window_size(*_BROWSER_SIZE) + logging.info("Created Firefox web driver") + break + except Exception as e: + logging.error("Error creating Firefox web driver: %s", e) + if i < _DRIVER_RETRY_COUNT: + time.sleep(_DRIVER_RETRY_INTERNVAL) + + if not firefox_driver: + raise Exception("Could not create Firefox web driver") + + return firefox_driver + + +# TODO(AD): This is intended to eventually replace the web driver code in FunctionalTest +@contextmanager +def get_web_driver( + web_driver_type: WebDriverTypeEnum = WebDriverTypeEnum.TOR_BROWSER, +) -> Generator[WebDriver, None, None]: + if web_driver_type == WebDriverTypeEnum.TOR_BROWSER: + web_driver = _create_torbrowser_driver() + elif web_driver_type == WebDriverTypeEnum.FIREFOX: + web_driver = _create_firefox_driver() + else: + raise ValueError(f"Unexpected value {web_driver_type}") + + try: + yield web_driver + finally: + try: + web_driver.quit() + except Exception: + logging.exception("Error stopping driver") + + +# TODO(AD): This is intended to eventually replace the navigation/driver code in FunctionalTest +class _NavigationHelper: + + _TIMEOUT = 10 + _POLL_FREQUENCY = 0.1 + + def __init__(self, web_driver: WebDriver) -> None: + self.driver = web_driver + + def wait_for(self, function_with_assertion, timeout=_TIMEOUT): + """Polling wait for an arbitrary assertion.""" + # Thanks to + # http://chimera.labs.oreilly.com/books/1234000000754/ch20.html#_a_common_selenium_problem_race_conditions + start_time = time.time() + while time.time() - start_time < timeout: + try: + return function_with_assertion() + except (AssertionError, WebDriverException): + time.sleep(self._POLL_FREQUENCY) + # one more try, which will raise any errors if they are outstanding + return function_with_assertion() + + def safe_click_by_id(self, element_id): + """ + Clicks the element with the given ID attribute. + + Returns: + el: The element, if found. + + Raises: + selenium.common.exceptions.TimeoutException: If the element cannot be found in time. + + """ + el = WebDriverWait(self.driver, self._TIMEOUT, self._POLL_FREQUENCY).until( + expected_conditions.element_to_be_clickable((By.ID, element_id)) + ) + el.location_once_scrolled_into_view + el.click() + return el + + def safe_click_by_css_selector(self, selector): + """ + Clicks the first element with the given CSS selector. + + Returns: + el: The element, if found. + + Raises: + selenium.common.exceptions.TimeoutException: If the element cannot be found in time. + + """ + el = WebDriverWait(self.driver, self._TIMEOUT, self._POLL_FREQUENCY).until( + expected_conditions.element_to_be_clickable((By.CSS_SELECTOR, selector)) + ) + el.click() + return el + + def safe_click_all_by_css_selector(self, selector, root=None): + """ + Clicks each element that matches the given CSS selector. + + Returns: + els (list): The list of elements that matched the selector. + + Raises: + selenium.common.exceptions.TimeoutException: If the element cannot be found in time. + + """ + if root is None: + root = self.driver + els = self.wait_for(lambda: root.find_elements_by_css_selector(selector)) + for el in els: + clickable_el = WebDriverWait(self.driver, self._TIMEOUT, self._POLL_FREQUENCY).until( + expected_conditions.element_to_be_clickable((By.CSS_SELECTOR, selector)) + ) + clickable_el.click() + return els + + def safe_send_keys_by_id(self, element_id, text): + """ + Sends the given text to the element with the specified ID. + + Returns: + el: The element, if found. + + Raises: + selenium.common.exceptions.TimeoutException: If the element cannot be found in time. + + """ + el = WebDriverWait(self.driver, self._TIMEOUT, self._POLL_FREQUENCY).until( + expected_conditions.element_to_be_clickable((By.ID, element_id)) + ) + el.send_keys(text) + return el + + def safe_send_keys_by_css_selector(self, selector, text): + """ + Sends the given text to the first element with the given CSS selector. + + Returns: + el: The element, if found. + + Raises: + selenium.common.exceptions.TimeoutException: If the element cannot be found in time. + + """ + el = WebDriverWait(self.driver, self._TIMEOUT, self._POLL_FREQUENCY).until( + expected_conditions.element_to_be_clickable((By.CSS_SELECTOR, selector)) + ) + el.send_keys(text) + return el + + def alert_wait(self, timeout=_TIMEOUT * 10): + WebDriverWait(self.driver, timeout, self._POLL_FREQUENCY).until( + expected_conditions.alert_is_present(), "Timed out waiting for confirmation popup." + ) + + def alert_accept(self): + # adapted from https://stackoverflow.com/a/34795883/837471 + def alert_is_not_present(object): + """Expect an alert to not be present.""" + try: + alert = self.driver.switch_to.alert + alert.text + return False + except NoAlertPresentException: + return True + + self.driver.switch_to.alert.accept() + WebDriverWait(self.driver, self._TIMEOUT, self._POLL_FREQUENCY).until( + alert_is_not_present, "Timed out waiting for confirmation popup to disappear." + ) + + +# TODO(AD): This is intended to eventually replace the SourceNavigationStepsMixin +class SourceAppNagivator: + def __init__( + self, + source_app_base_url: str, + web_driver: WebDriver, + accept_languages: Optional[str] = None, + ) -> None: + self._source_app_base_url = source_app_base_url + self.nav_helper = _NavigationHelper(web_driver) + self.driver = web_driver + self.accept_languages = accept_languages + + def _is_on_source_homepage(self) -> WebElement: + return self.nav_helper.wait_for(lambda: self.driver.find_element_by_id("source-index")) + + def source_visits_source_homepage(self) -> None: + self.driver.get(self._source_app_base_url) + assert self._is_on_source_homepage() + + def _is_on_generate_page(self) -> WebElement: + return self.nav_helper.wait_for(lambda: self.driver.find_element_by_id("create-form")) + + def source_clicks_submit_documents_on_homepage(self) -> None: + # It's the source's first time visiting this SecureDrop site, so they + # choose to "Submit Documents". + self.nav_helper.safe_click_by_id("submit-documents-button") + + # The source should now be on the page where they are presented with + # a diceware codename they can use for subsequent logins + assert self._is_on_generate_page() + + def source_continues_to_submit_page(self) -> None: + self.nav_helper.safe_click_by_id("continue-button") + + def submit_page_loaded() -> None: + if not self.accept_languages: + headline = self.driver.find_element_by_class_name("headline") + assert "Submit Files or Messages" == headline.text + + self.nav_helper.wait_for(submit_page_loaded) + + def _is_on_logout_page(self) -> WebElement: + return self.nav_helper.wait_for( + lambda: self.driver.find_element_by_id("click-new-identity-tor") + ) + + def source_logs_out(self) -> None: + self.nav_helper.safe_click_by_id("logout") + assert self._is_on_logout_page() + + def source_retrieves_codename_from_hint(self) -> str: + # The DETAILS element will be missing the OPEN attribute if it is + # closed, hiding its contents. + content = self.driver.find_element_by_css_selector("details#codename-hint") + assert content.get_attribute("open") is None + + self.nav_helper.safe_click_by_id("codename-hint") + + assert content.get_attribute("open") is not None + content_content = self.driver.find_element_by_css_selector("details#codename-hint mark") + return content_content.text + + def source_chooses_to_login(self) -> None: + self.driver.find_element_by_id("login-button").click() + self.nav_helper.wait_for( + lambda: self.driver.find_elements_by_id("login-with-existing-codename") + ) + + def _is_logged_in(self) -> WebElement: + return self.nav_helper.wait_for(lambda: self.driver.find_element_by_id("logout")) + + def source_proceeds_to_login(self, codename: str) -> None: + self.nav_helper.safe_send_keys_by_id("login-with-existing-codename", codename) + self.nav_helper.safe_click_by_id("login") + + # Check that we've logged in + assert self._is_logged_in() + + replies = self.driver.find_elements_by_id("replies") + assert len(replies) == 1 + + def source_submits_a_message(self, message: str = "S3cr3t m3ss4ge") -> str: + # Write the message to submit + self.nav_helper.safe_send_keys_by_css_selector("[name=msg]", message) + + # Hit the submit button + submit_button = self.driver.find_element_by_id("submit-doc-button") + submit_button.click() + + # Wait for confirmation that the message was submitted + def message_submitted(): + if not self.accept_languages: + notification = self.driver.find_element_by_css_selector(".success") + assert "Thank" in notification.text + return notification.text + + # Return the confirmation notification + notification_text = self.nav_helper.wait_for(message_submitted) + return notification_text + + def source_sees_flash_message(self) -> str: + notification = self.driver.find_element_by_css_selector(".notification") + assert notification + return notification diff --git a/securedrop/tests/functional/conftest.py b/securedrop/tests/functional/conftest.py new file mode 100644 index 00000000000..d064e5b5f6f --- /dev/null +++ b/securedrop/tests/functional/conftest.py @@ -0,0 +1,158 @@ +from contextlib import contextmanager +from dataclasses import dataclass +from multiprocessing.context import Process +from pathlib import Path +from typing import Generator +import requests +from selenium.webdriver.firefox.webdriver import WebDriver + +import pytest +from models import Journalist +from tests.functional.db_session import get_database_session +from tests.functional.factories import SecureDropConfigFactory +from tests.functional.sd_config_v2 import SecureDropConfig +from tests.functional.app_navigators import WebDriverTypeEnum, get_web_driver +import socket +import time + + +# Function-scoped so that tests can be run in parallel if needed +@pytest.fixture(scope="function") +def firefox_web_driver() -> WebDriver: + with get_web_driver(web_driver_type=WebDriverTypeEnum.FIREFOX) as web_driver: + yield web_driver + + +# Function-scoped so that tests can be run in parallel if needed +@pytest.fixture(scope="function") +def tor_browser_web_driver() -> WebDriver: + with get_web_driver(web_driver_type=WebDriverTypeEnum.TOR_BROWSER) as web_driver: + yield web_driver + + +def _get_unused_port() -> int: + s = socket.socket() + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + return port + + +def _start_source_server(port: int, config_to_use: SecureDropConfig) -> None: + # This function will be called in a separate Process that runs the source app + # Modify the sdconfig module in the app's memory so that it mirrors the supplied config + # Do this BEFORE importing any other module of the application so the modified config is + # what eventually gets imported by the app's code + import sdconfig + + sdconfig.config = config_to_use # type: ignore + + # Then start the source app + from source_app import create_app + + source_app = create_app(config_to_use) # type: ignore + source_app.run(port=port, debug=True, use_reloader=False, threaded=True) + + +def _start_journalist_server(port: int, config_to_use: SecureDropConfig) -> None: + # This function will be called in a separate Process that runs the journalist app + # Modify the sdconfig module in the app's memory so that it mirrors the supplied config + # Do this BEFORE importing any other module of the application so the modified config is + # what eventually gets imported by the app's code + import sdconfig + + sdconfig.config = config_to_use # type: ignore + + # Then start the journalist app + from journalist_app import create_app + + journalist_app = create_app(config_to_use) # type: ignore + journalist_app.run(port=port, debug=True, use_reloader=False, threaded=True) + + +@dataclass(frozen=True) +class SdServersFixtureResult: + source_app_base_url: str + journalist_app_base_url: str + + +@contextmanager +def spawn_sd_servers( + config_to_use: SecureDropConfig +) -> Generator[SdServersFixtureResult, None, None]: + """Spawn the source and journalist apps as separate processes with the supplied config.""" + journalist_app_process = None + source_app_process = None + try: + # Add a test journalist + with get_database_session( + database_uri=config_to_use.DATABASE_URI + ) as db_session_for_sd_servers: + journalist = Journalist( + username="journalist", + password="correct horse battery staple profanity oil chewy", + is_admin=True, + ) + journalist.otp_secret = "JHCOGO7VCER3EJ4L" + db_session_for_sd_servers.add(journalist) + db_session_for_sd_servers.commit() + + # Spawn the source and journalist web apps in separate processes + source_port = _get_unused_port() + journalist_port = _get_unused_port() + source_app_process = Process(target=_start_source_server, args=(source_port, config_to_use)) + source_app_process.start() + journalist_app_process = Process( + target=_start_journalist_server, args=(journalist_port, config_to_use) + ) + journalist_app_process.start() + source_app_base_url = f"http://127.0.0.1:{source_port}" + journalist_app_base_url = f"http://127.0.0.1:{journalist_port}" + + # Sleep until the source and journalist web apps are up and running + response_source_status_code = None + response_journalist_status_code = None + for _ in range(30): + try: + response_source = requests.get(source_app_base_url, timeout=1) + response_source_status_code = response_source.status_code + response_journalist = requests.get(journalist_app_base_url, timeout=1) + response_journalist_status_code = response_journalist.status_code + break + except requests.ConnectionError: + time.sleep(0.25) + assert response_source_status_code == 200 + assert response_journalist_status_code == 200 + + # Ready for the tests + yield SdServersFixtureResult( + source_app_base_url=source_app_base_url, + journalist_app_base_url=journalist_app_base_url, + ) + + # Clean everything up + finally: + if source_app_process: + source_app_process.terminate() + source_app_process.join() + if journalist_app_process: + journalist_app_process.terminate() + journalist_app_process.join() + + +# TODO(AD): This is intended to eventually replace the sd_servers fixture +@pytest.fixture(scope="session") +def sd_servers_v2(setup_journalist_key_and_gpg_folder): + """Spawn the source and journalist apps as separate processes with a default config.""" + default_config = SecureDropConfigFactory.create( + SECUREDROP_DATA_ROOT=Path("/tmp/sd-tests/functional"), + ) + + # Ensure the GPG settings match the one in the config to use, to ensure consistency + journalist_key_fingerprint, gpg_dir = setup_journalist_key_and_gpg_folder + assert Path(default_config.GPG_KEY_DIR) == gpg_dir + assert default_config.JOURNALIST_KEY == journalist_key_fingerprint + + # Spawn the apps in separate processes + with spawn_sd_servers(config_to_use=default_config) as sd_servers_result: + yield sd_servers_result diff --git a/securedrop/tests/functional/db_session.py b/securedrop/tests/functional/db_session.py new file mode 100644 index 00000000000..0cfe3321da2 --- /dev/null +++ b/securedrop/tests/functional/db_session.py @@ -0,0 +1,38 @@ +from contextlib import contextmanager +from typing import Generator + +from sqlalchemy.orm.session import Session + +from db import db +from flask import Flask +from flask_sqlalchemy import SQLAlchemy + + +@contextmanager +def _get_fake_db_module( + database_uri: str, +) -> Generator[SQLAlchemy, None, None]: + # flask-sqlalchemy's API only allows DB access via a Flask app + # So we create a fake Flask app just so we can get a connection to the DB + app_for_db_connection = Flask("FakeAppForDbConnection") + app_for_db_connection.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False + app_for_db_connection.config['SQLALCHEMY_DATABASE_URI'] = database_uri + db.init_app(app_for_db_connection) + + with app_for_db_connection.app_context(): + yield db + + +@contextmanager +def get_database_session(database_uri: str) -> Generator[Session, None, None]: + """Easily get a session to the DB without having to deal with Flask apps. + + Can be used for tests and utility scripts that need to add data to the DB outside of the + context of the source & journalist Flask applications. + """ + with _get_fake_db_module(database_uri) as initialized_db_module: + db_session = initialized_db_module.session + try: + yield db_session + finally: + db_session.close() diff --git a/securedrop/tests/functional/factories.py b/securedrop/tests/functional/factories.py new file mode 100644 index 00000000000..4c4e54e10d9 --- /dev/null +++ b/securedrop/tests/functional/factories.py @@ -0,0 +1,79 @@ +from pathlib import Path +import secrets +import shutil +import subprocess +from tests.functional.sd_config_v2 import DEFAULT_SECUREDROP_ROOT, FlaskAppConfig +from tests.functional.sd_config_v2 import SecureDropConfig + +from tests.functional.db_session import _get_fake_db_module + + +def _generate_random_token() -> str: + return secrets.token_hex(32) + + +class FlaskAppConfigFactory: + @staticmethod + def create(SESSION_COOKIE_NAME: str) -> FlaskAppConfig: + """Create a Flask app config suitable for the unit tests.""" + return FlaskAppConfig( + SESSION_COOKIE_NAME=SESSION_COOKIE_NAME, + SECRET_KEY=_generate_random_token(), + TESTING=True, + USE_X_SENDFILE=False, + # Disable CSRF checks to make writing tests easier + WTF_CSRF_ENABLED=False, + ) + + +class SecureDropConfigFactory: + @staticmethod + def create( + SECUREDROP_DATA_ROOT: Path, + SESSION_EXPIRATION_MINUTES: float = 120, + NOUNS: Path = DEFAULT_SECUREDROP_ROOT / "dictionaries" / "nouns.txt", + ADJECTIVES: Path = DEFAULT_SECUREDROP_ROOT / "dictionaries" / "adjectives.txt", + ) -> SecureDropConfig: + """Create a securedrop config suitable for the unit tests. + + It will automatically create an initialized DB at SECUREDROP_DATA_ROOT/db.sqlite which will + be set as the DATABASE_FILE. + """ + # Clear the data root directory + if SECUREDROP_DATA_ROOT.exists(): + shutil.rmtree(SECUREDROP_DATA_ROOT) + SECUREDROP_DATA_ROOT.mkdir(parents=True) + database_file = SECUREDROP_DATA_ROOT / "db.sqlite" + + config = SecureDropConfig( + SESSION_EXPIRATION_MINUTES=SESSION_EXPIRATION_MINUTES, + SECUREDROP_DATA_ROOT=str(SECUREDROP_DATA_ROOT), + DATABASE_FILE=str(database_file), + DATABASE_ENGINE="sqlite", + JOURNALIST_APP_FLASK_CONFIG_CLS=FlaskAppConfigFactory.create(SESSION_COOKIE_NAME="js"), + SOURCE_APP_FLASK_CONFIG_CLS=FlaskAppConfigFactory.create(SESSION_COOKIE_NAME="ss"), + SCRYPT_GPG_PEPPER=_generate_random_token(), + SCRYPT_ID_PEPPER=_generate_random_token(), + SCRYPT_PARAMS=dict(N=2 ** 14, r=8, p=1), + WORKER_PIDFILE="/tmp/securedrop_test_worker.pid", + RQ_WORKER_NAME="test", + NOUNS=str(NOUNS), + ADJECTIVES=str(ADJECTIVES), + # The next 2 fields must match the GPG fixture + GPG_KEY_DIR="/tmp/securedrop/keys", + JOURNALIST_KEY="65A1B5FF195B56353CC63DFFCC40EF1228271441", + ) + + # Delete any previous/existing DB and tnitialize a new one + database_file.unlink(missing_ok=True) # type: ignore + database_file.touch() + subprocess.check_call(["sqlite3", database_file, ".databases"]) + with _get_fake_db_module(database_uri=config.DATABASE_URI) as initialized_db_module: + initialized_db_module.create_all() + + # Create the other directories + Path(config.TEMP_DIR).mkdir(parents=True) + Path(config.STORE_DIR).mkdir(parents=True) + + # All done + return config diff --git a/securedrop/tests/functional/journalist_navigation_steps.py b/securedrop/tests/functional/journalist_navigation_steps.py index 1a9d2961420..d428140b276 100644 --- a/securedrop/tests/functional/journalist_navigation_steps.py +++ b/securedrop/tests/functional/journalist_navigation_steps.py @@ -22,6 +22,7 @@ # Number of times to try flaky clicks. from encryption import EncryptionManager +from tests.functional.tor_utils import proxies_for_url from source_user import _SourceScryptManager from tests.test_encryption import import_journalist_private_key @@ -65,10 +66,7 @@ def return_downloaded_content(self, url, cookies): :param cookies: the cookies to access :return: Content of the URL """ - proxies = None - if ".onion" in url: - proxies = {"http": "socks5h://127.0.0.1:9150", "https": "socks5h://127.0.0.1:9150"} - r = requests.get(url, cookies=cookies, proxies=proxies, stream=True) + r = requests.get(url, cookies=cookies, proxies=proxies_for_url(url), stream=True) if r.status_code != 200: raise Exception("Failed to download the data.") data = b"" diff --git a/securedrop/tests/functional/sd_config_v2.py b/securedrop/tests/functional/sd_config_v2.py new file mode 100644 index 00000000000..966e1145ea2 --- /dev/null +++ b/securedrop/tests/functional/sd_config_v2.py @@ -0,0 +1,102 @@ +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional + + +@dataclass(frozen=True) +class FlaskAppConfig: + SESSION_COOKIE_NAME: str + SECRET_KEY: str + + DEBUG: bool = False + TESTING: bool = False + WTF_CSRF_ENABLED: bool = True + + # Use MAX_CONTENT_LENGTH to mimic the behavior of Apache's LimitRequestBody + # in the development environment. See #1714. + MAX_CONTENT_LENGTH: int = 524288000 + + # This is recommended for performance, and also resolves #369 + USE_X_SENDFILE: bool = True + + +DEFAULT_SECUREDROP_ROOT = Path(__file__).absolute().parent.parent.parent + + +# TODO(AD): This mirrors the field in an SDConfig; it is intended to eventually replace it +@dataclass(frozen=True) +class SecureDropConfig: + JOURNALIST_APP_FLASK_CONFIG_CLS: FlaskAppConfig + SOURCE_APP_FLASK_CONFIG_CLS: FlaskAppConfig + + GPG_KEY_DIR: str + JOURNALIST_KEY: str + SCRYPT_GPG_PEPPER: str + SCRYPT_ID_PEPPER: str + SCRYPT_PARAMS: Dict[str, int] + + SECUREDROP_DATA_ROOT: str + + DATABASE_ENGINE: str + DATABASE_FILE: str + + # The following fields are required if the DB engine is NOT sqlite + DATABASE_USERNAME: Optional[str] = None + DATABASE_PASSWORD: Optional[str] = None + DATABASE_HOST: Optional[str] = None + DATABASE_NAME: Optional[str] = None + + SECUREDROP_ROOT: str = str(DEFAULT_SECUREDROP_ROOT) + TRANSLATION_DIRS: Path = DEFAULT_SECUREDROP_ROOT / "translations" + SOURCE_TEMPLATES_DIR: str = str(DEFAULT_SECUREDROP_ROOT / "source_templates") + JOURNALIST_TEMPLATES_DIR: str = str(DEFAULT_SECUREDROP_ROOT / "journalist_templates") + WORD_LIST: str = str(DEFAULT_SECUREDROP_ROOT / "wordlist") + NOUNS: str = str(DEFAULT_SECUREDROP_ROOT / "dictionaries" / "nouns.txt") + ADJECTIVES: str = str(DEFAULT_SECUREDROP_ROOT / "dictionaries" / "adjectives.txt") + + DEFAULT_LOCALE: str = "en_US" + SUPPORTED_LOCALES: List[str] = field(default_factory=lambda: ["en_US"]) + + SESSION_EXPIRATION_MINUTES: float = 120 + + WORKER_PIDFILE: str = "/tmp/securedrop_worker.pid" + RQ_WORKER_NAME: str = "default" + + @property + def TEMP_DIR(self) -> str: + # We use a directory under the SECUREDROP_DATA_ROOT instead of `/tmp` because + # we need to expose this directory via X-Send-File, and want to minimize the + # potential for exposing unintended files. + return str(Path(self.SECUREDROP_DATA_ROOT) / "tmp") + + @property + def STORE_DIR(self) -> str: + return str(Path(self.SECUREDROP_DATA_ROOT) / "store") + + @property + def DATABASE_URI(self) -> str: + if self.DATABASE_ENGINE == "sqlite": + db_uri = f"{self.DATABASE_ENGINE}:///{self.DATABASE_FILE}" + + else: + if self.DATABASE_USERNAME is None: + raise RuntimeError("Missing DATABASE_USERNAME entry from config.py") + if self.DATABASE_PASSWORD is None: + raise RuntimeError("Missing DATABASE_PASSWORD entry from config.py") + if self.DATABASE_HOST is None: + raise RuntimeError("Missing DATABASE_HOST entry from config.py") + if self.DATABASE_NAME is None: + raise RuntimeError("Missing DATABASE_NAME entry from config.py") + + db_uri = ( + self.DATABASE_ENGINE + + "://" + + self.DATABASE_USERNAME + + ":" + + self.DATABASE_PASSWORD + + "@" + + self.DATABASE_HOST + + "/" + + self.DATABASE_NAME + ) + return db_uri diff --git a/securedrop/tests/functional/source_navigation_steps.py b/securedrop/tests/functional/source_navigation_steps.py index d777cae6a3c..77ebbe35a41 100644 --- a/securedrop/tests/functional/source_navigation_steps.py +++ b/securedrop/tests/functional/source_navigation_steps.py @@ -1,6 +1,5 @@ import tempfile import time -import json import pytest from selenium.common.exceptions import NoSuchElementException @@ -29,13 +28,6 @@ def _source_visits_source_homepage(self): self.driver.get(self.source_location) assert self._is_on_source_homepage() - def _source_checks_instance_metadata(self): - self.driver.get(self.source_location + "/metadata") - j = json.loads(self.driver.find_element_by_tag_name("body").text) - assert j["server_os"] == "20.04" - assert j["sd_version"] == self.source_app.jinja_env.globals["version"] - assert j["gpg_fpr"] != "" - def _source_clicks_submit_documents_on_homepage(self): # It's the source's first time visiting this SecureDrop site, so they diff --git a/securedrop/tests/functional/test_source_metadata.py b/securedrop/tests/functional/test_source_metadata.py index 8a14791f66d..c2fa0a4dccb 100644 --- a/securedrop/tests/functional/test_source_metadata.py +++ b/securedrop/tests/functional/test_source_metadata.py @@ -1,10 +1,17 @@ -from . import source_navigation_steps -from . import functional_test +import requests +from tests.functional import tor_utils +from version import __version__ -class TestInstanceMetadata( - functional_test.FunctionalTest, - source_navigation_steps.SourceNavigationStepsMixin): +class TestSourceAppInstanceMetadata: - def test_instance_metadata(self): - self._source_checks_instance_metadata() + def test_instance_metadata(self, sd_servers_v2): + # Given a source app, when fetching the instance's metadata + url = f"{sd_servers_v2.source_app_base_url}/metadata" + response = requests.get(url=url, proxies=tor_utils.proxies_for_url(url)) + + # Then it succeeds and the right information is returned + returned_data = response.json() + assert returned_data["server_os"] == "20.04" + assert returned_data["sd_version"] == __version__ + assert returned_data["gpg_fpr"] diff --git a/securedrop/tests/functional/test_source_notfound.py b/securedrop/tests/functional/test_source_notfound.py index 2da3889259b..404dac3410a 100644 --- a/securedrop/tests/functional/test_source_notfound.py +++ b/securedrop/tests/functional/test_source_notfound.py @@ -1,10 +1,11 @@ -from . import source_navigation_steps -from . import functional_test +# TODO(AD): This test duplicates TestSourceLayout.test_notfound() - remove it? +class TestSourceAppNotFound: + def test_not_found(self, sd_servers_v2, tor_browser_web_driver): + # Given a source user + # When they try to access a page that does not exist + tor_browser_web_driver.get(f"{sd_servers_v2.source_app_base_url}/does_not_exist") -class TestSourceInterfaceNotFound( - functional_test.FunctionalTest, - source_navigation_steps.SourceNavigationStepsMixin): - - def test_not_found(self): - self._source_not_found() + # Then the right error is displayed + message = tor_browser_web_driver.find_element_by_id("page-not-found") + assert message.is_displayed() diff --git a/securedrop/tests/functional/test_source_session_timeout.py b/securedrop/tests/functional/test_source_session_timeout.py index a2db7a3483f..93b339ccbd4 100644 --- a/securedrop/tests/functional/test_source_session_timeout.py +++ b/securedrop/tests/functional/test_source_session_timeout.py @@ -1,17 +1,56 @@ -from . import source_navigation_steps -from . import functional_test +import time +from pathlib import Path +import pytest -class TestSourceSessions( - functional_test.FunctionalTest, - source_navigation_steps.SourceNavigationStepsMixin): +from tests.functional.conftest import spawn_sd_servers +from tests.functional.app_navigators import SourceAppNagivator +from tests.functional.factories import SecureDropConfigFactory - session_expiration = 5 - def test_source_session_timeout(self): - self._source_visits_source_homepage() - self._source_clicks_submit_documents_on_homepage() - self._source_continues_to_submit_page() - self._source_waits_for_session_to_timeout() - self.driver.refresh() - self._source_sees_session_timeout_message() +# Very short session expiration time +SESSION_EXPIRATION_SECONDS = 3 + + +@pytest.fixture(scope="session") +def sd_servers_with_short_session_timeout(setup_journalist_key_and_gpg_folder): + """Spawn the source and journalist apps as separate processes with a short session timeout.""" + # Generate a securedrop config with a very short session timeout + config_with_short_timeout = SecureDropConfigFactory.create( + SESSION_EXPIRATION_MINUTES=SESSION_EXPIRATION_SECONDS / 60, + SECUREDROP_DATA_ROOT=Path("/tmp/sd-tests/functional-session-timeout"), + ) + + # Ensure the GPG settings match the one in the config to use, to ensure consistency + journalist_key_fingerprint, gpg_dir = setup_journalist_key_and_gpg_folder + assert Path(config_with_short_timeout.GPG_KEY_DIR) == gpg_dir + assert config_with_short_timeout.JOURNALIST_KEY == journalist_key_fingerprint + + # Spawn the apps in separate processes + with spawn_sd_servers(config_to_use=config_with_short_timeout) as sd_servers_result: + yield sd_servers_result + + +# TODO(AD): This test duplicates TestSourceSessionLayout - remove it? +class TestSourceAppSessionExpiration: + def test(self, sd_servers_with_short_session_timeout, tor_browser_web_driver): + navigator = SourceAppNagivator( + source_app_base_url=sd_servers_with_short_session_timeout.source_app_base_url, + web_driver=tor_browser_web_driver, + ) + + # Given a source user who's logged in and is using the source app + navigator.source_visits_source_homepage() + navigator.source_clicks_submit_documents_on_homepage() + navigator.source_continues_to_submit_page() + + # And their session just expired + time.sleep(SESSION_EXPIRATION_SECONDS + 1) + + # When the source user reloads the page + navigator.driver.refresh() + + # Then the source user sees the "session expired" message + notification = navigator.driver.find_element_by_css_selector(".important") + expected_text = "You were logged out due to inactivity." + assert expected_text in notification.text diff --git a/securedrop/tests/functional/test_source_warnings.py b/securedrop/tests/functional/test_source_warnings.py index e69333ac689..258c5c8e01f 100644 --- a/securedrop/tests/functional/test_source_warnings.py +++ b/securedrop/tests/functional/test_source_warnings.py @@ -1,78 +1,105 @@ import os import shutil - +import pytest from selenium import webdriver +from tests.functional.app_navigators import SourceAppNagivator + from . import functional_test -from . import source_navigation_steps - - -class TestSourceInterfaceBannerWarnings( - functional_test.FunctionalTest, source_navigation_steps.SourceNavigationStepsMixin -): - def test_warning_appears_if_tor_browser_not_in_use(self): - try: - self.switch_to_firefox_driver() - self.driver.get(self.source_location) - - warning_banner = self.driver.find_element_by_id("use-tor-browser") - - assert "It is recommended to use Tor Browser" in warning_banner.text - - # User should be able to dismiss the warning - warning_dismiss_button = self.driver.find_element_by_id("use-tor-browser-close") - self.banner_is_dismissed(warning_banner, warning_dismiss_button) - finally: - self.switch_to_torbrowser_driver() - - def test_warning_appears_if_orbot_is_used(self): - orbotUserAgent = "Mozilla/5.0 (Android; Mobile;" " rv:52.0) Gecko/20100101 Firefox/52.0" - - self.f_profile_path2 = "/tmp/testprofile2" - if os.path.exists(self.f_profile_path2): - shutil.rmtree(self.f_profile_path2) - # Create new profile and driver with the orbot user agent for this test - os.mkdir(self.f_profile_path2) - profile = webdriver.FirefoxProfile(self.f_profile_path2) - profile.set_preference("general.useragent.override", orbotUserAgent) - if self.journalist_location.find(".onion") != -1: - # set FF preference to socks proxy in Tor Browser - profile.set_preference("network.proxy.type", 1) - profile.set_preference("network.proxy.socks", "127.0.0.1") - profile.set_preference("network.proxy.socks_port", 9150) - profile.set_preference("network.proxy.socks_version", 5) - profile.set_preference("network.proxy.socks_remote_dns", True) - profile.set_preference("network.dns.blockDotOnion", False) - profile.update_preferences() - self.driver2 = webdriver.Firefox( - firefox_binary=functional_test.FIREFOX_PATH, firefox_profile=profile + + +@pytest.fixture +def orbot_web_driver(sd_servers_v2): + # Create new profile and driver with the orbot user agent + orbot_user_agent = "Mozilla/5.0 (Android; Mobile; rv:52.0) Gecko/20100101 Firefox/52.0" + f_profile_path2 = "/tmp/testprofile2" + if os.path.exists(f_profile_path2): + shutil.rmtree(f_profile_path2) + os.mkdir(f_profile_path2) + profile = webdriver.FirefoxProfile(f_profile_path2) + profile.set_preference("general.useragent.override", orbot_user_agent) + + if sd_servers_v2.journalist_app_base_url.find(".onion") != -1: + # set FF preference to socks proxy in Tor Browser + profile.set_preference("network.proxy.type", 1) + profile.set_preference("network.proxy.socks", "127.0.0.1") + profile.set_preference("network.proxy.socks_port", 9150) + profile.set_preference("network.proxy.socks_version", 5) + profile.set_preference("network.proxy.socks_remote_dns", True) + profile.set_preference("network.dns.blockDotOnion", False) + profile.update_preferences() + orbot_web_driver = webdriver.Firefox( + firefox_binary=functional_test.FIREFOX_PATH, firefox_profile=profile + ) + + try: + driver_user_agent = orbot_web_driver.execute_script("return navigator.userAgent") + assert driver_user_agent == orbot_user_agent + yield orbot_web_driver + finally: + orbot_web_driver.quit() + + +class TestSourceAppBrowserWarnings: + def test_warning_appears_if_tor_browser_not_in_use(self, sd_servers_v2, firefox_web_driver): + # Given a user + navigator = SourceAppNagivator( + source_app_base_url=sd_servers_v2.source_app_base_url, + # Who is using Firefox instead of the tor browser + web_driver=firefox_web_driver, ) - self.driver2.get(self.source_location) - currentAgent = self.driver2.execute_script("return navigator.userAgent") - assert currentAgent == orbotUserAgent + # When they access the source app's home page + navigator.source_visits_source_homepage() - warning_banner = self.driver2.find_element_by_id("orfox-browser") + # Then they see a warning + warning_banner = navigator.driver.find_element_by_id("use-tor-browser") + assert "It is recommended to use Tor Browser" in warning_banner.text - assert "It is recommended you use the desktop version of Tor Browser" in warning_banner.text + # And they are able to dismiss the warning + warning_dismiss_button = navigator.driver.find_element_by_id("use-tor-browser-close") + warning_dismiss_button.click() - # User should be able to dismiss the warning - warning_dismiss_button = self.driver2.find_element_by_id("orfox-browser-close") - self.banner_is_dismissed(warning_banner, warning_dismiss_button) + def warning_banner_is_hidden(): + assert warning_banner.is_displayed() is False - self.driver2.quit() + navigator.nav_helper.wait_for(warning_banner_is_hidden) - def banner_is_dismissed(self, warning_banner, dismiss_button): + def test_warning_appears_if_orbot_is_used(self, sd_servers_v2, orbot_web_driver): + # Given a user + navigator = SourceAppNagivator( + source_app_base_url=sd_servers_v2.source_app_base_url, + # Who is using Orbot instead of the (desktop) Tor browser + web_driver=orbot_web_driver, + ) + + # When they access the source app's home page + navigator.source_visits_source_homepage() + + # Then they see a warning + warning_banner = navigator.driver.find_element_by_id("orfox-browser") + assert "use the desktop version of Tor Browser" in warning_banner.text - dismiss_button.click() + # And they are able to dismiss the warning + warning_dismiss_button = navigator.driver.find_element_by_id("orfox-browser-close") + warning_dismiss_button.click() def warning_banner_is_hidden(): assert warning_banner.is_displayed() is False - self.wait_for(warning_banner_is_hidden) + navigator.nav_helper.wait_for(warning_banner_is_hidden) + + def test_warning_high_security(self, sd_servers_v2, tor_browser_web_driver): + # Given a user + navigator = SourceAppNagivator( + source_app_base_url=sd_servers_v2.source_app_base_url, + # Who is using the Tor browser + web_driver=tor_browser_web_driver, + ) - def test_warning_high_security(self): - self.driver.get(self.source_location) + # When they access the source app's home page + navigator.source_visits_source_homepage() - banner = self.driver.find_element_by_id("js-warning") + # Then they see a warning + banner = navigator.driver.find_element_by_id("js-warning") assert "Security Slider to Safest", banner.text diff --git a/securedrop/tests/functional/tor_utils.py b/securedrop/tests/functional/tor_utils.py new file mode 100644 index 00000000000..f35d820e918 --- /dev/null +++ b/securedrop/tests/functional/tor_utils.py @@ -0,0 +1,9 @@ +from typing import Dict, Optional + + +def proxies_for_url(url: str) -> Optional[Dict[str, str]]: + """Generate the right proxies argument to pass to requests.get() for supporting Tor.""" + proxies = None + if ".onion" in url: + proxies = {"http": "socks5h://127.0.0.1:9150", "https": "socks5h://127.0.0.1:9150"} + return proxies