From 0269f6f78610de34b35dba142b3913afbf27f546 Mon Sep 17 00:00:00 2001 From: Alban Diquet Date: Fri, 4 Dec 2020 21:19:51 -0800 Subject: [PATCH] Move passphrase logic from CryptoUtil to PassphrasesGenerator Move passphrase logic from CryptoUtil to PassphrasesGenerator Move passphrase logic from CryptoUtil to PassphrasesGenerator Re-add generate test Ensure passphrases are random Fix pylint false positive Fix test Rename to PassphraseGenerator Address feedback from review Fix flake8 Fix test Allow non-perfect word lists and re-enable french word list Update test parameters Bump minimum word count to 6000 Fix test Tweak word lists requirements Fix docstring --- securedrop/create-dev-data.py | 4 +- securedrop/crypto_util.py | 53 ++-------- securedrop/journalist_app/__init__.py | 1 - securedrop/journalist_app/account.py | 8 +- securedrop/journalist_app/admin.py | 13 ++- securedrop/journalist_app/utils.py | 16 --- securedrop/manage.py | 16 +-- securedrop/models.py | 4 - securedrop/passphrases.py | 138 ++++++++++++++++++++++++++ securedrop/qa_loader.py | 3 +- securedrop/sdconfig.py | 1 - securedrop/source_app/__init__.py | 1 - securedrop/source_app/forms.py | 8 +- securedrop/source_app/utils.py | 29 ++---- securedrop/tests/test_crypto_util.py | 39 +------- securedrop/tests/test_journalist.py | 12 +-- securedrop/tests/test_passphrases.py | 87 ++++++++++++++++ securedrop/tests/test_source.py | 74 +++----------- securedrop/tests/utils/db_helper.py | 7 +- securedrop/wordlist | 1 - 20 files changed, 294 insertions(+), 221 deletions(-) create mode 100644 securedrop/passphrases.py create mode 100644 securedrop/tests/test_passphrases.py delete mode 120000 securedrop/wordlist diff --git a/securedrop/create-dev-data.py b/securedrop/create-dev-data.py index a6e32e1920..9266544397 100755 --- a/securedrop/create-dev-data.py +++ b/securedrop/create-dev-data.py @@ -10,6 +10,8 @@ from flask import current_app +from passphrases import PassphraseGenerator + os.environ["SECUREDROP_ENV"] = "dev" # noqa import journalist_app @@ -104,7 +106,7 @@ def create_source_data( num_replies: int = 2, ) -> None: # Store source in database - codename = current_app.crypto_util.genrandomid() + codename = PassphraseGenerator.get_default().generate_passphrase() filesystem_id = current_app.crypto_util.hash_codename(codename) journalist_designation = current_app.crypto_util.display_id() source = Source(filesystem_id, journalist_designation) diff --git a/securedrop/crypto_util.py b/securedrop/crypto_util.py index 142d9a1a8c..19e050a6c7 100644 --- a/securedrop/crypto_util.py +++ b/securedrop/crypto_util.py @@ -22,6 +22,8 @@ import rm from models import Source +from passphrases import DicewarePassphrase + # monkey patch to work with Focal gnupg. # https://github.com/isislovecruft/python-gnupg/issues/250 @@ -66,7 +68,6 @@ class CryptoException(Exception): class CryptoUtil: GPG_KEY_TYPE = "RSA" - DEFAULT_WORDS_IN_RANDOM_ID = 8 # All reply keypairs will be "created" on the same day SecureDrop (then # Strongbox) was publicly released for the first time. @@ -87,12 +88,10 @@ def __init__(self, scrypt_id_pepper: str, scrypt_gpg_pepper: str, securedrop_root: str, - word_list: str, nouns_file: str, adjectives_file: str, gpg_key_dir: str) -> None: self.__securedrop_root = securedrop_root - self.__word_list = word_list if os.environ.get('SECUREDROP_ENV') in ('dev', 'test'): # Optimize crypto to speed up tests (at the expense of security @@ -119,9 +118,6 @@ def __init__(self, else: self.gpg = gpg_binary - # map code for a given language to a localized wordlist - self.__language2words = {} # type: Dict[str, List[str]] - with io.open(nouns_file) as f: self.nouns = f.read().splitlines() @@ -138,40 +134,6 @@ def do_runtime_tests(self) -> None: if not rm.check_secure_delete_capability(): raise AssertionError("Secure file deletion is not possible.") - def get_wordlist(self, locale: str) -> List[str]: - """" Ensure the wordlist for the desired locale is read and available - in the words global variable. If there is no wordlist for the - desired local, fallback to the default english wordlist. - - The localized wordlist are read from wordlists/{locale}.txt but - for backward compatibility purposes the english wordlist is read - from the config.WORD_LIST file. - """ - - if locale not in self.__language2words: - if locale != 'en': - path = os.path.join(self.__securedrop_root, - 'wordlists', - locale + '.txt') - if os.path.exists(path): - wordlist_path = path - else: - wordlist_path = self.__word_list - else: - wordlist_path = self.__word_list - - with io.open(wordlist_path) as f: - content = f.read().splitlines() - self.__language2words[locale] = content - - return self.__language2words[locale] - - def genrandomid(self, - words_in_random_id: int = DEFAULT_WORDS_IN_RANDOM_ID, - locale: str = 'en') -> str: - return ' '.join(random.choice(self.get_wordlist(locale)) - for x in range(words_in_random_id)) - def display_id(self) -> str: """Generate random journalist_designation until we get an unused one""" @@ -189,7 +151,7 @@ def display_id(self) -> str: raise ValueError("Could not generate unique journalist designation for new source") - def hash_codename(self, codename: str, salt: Optional[str] = None) -> str: + def hash_codename(self, codename: DicewarePassphrase, salt: Optional[str] = None) -> str: """Salts and hashes a codename using scrypt. :param codename: A source's codename. @@ -198,10 +160,9 @@ def hash_codename(self, codename: str, salt: Optional[str] = None) -> str: """ if salt is None: salt = self.scrypt_id_pepper - _validate_name_for_diceware(codename) return b32encode(scrypt.hash(codename, salt, **self.scrypt_params)).decode('utf-8') - def genkeypair(self, name: str, secret: str) -> gnupg._parsers.GenKey: + def genkeypair(self, name: str, secret: DicewarePassphrase) -> gnupg._parsers.GenKey: """Generate a GPG key through batch file key generation. A source's codename is salted with SCRYPT_GPG_PEPPER and hashed with scrypt to provide the passphrase used to encrypt their private key. Their name @@ -222,11 +183,11 @@ def genkeypair(self, name: str, secret: str) -> gnupg._parsers.GenKey: """ _validate_name_for_diceware(name) - secret = self.hash_codename(secret, salt=self.scrypt_gpg_pepper) + hashed_secret = self.hash_codename(secret, salt=self.scrypt_gpg_pepper) genkey_obj = self.gpg.gen_key(self.gpg.gen_key_input( key_type=self.GPG_KEY_TYPE, key_length=self.__gpg_key_length, - passphrase=secret, + passphrase=hashed_secret, name_email=name, name_real="Source Key", creation_date=self.DEFAULT_KEY_CREATION_DATE.isoformat(), @@ -336,7 +297,7 @@ def encrypt(self, plaintext: str, fingerprints: List[str], output: Optional[str] else: raise CryptoException(out.stderr) - def decrypt(self, secret: str, ciphertext: bytes) -> str: + def decrypt(self, secret: DicewarePassphrase, ciphertext: bytes) -> str: """ >>> crypto = current_app.crypto_util >>> key = crypto.genkeypair('randomid', 'randomid') diff --git a/securedrop/journalist_app/__init__.py b/securedrop/journalist_app/__init__.py index 28ab3633b1..d0ac1b3403 100644 --- a/securedrop/journalist_app/__init__.py +++ b/securedrop/journalist_app/__init__.py @@ -70,7 +70,6 @@ def create_app(config: 'SDConfig') -> Flask: scrypt_id_pepper=config.SCRYPT_ID_PEPPER, scrypt_gpg_pepper=config.SCRYPT_GPG_PEPPER, securedrop_root=config.SECUREDROP_ROOT, - word_list=config.WORD_LIST, nouns_file=config.NOUNS, adjectives_file=config.ADJECTIVES, gpg_key_dir=config.GPG_KEY_DIR, diff --git a/securedrop/journalist_app/account.py b/securedrop/journalist_app/account.py index 9742888bfa..35745121d4 100644 --- a/securedrop/journalist_app/account.py +++ b/securedrop/journalist_app/account.py @@ -6,9 +6,11 @@ flash, session) from flask_babel import gettext +import i18n from db import db -from journalist_app.utils import (make_password, set_diceware_password, set_name, validate_user, +from journalist_app.utils import (set_diceware_password, set_name, validate_user, validate_hotp_secret) +from passphrases import PassphraseGenerator from sdconfig import SDConfig @@ -17,7 +19,9 @@ def make_blueprint(config: SDConfig) -> Blueprint: @view.route('/account', methods=('GET',)) def edit() -> str: - password = make_password(config) + password = PassphraseGenerator.get_default().generate_passphrase( + preferred_language=i18n.get_language(config) + ) return render_template('edit_account.html', password=password) diff --git a/securedrop/journalist_app/admin.py b/securedrop/journalist_app/admin.py index 0b74dc4187..b7f481f4c7 100644 --- a/securedrop/journalist_app/admin.py +++ b/securedrop/journalist_app/admin.py @@ -11,14 +11,16 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm.exc import NoResultFound +import i18n from db import db from models import (InstanceConfig, Journalist, InvalidUsernameException, FirstOrLastNameError, PasswordError) from journalist_app.decorators import admin_required -from journalist_app.utils import (make_password, commit_account_changes, set_diceware_password, +from journalist_app.utils import (commit_account_changes, set_diceware_password, validate_hotp_secret, revoke_token) from journalist_app.forms import LogoForm, NewUserForm, SubmissionPreferencesForm from sdconfig import SDConfig +from passphrases import PassphraseGenerator def make_blueprint(config: SDConfig) -> Blueprint: @@ -124,8 +126,11 @@ def add_user() -> Union[str, werkzeug.Response]: return redirect(url_for('admin.new_user_two_factor', uid=new_user.id)) + password = PassphraseGenerator.get_default().generate_passphrase( + preferred_language=i18n.get_language(config) + ) return render_template("admin_add_user.html", - password=make_password(config), + password=password, form=form) @view.route('/2fa', methods=('GET', 'POST')) @@ -221,7 +226,9 @@ def edit_user(user_id: int) -> Union[str, werkzeug.Response]: commit_account_changes(user) - password = make_password(config) + password = PassphraseGenerator.get_default().generate_passphrase( + preferred_language=i18n.get_language(config) + ) return render_template("edit_account.html", user=user, password=password) diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index 62f5230612..63891abeb5 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -12,8 +12,6 @@ from flask_babel import gettext, ngettext from sqlalchemy.exc import IntegrityError -import i18n - from db import db from models import ( BadTokenException, @@ -36,8 +34,6 @@ ) from store import add_checksum_for_file -from sdconfig import SDConfig - def logged_in() -> bool: # When a user is logged in, we push their user ID (database primary key) @@ -326,18 +322,6 @@ def col_delete(cols_selected: List[str]) -> werkzeug.Response: return redirect(url_for('main.index')) -def make_password(config: SDConfig) -> str: - while True: - password = current_app.crypto_util.genrandomid( - 7, - i18n.get_language(config)) - try: - Journalist.check_password_acceptable(password) - return password - except PasswordError: - continue - - def delete_collection(filesystem_id: str) -> None: # Delete the source's collection of submissions path = current_app.storage.path(filesystem_id) diff --git a/securedrop/manage.py b/securedrop/manage.py index acd909b3e1..a1f7e9d275 100755 --- a/securedrop/manage.py +++ b/securedrop/manage.py @@ -17,10 +17,11 @@ from flask.ctx import AppContext from typing import List +from passphrases import PassphraseGenerator + sys.path.insert(0, "/var/www/securedrop") # noqa: E402 import qrcode -from flask import current_app from sqlalchemy.orm.exc import NoResultFound os.environ['SECUREDROP_ENV'] = 'dev' # noqa @@ -30,7 +31,6 @@ FirstOrLastNameError, InvalidUsernameException, Journalist, - PasswordError, ) from management import app_context, config from management.run import run @@ -177,16 +177,6 @@ def _get_yubikey_usage() -> bool: print('Invalid answer. Please type "y" or "n"') -def _make_password() -> str: - while True: - password = current_app.crypto_util.genrandomid(7) - try: - Journalist.check_password_acceptable(password) - return password - except PasswordError: - continue - - def _add_user(is_admin: bool = False, context: Optional[AppContext] = None) -> int: with context or app_context(): username = _get_username() @@ -194,7 +184,7 @@ def _add_user(is_admin: bool = False, context: Optional[AppContext] = None) -> i last_name = _get_last_name() print("Note: Passwords are now autogenerated.") - password = _make_password() + password = PassphraseGenerator.get_default().generate_passphrase() print("This user's password is: {}".format(password)) is_hotp = _get_yubikey_usage() diff --git a/securedrop/models.py b/securedrop/models.py index a021d81b1e..65b5938f17 100644 --- a/securedrop/models.py +++ b/securedrop/models.py @@ -71,10 +71,6 @@ class Source(db.Model): # when deletion of the source was requested deleted_at = Column(DateTime) - # Don't create or bother checking excessively long codenames to prevent DoS - NUM_WORDS = 7 - MAX_CODENAME_LEN = 128 - def __init__(self, filesystem_id: str, journalist_designation: str) -> None: diff --git a/securedrop/passphrases.py b/securedrop/passphrases.py new file mode 100644 index 0000000000..c066d517ff --- /dev/null +++ b/securedrop/passphrases.py @@ -0,0 +1,138 @@ +from pathlib import Path +from random import SystemRandom +from typing import Optional, NewType, Set + +from typing import Dict +from typing import List + +from sdconfig import config + +# A list of words to be used by as a passphrase +# For example: "recede anytime acorn durably discuss" +# More details at https://www.rempe.us/diceware/#eff +DicewarePassphrase = NewType("DicewarePassphrase", str) + + +_current_generator = None # type: Optional["PassphraseGenerator"] + + +class InvalidWordListError(Exception): + pass + + +class PassphraseGenerator: + + PASSPHRASE_WORDS_COUNT = 7 + + # Enforce a reasonable maximum length for passphrases to avoid DoS + MAX_PASSPHRASE_LENGTH = 128 + MIN_PASSPHRASE_LENGTH = 20 + + _WORD_LIST_MINIMUM_SIZE = 7300 # Minimum number of words in any of the word lists + + def __init__( + self, language_to_words: Dict[str, List[str]], fallback_language: str = "en" + ) -> None: + # SystemRandom sources from the system rand (e.g. urandom, CryptGenRandom, etc) + # It supplies a CSPRNG but with an interface that supports methods like choice + self._random_generator = SystemRandom() + + self._fallback_language = fallback_language + self._language_to_words = language_to_words + if self._fallback_language not in self._language_to_words: + raise InvalidWordListError( + "Missing words list for fallback language '{}'".format(self._fallback_language) + ) + + # Validate each words list + for language, word_list in self._language_to_words.items(): + # Ensure that there are enough words in the list + word_list_size = len(word_list) + if word_list_size < self._WORD_LIST_MINIMUM_SIZE: + raise InvalidWordListError( + "The word list for language '{}' only contains {} long-enough words;" + " minimum required is {} words.".format( + language, word_list_size, self._WORD_LIST_MINIMUM_SIZE, + ) + ) + + # Ensure all words are ascii + try: + " ".join(word_list).encode("ascii") + except UnicodeEncodeError: + raise InvalidWordListError( + "The word list for language '{}' contains non-ASCII words." + ) + + # Ensure that passphrases longer than what's supported can't be generated + longest_word = max(word_list, key=len) + longest_passphrase_length = len(longest_word) * self.PASSPHRASE_WORDS_COUNT + longest_passphrase_length += self.PASSPHRASE_WORDS_COUNT # One space between each word + if longest_passphrase_length >= self.MAX_PASSPHRASE_LENGTH: + raise InvalidWordListError( + "Passphrases over the maximum length ({}) may be generated:" + " longest word in word list for language '{}' is '{}' and number of words per" + " passphrase is {}".format( + self.MAX_PASSPHRASE_LENGTH, + language, + longest_word, + self.PASSPHRASE_WORDS_COUNT, + ) + ) + + # Ensure that passphrases shorter than what's supported can't be generated + shortest_word = min(word_list, key=len) + shortest_passphrase_length = len(shortest_word) * self.PASSPHRASE_WORDS_COUNT + shortest_passphrase_length += self.PASSPHRASE_WORDS_COUNT + if shortest_passphrase_length <= self.MIN_PASSPHRASE_LENGTH: + raise InvalidWordListError( + "Passphrases under the minimum length ({}) may be generated:" + " shortest word in word list for language '{}' is '{}' and number of words per" + " passphrase is {}".format( + self.MIN_PASSPHRASE_LENGTH, + language, + shortest_word, + self.PASSPHRASE_WORDS_COUNT, + ) + ) + + @classmethod + def get_default(cls) -> "PassphraseGenerator": + global _current_generator + if _current_generator is None: + language_to_words = _parse_available_words_list(Path(config.SECUREDROP_ROOT)) + _current_generator = cls(language_to_words) + return _current_generator + + @property + def available_languages(self) -> Set[str]: + return set(self._language_to_words.keys()) + + def generate_passphrase(self, preferred_language: Optional[str] = None) -> DicewarePassphrase: + final_language = preferred_language if preferred_language else self._fallback_language + try: + words_list = self._language_to_words[final_language] + except KeyError: + # If there is no wordlist for the desired language, fall back to the word list for the + # default language + words_list = self._language_to_words[self._fallback_language] + + words = [ + self._random_generator.choice(words_list) for _ in range(self.PASSPHRASE_WORDS_COUNT) + ] # type: List[str] + return DicewarePassphrase(" ".join(words)) + + +def _parse_available_words_list(securedrop_root: Path) -> Dict[str, List[str]]: + """Find all .txt files in the wordlists folder and parse them as words lists. + + This will also ignore words that are too short. + """ + language_to_words = {} + words_lists_folder = securedrop_root / "wordlists" + for words_file in words_lists_folder.glob("*.txt"): + language = words_file.stem + all_words = words_file.read_text().strip().splitlines() + words_that_are_long_enough = [word for word in all_words if len(word) >= 2] + language_to_words[language] = words_that_are_long_enough + return language_to_words diff --git a/securedrop/qa_loader.py b/securedrop/qa_loader.py index 5bc3db77e3..dd19d37c4d 100755 --- a/securedrop/qa_loader.py +++ b/securedrop/qa_loader.py @@ -19,6 +19,7 @@ from journalist_app import create_app from models import Journalist, JournalistLoginAttempt, Reply, Source, SourceStar, Submission from sdconfig import SDConfig +from passphrases import PassphraseGenerator from sdconfig import config as sdconfig @@ -138,7 +139,7 @@ def new_journalist(self) -> None: self.journalists.append(journalist.id) def new_source(self) -> None: - codename = current_app.crypto_util.genrandomid() + codename = PassphraseGenerator.get_default().generate_passphrase() filesystem_id = current_app.crypto_util.hash_codename(codename) journalist_designation = current_app.crypto_util.display_id() source = Source(filesystem_id, journalist_designation) diff --git a/securedrop/sdconfig.py b/securedrop/sdconfig.py index faca77f50d..ad36f99736 100644 --- a/securedrop/sdconfig.py +++ b/securedrop/sdconfig.py @@ -26,7 +26,6 @@ def __init__(self) -> None: self.ADJECTIVES = _config.ADJECTIVES # type: str self.NOUNS = _config.NOUNS # type: str - self.WORD_LIST = _config.WORD_LIST # type: str self.GPG_KEY_DIR = _config.GPG_KEY_DIR # type: str diff --git a/securedrop/source_app/__init__.py b/securedrop/source_app/__init__.py index d5ea2333f1..022b2964a2 100644 --- a/securedrop/source_app/__init__.py +++ b/securedrop/source_app/__init__.py @@ -56,7 +56,6 @@ def create_app(config: SDConfig) -> Flask: scrypt_id_pepper=config.SCRYPT_ID_PEPPER, scrypt_gpg_pepper=config.SCRYPT_GPG_PEPPER, securedrop_root=config.SECUREDROP_ROOT, - word_list=config.WORD_LIST, nouns_file=config.NOUNS, adjectives_file=config.ADJECTIVES, gpg_key_dir=config.GPG_KEY_DIR, diff --git a/securedrop/source_app/forms.py b/securedrop/source_app/forms.py index 1c88a4249c..75bd70bc35 100644 --- a/securedrop/source_app/forms.py +++ b/securedrop/source_app/forms.py @@ -5,17 +5,19 @@ from wtforms import FileField, PasswordField, TextAreaField from wtforms.validators import InputRequired, Regexp, Length, ValidationError -from models import Source, Submission +from models import Submission +from passphrases import PassphraseGenerator class LoginForm(FlaskForm): + codename = PasswordField('codename', validators=[ InputRequired(message=gettext('This field is required.')), - Length(1, Source.MAX_CODENAME_LEN, + Length(1, PassphraseGenerator.MAX_PASSPHRASE_LENGTH, message=gettext( 'Field must be between 1 and ' '{max_codename_len} characters long.'.format( - max_codename_len=Source.MAX_CODENAME_LEN))), + max_codename_len=PassphraseGenerator.MAX_PASSPHRASE_LENGTH))), # Make sure to allow dashes since some words in the wordlist have them Regexp(r'[\sA-Za-z0-9-]+$', message=gettext('Invalid input.')) ]) diff --git a/securedrop/source_app/utils.py b/securedrop/source_app/utils.py index b2fa4c1405..0b1b4b51dc 100644 --- a/securedrop/source_app/utils.py +++ b/securedrop/source_app/utils.py @@ -15,6 +15,7 @@ from crypto_util import CryptoUtil, CryptoException from models import Source +from passphrases import PassphraseGenerator, DicewarePassphrase from sdconfig import SDConfig if typing.TYPE_CHECKING: @@ -42,33 +43,19 @@ def valid_codename(codename: str) -> bool: return source is not None -def generate_unique_codename(config: SDConfig) -> str: +def generate_unique_codename(config: SDConfig) -> DicewarePassphrase: """Generate random codenames until we get an unused one""" while True: - codename = current_app.crypto_util.genrandomid( - Source.NUM_WORDS, - i18n.get_language(config)) - - # The maximum length of a word in the wordlist is 9 letters and the - # codename length is 7 words, so it is currently impossible to - # generate a codename that is longer than the maximum codename length - # (currently 128 characters). This code is meant to be defense in depth - # to guard against potential future changes, such as modifications to - # the word list or the maximum codename length. - if len(codename) > Source.MAX_CODENAME_LEN: - current_app.logger.warning( - "Generated a source codename that was too long, " - "skipping it. This should not happen. " - "(Codename='{}')".format(codename)) - continue - + passphrase = PassphraseGenerator.get_default().generate_passphrase( + preferred_language=i18n.get_language(config) + ) # scrypt (slow) - filesystem_id = current_app.crypto_util.hash_codename(codename) + filesystem_id = current_app.crypto_util.hash_codename(passphrase) matching_sources = Source.query.filter( Source.filesystem_id == filesystem_id).all() if len(matching_sources) == 0: - return codename + return passphrase def get_entropy_estimate() -> int: @@ -87,7 +74,7 @@ def wrapper(*args, **kwargs): # type: ignore def async_genkey(crypto_util_: CryptoUtil, db_uri: str, filesystem_id: str, - codename: str) -> None: + codename: DicewarePassphrase) -> None: # We pass in the `crypto_util_` so we don't have to reference `current_app` # here. The app might not have a pushed context during testing which would # cause this asynchronous function to break. diff --git a/securedrop/tests/test_crypto_util.py b/securedrop/tests/test_crypto_util.py index 6f234637ef..68fc355938 100644 --- a/securedrop/tests/test_crypto_util.py +++ b/securedrop/tests/test_crypto_util.py @@ -9,6 +9,8 @@ from flask import url_for, session +from passphrases import PassphraseGenerator + os.environ['SECUREDROP_ENV'] = 'test' # noqa import crypto_util import models @@ -18,7 +20,6 @@ def test_word_list_does_not_contain_empty_strings(journalist_app): - assert '' not in journalist_app.crypto_util.get_wordlist('en') assert '' not in journalist_app.crypto_util.nouns assert '' not in journalist_app.crypto_util.adjectives @@ -136,38 +137,8 @@ def test_basic_encrypt_then_decrypt_multiple_recipients(source_app, assert plaintext == message -def verify_genrandomid(app, locale): - id = app.crypto_util.genrandomid(locale=locale) - id_words = id.split() - - crypto_util._validate_name_for_diceware(id) - assert len(id_words) == CryptoUtil.DEFAULT_WORDS_IN_RANDOM_ID - - for word in id_words: - assert word in app.crypto_util.get_wordlist(locale) - - -def test_genrandomid_default_locale_is_en(source_app): - verify_genrandomid(source_app, 'en') - - -def test_get_wordlist(source_app, config): - locales = [] - wordlists_path = os.path.join(config.SECUREDROP_ROOT, 'wordlists') - for f in os.listdir(wordlists_path): - if f.endswith('.txt') and f != 'en.txt': - locales.append(f.split('.')[0]) - - with source_app.app_context(): - list_en = source_app.crypto_util.get_wordlist('en') - for locale in locales: - assert source_app.crypto_util.get_wordlist(locale) != list_en - verify_genrandomid(source_app, locale) - assert source_app.crypto_util.get_wordlist('unknown') == list_en - - def test_hash_codename(source_app): - codename = source_app.crypto_util.genrandomid() + codename = PassphraseGenerator.get_default().generate_passphrase() hashed_codename = source_app.crypto_util.hash_codename(codename) assert re.compile('^[2-7A-Z]{103}=$').match(hashed_codename) @@ -198,7 +169,7 @@ def test_display_id_designation_collisions(source_app): def test_genkeypair(source_app): with source_app.app_context(): - codename = source_app.crypto_util.genrandomid() + codename = PassphraseGenerator.get_default().generate_passphrase() filesystem_id = source_app.crypto_util.hash_codename(codename) journalist_filename = source_app.crypto_util.display_id() source = models.Source(filesystem_id, journalist_filename) @@ -231,7 +202,7 @@ def parse_gpg_date_string(date_string): def test_reply_keypair_creation_and_expiration_dates(source_app): with source_app.app_context(): - codename = source_app.crypto_util.genrandomid() + codename = PassphraseGenerator.get_default().generate_passphrase() filesystem_id = source_app.crypto_util.hash_codename(codename) journalist_filename = source_app.crypto_util.display_id() source = models.Source(filesystem_id, journalist_filename) diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index 24f95d7e55..8a97af8124 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -18,7 +18,6 @@ from sqlalchemy.orm.exc import StaleDataError from sqlalchemy.sql.expression import func -import crypto_util import journalist_app as journalist_app_module from journalist_app.utils import mark_seen import models @@ -35,7 +34,7 @@ InvalidUsernameException, Submission ) -from sdconfig import SDConfig, config +from sdconfig import config from .utils.instrument import InstrumentedApp from . import utils @@ -140,15 +139,6 @@ def test_user_with_whitespace_in_username_can_login(journalist_app): otp_secret) -def test_make_password(journalist_app): - with patch.object(crypto_util.CryptoUtil, 'genrandomid', - side_effect=['bad', VALID_PASSWORD]): - fake_config = SDConfig() - with journalist_app.test_request_context('/'): - password = journalist_app_module.utils.make_password(fake_config) - assert password == VALID_PASSWORD - - def test_reply_error_logging(journalist_app, test_journo, test_source): exception_class = StaleDataError exception_msg = 'Potentially sensitive content!' diff --git a/securedrop/tests/test_passphrases.py b/securedrop/tests/test_passphrases.py new file mode 100644 index 0000000000..38549d51db --- /dev/null +++ b/securedrop/tests/test_passphrases.py @@ -0,0 +1,87 @@ +from unittest import mock + +import pytest + +from passphrases import PassphraseGenerator, InvalidWordListError + +# pylint: disable=unsupported-membership-test +# False positive https://github.com/PyCQA/pylint/issues/3045 + + +class TestPassphrasesGenerator: + def test_default_generator(self): + # Given the default generator for the Securedrop app + generator = PassphraseGenerator.get_default() + assert generator.available_languages == {"en", "fr"} + + # When using it to generate a passphrase + # It succeeds + passphrase = generator.generate_passphrase() + + # And a reasonably-secure passphrase was generated + assert passphrase + assert len(passphrase) >= 20 + assert len(passphrase.split(" ")) >= 7 + + def test_default_generator_passphrases_are_random(self): + # Given the default generator for the Securedrop app + generator = PassphraseGenerator.get_default() + + # When using it to generate two passphrases + # It succeeds + passphrase1 = generator.generate_passphrase() + passphrase2 = generator.generate_passphrase() + + # And the two passphrases are different because they are randomly-generated + assert passphrase1 != passphrase2 + + @mock.patch.object(PassphraseGenerator, "_WORD_LIST_MINIMUM_SIZE", 1) + def test_generate_passphrase_with_specific_language(self): + # Given a generator that supports two languages + generator = PassphraseGenerator(language_to_words={"en": ["boat"], "fr": ["bateau"]}) + assert generator.available_languages == {"en", "fr"} + + # When using it to create a passphrase for one of the two languages + # It succeeds + passphrase = generator.generate_passphrase(preferred_language="fr") + + # And the passphrase is in the chosen language + assert "bateau" in passphrase + assert "boat" not in passphrase + + @mock.patch.object(PassphraseGenerator, "_WORD_LIST_MINIMUM_SIZE", 1) + def test_generate_passphrase_with_specific_language_that_is_not_available(self): + # Given a generator that supports two languages + generator = PassphraseGenerator( + language_to_words={"en": ["boat"], "fr": ["bateau"]}, + # With english as the fallback language + fallback_language="en", + ) + assert generator.available_languages == {"en", "fr"} + + # When using it to create a passphrase for another, non-supported language + # It succeeds + passphrase = generator.generate_passphrase(preferred_language="es") + + # And the passphrase is in the default/fallback language, english + assert "boat" in passphrase + assert "bateau" not in passphrase + + def test_word_list_does_not_have_enough_words(self): + with pytest.raises(InvalidWordListError, match="long-enough words"): + PassphraseGenerator(language_to_words={"en": ["only", "three", "words"]}) + + @mock.patch.object(PassphraseGenerator, "_WORD_LIST_MINIMUM_SIZE", 1) + def test_word_list_will_generate_overly_long_passphrase(self): + with pytest.raises(InvalidWordListError, match="over the maximum length"): + PassphraseGenerator(language_to_words={"en": ["overlylongwordtogetoverthelimit"]}) + + @mock.patch.object(PassphraseGenerator, "_WORD_LIST_MINIMUM_SIZE", 1) + def test_word_list_will_generate_overly_short_passphrase(self): + with pytest.raises(InvalidWordListError, match="under the minimum length"): + PassphraseGenerator(language_to_words={"en": ["b", "a"]}) + + @mock.patch.object(PassphraseGenerator, "_WORD_LIST_MINIMUM_SIZE", 1) + def test_word_list_has_non_ascii_string(self): + with pytest.raises(InvalidWordListError, match="non-ASCII words"): + PassphraseGenerator(language_to_words={"en": ["word", "éoèô"]}) diff --git a/securedrop/tests/test_source.py b/securedrop/tests/test_source.py index 48bc1ceb48..119aea7f7a 100644 --- a/securedrop/tests/test_source.py +++ b/securedrop/tests/test_source.py @@ -5,11 +5,12 @@ import time from io import BytesIO, StringIO -from flask import session, escape, current_app, url_for, g, request +from flask import session, escape, url_for, g, request from mock import patch, ANY import crypto_util import source +from passphrases import PassphraseGenerator from . import utils import version @@ -21,7 +22,7 @@ from .utils.db_helper import new_codename from .utils.instrument import InstrumentedApp -overly_long_codename = 'a' * (Source.MAX_CODENAME_LEN + 1) +overly_long_codename = 'a' * (PassphraseGenerator.MAX_PASSPHRASE_LENGTH + 1) def test_page_not_found(source_app): @@ -43,32 +44,6 @@ def test_index(source_app): assert 'Return visit' in text -def test_all_words_in_wordlist_validate(source_app): - """Verify that all words in the wordlist are allowed by the form - validation. Otherwise a source will have a codename and be unable to - return.""" - - with source_app.app_context(): - wordlist_en = current_app.crypto_util.get_wordlist('en') - - # chunk the words to cut down on the number of requets we make - # otherwise this test is *slow* - chunks = [wordlist_en[i:i + 7] for i in range(0, len(wordlist_en), 7)] - - with source_app.test_client() as app: - for words in chunks: - resp = app.post(url_for('main.login'), - data=dict(codename=' '.join(words)), - follow_redirects=True) - assert resp.status_code == 200 - text = resp.data.decode('utf-8') - # If the word does not validate, then it will show - # 'Invalid input'. If it does validate, it should show that - # it isn't a recognized codename. - assert 'Sorry, that is not a recognized codename.' in text - assert 'logged_in' not in session - - def _find_codename(html): """Find a source codename (diceware passphrase) in HTML""" # Codenames may contain HTML escape characters, and the wordlist @@ -80,22 +55,6 @@ def _find_codename(html): return codename_match.group('codename') -def test_generate(source_app): - with source_app.test_client() as app: - resp = app.get(url_for('main.generate')) - assert resp.status_code == 200 - session_codename = next(iter(session['codenames'].values())) - - text = resp.data.decode('utf-8') - assert "This codename is what you will use in future visits" in text - - codename = _find_codename(resp.data.decode('utf-8')) - assert len(codename.split()) == Source.NUM_WORDS - # codename is also stored in the session - make sure it matches the - # codename displayed to the source - assert codename == escape(session_codename) - - def test_generate_already_logged_in(source_app): with source_app.test_client() as app: new_codename(app, session) @@ -123,22 +82,19 @@ def test_create_new_source(source_app): assert 'codenames' not in session -def test_generate_too_long_codename(source_app): - """Generate a codename that exceeds the maximum codename length""" +def test_generate(source_app): + with source_app.test_client() as app: + resp = app.get(url_for('main.generate')) + assert resp.status_code == 200 + session_codename = next(iter(session['codenames'].values())) - with patch.object(source_app.logger, 'warning') as logger: - with patch.object(crypto_util.CryptoUtil, 'genrandomid', - side_effect=[overly_long_codename, - 'short codename']): - with source_app.test_client() as app: - resp = app.post(url_for('main.generate')) - assert resp.status_code == 200 + text = resp.data.decode('utf-8') + assert "This codename is what you will use in future visits" in text - logger.assert_called_with( - "Generated a source codename that was too long, " - "skipping it. This should not happen. " - "(Codename='{}')".format(overly_long_codename) - ) + codename = _find_codename(resp.data.decode('utf-8')) + # codename is also stored in the session - make sure it matches the + # codename displayed to the source + assert codename == escape(session_codename) def test_create_duplicate_codename_logged_in_not_in_session(source_app): @@ -644,7 +600,7 @@ def test_login_with_overly_long_codename(source_app): assert resp.status_code == 200 text = resp.data.decode('utf-8') assert ("Field must be between 1 and {} characters long." - .format(Source.MAX_CODENAME_LEN)) in text + .format(PassphraseGenerator.MAX_PASSPHRASE_LENGTH)) in text assert not mock_hash_codename.called, \ "Called hash_codename for codename w/ invalid length" diff --git a/securedrop/tests/utils/db_helper.py b/securedrop/tests/utils/db_helper.py index 3e81e54b3b..3d8e9b2755 100644 --- a/securedrop/tests/utils/db_helper.py +++ b/securedrop/tests/utils/db_helper.py @@ -15,6 +15,7 @@ from db import db from journalist_app.utils import mark_seen from models import Journalist, Reply, SeenReply, Source, Submission +from passphrases import PassphraseGenerator from sdconfig import config os.environ['SECUREDROP_ENV'] = 'test' # noqa @@ -30,8 +31,8 @@ def init_journalist(first_name=None, last_name=None, is_admin=False): corresponding to the row just added to the database. The second, their password string. """ - username = current_app.crypto_util.genrandomid() - user_pw = current_app.crypto_util.genrandomid() + username = PassphraseGenerator.get_default().generate_passphrase() + user_pw = PassphraseGenerator.get_default().generate_passphrase() user = Journalist( username=username, password=user_pw, @@ -126,7 +127,7 @@ def init_source_without_keypair(): initialized. The second, their codename string. """ # Create source identity and database record - codename = current_app.crypto_util.genrandomid() + codename = PassphraseGenerator.get_default().generate_passphrase() filesystem_id = current_app.crypto_util.hash_codename(codename) journalist_filename = current_app.crypto_util.display_id() source = Source(filesystem_id, journalist_filename) diff --git a/securedrop/wordlist b/securedrop/wordlist deleted file mode 120000 index 62a6dce219..0000000000 --- a/securedrop/wordlist +++ /dev/null @@ -1 +0,0 @@ -wordlists/en.txt \ No newline at end of file