From b334930fbbce5fec1dac17d8dd535ad07c7c3441 Mon Sep 17 00:00:00 2001 From: Kevin O'Gorman Date: Fri, 3 Dec 2021 17:51:47 -0500 Subject: [PATCH 1/4] Enforced 160-bit HOTP secret length for journalist and admin accounts - Updated tests to check for 40 char hexadecimal input on user adds/edits - Added a RequiredIf validator to force validation of otp_secret field when is_hotp field is checked in new user form - Added a length check in function validating input for HOTP updates. --- securedrop/journalist_app/forms.py | 36 ++++++-- securedrop/journalist_app/utils.py | 18 ++-- securedrop/models.py | 2 + securedrop/tests/test_integration.py | 2 +- securedrop/tests/test_journalist.py | 123 ++++++++++++++++----------- 5 files changed, 120 insertions(+), 61 deletions(-) diff --git a/securedrop/journalist_app/forms.py b/securedrop/journalist_app/forms.py index ad4104b4c0..a66547b679 100644 --- a/securedrop/journalist_app/forms.py +++ b/securedrop/journalist_app/forms.py @@ -6,15 +6,41 @@ from wtforms import Field from wtforms import (TextAreaField, StringField, BooleanField, HiddenField, ValidationError) -from wtforms.validators import InputRequired, Optional +from wtforms.validators import InputRequired, Optional, DataRequired, StopValidation -from models import Journalist, InstanceConfig +from models import Journalist, InstanceConfig, HOTP_SECRET_LENGTH + +from typing import Any + + +class RequiredIf(DataRequired): + + def __init__(self, other_field_name: str, *args: Any, **kwargs: Any) -> None: + self.other_field_name = other_field_name + + def __call__(self, form: FlaskForm, field: Field) -> None: + if self.other_field_name in form: + other_field = form[self.other_field_name] + if bool(other_field.data): + self.message = gettext( + 'The "{name}" field is required when "{other_name}" is set.' + .format(other_name=self.other_field_name, name=field.name)) + super(RequiredIf, self).__call__(form, field) + else: + field.errors[:] = [] + raise StopValidation() + else: + raise ValidationError( + gettext( + 'The "{other_name}" field was not found - it is required by "{name}".' + .format(other_name=self.other_field_name, name=field.name)) + ) def otp_secret_validation(form: FlaskForm, field: Field) -> None: strip_whitespace = field.data.replace(' ', '') input_length = len(strip_whitespace) - if input_length != 40: + if input_length != HOTP_SECRET_LENGTH: raise ValidationError( ngettext( 'HOTP secrets are 40 characters long - you have entered {num}.', @@ -74,8 +100,8 @@ class NewUserForm(FlaskForm): is_admin = BooleanField('is_admin') is_hotp = BooleanField('is_hotp') otp_secret = StringField('otp_secret', validators=[ - otp_secret_validation, - Optional() + RequiredIf("is_hotp"), + otp_secret_validation ]) diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index 28d5f221bc..b5a4c4035c 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -30,6 +30,7 @@ Submission, WrongPasswordException, get_one_or_else, + HOTP_SECRET_LENGTH, ) from store import add_checksum_for_file @@ -134,6 +135,17 @@ def validate_hotp_secret(user: Journalist, otp_secret: str) -> bool: :param otp_secret: the new HOTP secret :return: True if it validates, False if it does not """ + strip_whitespace = otp_secret.replace(' ', '') + secret_length = len(strip_whitespace) + + if secret_length != HOTP_SECRET_LENGTH: + flash(ngettext( + 'HOTP secrets are 40 characters long - you have entered {num}.', + 'HOTP secrets are 40 characters long - you have entered {num}.', + secret_length + ).format(num=secret_length), "error") + return False + try: user.set_hotp_secret(otp_secret) except (binascii.Error, TypeError) as e: @@ -143,12 +155,6 @@ def validate_hotp_secret(user: Journalist, otp_secret: str) -> bool: "please only submit letters A-F and numbers 0-9."), "error") return False - elif "Odd-length string" in str(e): - flash(gettext( - "Invalid secret format: " - "odd-length secret. Did you mistype the secret?"), - "error") - return False else: flash(gettext( "An unexpected error occurred! " diff --git a/securedrop/models.py b/securedrop/models.py index 82451409ca..dd9acc735f 100644 --- a/securedrop/models.py +++ b/securedrop/models.py @@ -36,6 +36,8 @@ ARGON2_PARAMS = dict(memory_cost=2**16, rounds=4, parallelism=2) +HOTP_SECRET_LENGTH = 40 # 160 bits == 40 hex digits (== 32 ascii-encoded chars in db) + def get_one_or_else(query: Query, logger: 'Logger', diff --git a/securedrop/tests/test_integration.py b/securedrop/tests/test_integration.py index 8c988e19f5..3c9fdad57e 100644 --- a/securedrop/tests/test_integration.py +++ b/securedrop/tests/test_integration.py @@ -639,7 +639,7 @@ def test_user_change_password(journalist_app, test_journo): def test_login_after_regenerate_hotp(journalist_app, test_journo): """Test that journalists can login after resetting their HOTP 2fa""" - otp_secret = 'aaaaaa' + otp_secret = '0123456789abcdef0123456789abcdef01234567' b32_otp_secret = b32encode(unhexlify(otp_secret)) # edit hotp diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index 1f3b3bbbd1..9daa10501c 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -1016,9 +1016,10 @@ def test_admin_resets_user_hotp_format_non_hexa( old_secret = journo.otp_secret + non_hexa_secret='0123456789ABCDZZ0123456789ABCDEF01234567' with InstrumentedApp(journalist_app) as ins: app.post(url_for('admin.reset_two_factor_hotp'), - data=dict(uid=test_journo['id'], otp_secret='ZZ')) + data=dict(uid=test_journo['id'], otp_secret=non_hexa_secret)) # fetch altered DB object journo = Journalist.query.get(journo.id) @@ -1034,53 +1035,62 @@ def test_admin_resets_user_hotp_format_non_hexa( "numbers 0-9.", "error") -def test_admin_resets_user_hotp(journalist_app, test_admin, test_journo): +@pytest.mark.parametrize("the_secret", [' ', ' ', '0123456789ABCDEF0123456789ABCDE']) +def test_admin_resets_user_hotp_format_too_short( + journalist_app, test_admin, test_journo, the_secret): + with journalist_app.test_client() as app: _login_user(app, test_admin['username'], test_admin['password'], test_admin['otp_secret']) journo = test_journo['journalist'] + # guard to ensure check below tests the correct condition + assert journo.is_totp + old_secret = journo.otp_secret with InstrumentedApp(journalist_app) as ins: - resp = app.post(url_for('admin.reset_two_factor_hotp'), - data=dict(uid=test_journo['id'], - otp_secret=123456)) + app.post(url_for('admin.reset_two_factor_hotp'), + data=dict(uid=test_journo['id'], otp_secret=the_secret)) # fetch altered DB object journo = Journalist.query.get(journo.id) new_secret = journo.otp_secret - assert old_secret != new_secret - assert not journo.is_totp + assert old_secret == new_secret - # Redirect to admin 2FA view - ins.assert_redirects(resp, url_for('admin.new_user_two_factor', - uid=journo.id)) + # ensure we didn't accidentally enable hotp + assert journo.is_totp + ins.assert_message_flashed( + "HOTP secrets are 40 characters long" + " - you have entered {num}.".format(num=len(the_secret.replace(' ', ''))), "error") -def test_admin_resets_user_hotp_format_odd(journalist_app, - test_admin, - test_journo): - old_secret = test_journo['otp_secret'] +def test_admin_resets_user_hotp(journalist_app, test_admin, test_journo): with journalist_app.test_client() as app: _login_user(app, test_admin['username'], test_admin['password'], test_admin['otp_secret']) + journo = test_journo['journalist'] + old_secret = journo.otp_secret + + valid_secret="DEADBEEF01234567DEADBEEF01234567DEADBEEF" with InstrumentedApp(journalist_app) as ins: - app.post(url_for('admin.reset_two_factor_hotp'), - data=dict(uid=test_journo['id'], otp_secret='Z')) + resp = app.post(url_for('admin.reset_two_factor_hotp'), + data=dict(uid=test_journo['id'], + otp_secret=valid_secret)) - ins.assert_message_flashed( - "Invalid secret format: " - "odd-length secret. Did you mistype the secret?", "error") + # fetch altered DB object + journo = Journalist.query.get(journo.id) - # Re-fetch journalist to get fresh DB instance - user = Journalist.query.get(test_journo['id']) - new_secret = user.otp_secret + new_secret = journo.otp_secret + assert old_secret != new_secret + assert not journo.is_totp - assert old_secret == new_secret + # Redirect to admin 2FA view + ins.assert_redirects(resp, url_for('admin.new_user_two_factor', + uid=journo.id)) def test_admin_resets_user_hotp_error(mocker, @@ -1088,7 +1098,7 @@ def test_admin_resets_user_hotp_error(mocker, test_admin, test_journo): - bad_secret = '1234' + bad_secret = '0123456789ABCDZZ0123456789ABCDZZ01234567' error_message = 'SOMETHING WRONG!' mocked_error_logger = mocker.patch('journalist.app.logger.error') old_secret = test_journo['otp_secret'] @@ -1120,7 +1130,7 @@ def test_admin_resets_user_hotp_error(mocker, def test_user_resets_hotp(journalist_app, test_journo): old_secret = test_journo['otp_secret'] - new_secret = 123456 + new_secret = '0123456789ABCDEF0123456789ABCDEF01234567' # Precondition assert new_secret != old_secret @@ -1142,37 +1152,17 @@ def test_user_resets_hotp(journalist_app, test_journo): assert old_secret != new_secret -def test_user_resets_user_hotp_format_odd(journalist_app, test_journo): - old_secret = test_journo['otp_secret'] - - with journalist_app.test_client() as app: - _login_user(app, test_journo['username'], test_journo['password'], - test_journo['otp_secret']) - - with InstrumentedApp(journalist_app) as ins: - app.post(url_for('account.reset_two_factor_hotp'), - data=dict(otp_secret='123')) - ins.assert_message_flashed( - "Invalid secret format: " - "odd-length secret. Did you mistype the secret?", "error") - - # Re-fetch journalist to get fresh DB instance - user = Journalist.query.get(test_journo['id']) - new_secret = user.otp_secret - - assert old_secret == new_secret - - def test_user_resets_user_hotp_format_non_hexa(journalist_app, test_journo): old_secret = test_journo['otp_secret'] + non_hexa_secret='0123456789ABCDZZ0123456789ABCDEF01234567' with journalist_app.test_client() as app: _login_user(app, test_journo['username'], test_journo['password'], test_journo['otp_secret']) with InstrumentedApp(journalist_app) as ins: app.post(url_for('account.reset_two_factor_hotp'), - data=dict(otp_secret='ZZ')) + data=dict(otp_secret=non_hexa_secret)) ins.assert_message_flashed( "Invalid secret format: " "please only submit letters A-F and numbers 0-9.", "error") @@ -1187,7 +1177,7 @@ def test_user_resets_user_hotp_format_non_hexa(journalist_app, test_journo): def test_user_resets_user_hotp_error(mocker, journalist_app, test_journo): - bad_secret = '1234' + bad_secret = '0123456789ABCDZZ0123456789ABCDZZ01234567' old_secret = test_journo['otp_secret'] error_message = 'SOMETHING WRONG!' mocked_error_logger = mocker.patch('journalist.app.logger.error') @@ -1528,6 +1518,40 @@ def test_admin_add_user_yubikey_odd_length(journalist_app, test_admin, locale, s in resp.data.decode("utf-8") ) +@flaky(rerun_filter=utils.flaky_filter_xfail) +@pytest.mark.parametrize( + "locale, secret", + ( + (locale, ' ' *i) + for locale in get_test_locales() + for i in range(3) + ) +) +def test_admin_add_user_yubikey_blank_secret(journalist_app, test_admin, locale, secret): + with journalist_app.test_client() as app: + _login_user(app, test_admin['username'], test_admin['password'], + test_admin['otp_secret']) + + resp = app.post( + url_for('admin.add_user', l=locale), + data=dict( + username='dellsberg', + first_name='', + last_name='', + password=VALID_PASSWORD, + password_again=VALID_PASSWORD, + is_admin=None, + is_hotp=True, + otp_secret=secret + ), + ) + + assert page_language(resp.data) == language_tag(locale) + msgids = ['The "otp_secret" field is required when "is_hotp" is set.'] + with xfail_untranslated_messages(config, locale, msgids): + # Should redirect to the token verification page + assert gettext(msgids[0]) in resp.data.decode('utf-8') + @flaky(rerun_filter=utils.flaky_filter_xfail) @pytest.mark.parametrize("locale", get_test_locales()) @@ -2363,6 +2387,7 @@ def test_regenerate_totp(journalist_app, test_journo): def test_edit_hotp(journalist_app, test_journo): old_secret = test_journo['otp_secret'] + valid_secret="DEADBEEF01234567DEADBEEF01234567DADEFEEB" with journalist_app.test_client() as app: _login_user(app, test_journo['username'], test_journo['password'], @@ -2370,7 +2395,7 @@ def test_edit_hotp(journalist_app, test_journo): with InstrumentedApp(journalist_app) as ins: resp = app.post(url_for('account.reset_two_factor_hotp'), - data=dict(otp_secret=123456)) + data=dict(otp_secret=valid_secret)) new_secret = Journalist.query.get(test_journo['id']).otp_secret From 4ede5e424a7ef874dcfdb083089d921c87f7371a Mon Sep 17 00:00:00 2001 From: Kevin O'Gorman Date: Tue, 7 Dec 2021 12:01:56 -0500 Subject: [PATCH 2/4] added minimum length check for otp secrets on Journalist login --- securedrop/journalist_app/api.py | 5 +++-- securedrop/journalist_app/utils.py | 7 +++++++ securedrop/models.py | 13 +++++++++++++ securedrop/tests/test_journalist.py | 21 +++++++++++++++++++-- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/securedrop/journalist_app/api.py b/securedrop/journalist_app/api.py index f4ffd8655e..1e589eddbf 100644 --- a/securedrop/journalist_app/api.py +++ b/securedrop/journalist_app/api.py @@ -19,7 +19,8 @@ from journalist_app import utils from models import (Journalist, Reply, SeenReply, Source, Submission, LoginThrottledException, InvalidUsernameException, - BadTokenException, WrongPasswordException) + BadTokenException, InvalidOTPSecretException, + WrongPasswordException) from sdconfig import SDConfig from store import NotEncrypted @@ -133,7 +134,7 @@ def get_token() -> Tuple[flask.Response, int]: return response, 200 except (LoginThrottledException, InvalidUsernameException, - BadTokenException, WrongPasswordException): + BadTokenException, InvalidOTPSecretException, WrongPasswordException): return abort(403, 'Token authentication failed.') @api.route('/sources', methods=['GET']) diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index b5a4c4035c..85c6766d70 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -16,6 +16,7 @@ BadTokenException, FirstOrLastNameError, InvalidPasswordLength, + InvalidOTPSecretException, InvalidUsernameException, Journalist, LoginThrottledException, @@ -94,6 +95,7 @@ def validate_user( try: return Journalist.login(username, password, token) except (InvalidUsernameException, + InvalidOTPSecretException, BadTokenException, WrongPasswordException, LoginThrottledException, @@ -112,6 +114,11 @@ def validate_user( "Please wait at least {num} seconds before logging in again.", period ).format(num=period) + elif isinstance(e, InvalidOTPSecretException): + login_flashed_msg += ' ' + login_flashed_msg += gettext( + "Your 2FA details are invalid" + " - please contact an administrator to reset them.") else: try: user = Journalist.query.filter_by( diff --git a/securedrop/models.py b/securedrop/models.py index dd9acc735f..166b4b0946 100644 --- a/securedrop/models.py +++ b/securedrop/models.py @@ -36,8 +36,13 @@ ARGON2_PARAMS = dict(memory_cost=2**16, rounds=4, parallelism=2) +# Required length for hex-format HOTP secrets as input by users HOTP_SECRET_LENGTH = 40 # 160 bits == 40 hex digits (== 32 ascii-encoded chars in db) +# Minimum length for ascii-encoded OTP secrets - by default, secrets are now 160-bit (32 chars) +# but existing Journalist users may still have 80-bit (16-char) secrets +OTP_SECRET_MIN_ASCII_LENGTH = 16 # 80 bits == 40 hex digits (== 16 ascii-encoded chars in db) + def get_one_or_else(query: Query, logger: 'Logger', @@ -366,6 +371,11 @@ class BadTokenException(Exception): """Raised when a user logins in with an incorrect TOTP token""" +class InvalidOTPSecretException(Exception): + + """Raised when a user's OTP secret is invalid - for example, too short""" + + class PasswordError(Exception): """Generic error for passwords that are invalid. @@ -698,6 +708,9 @@ def login(cls, user.uuid in Journalist.INVALID_USERNAMES: raise InvalidUsernameException(gettext("Invalid username")) + if len(user.otp_secret) < OTP_SECRET_MIN_ASCII_LENGTH: + raise InvalidOTPSecretException(gettext("Invalid OTP secret")) + if LOGIN_HARDENING: cls.throttle_login(user) diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index 9daa10501c..d3943f4ccb 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -52,14 +52,31 @@ VALID_PASSWORD_2 = 'another correct horse battery staple generic passphrase' -def _login_user(app, username, password, otp_secret): +def _login_user(app, username, password, otp_secret, success=True): resp = app.post(url_for('main.login'), data={'username': username, 'password': password, 'token': TOTP(otp_secret).now()}, follow_redirects=True) assert resp.status_code == 200 - assert hasattr(g, 'user') # ensure logged in + assert (success == hasattr(g, 'user')) # check logged-in vs expected + + +@pytest.mark.parametrize("otp_secret", ['', 'GA','GARBAGE','JHCOGO7VCER3EJ4']) +def test_user_with_invalid_otp_secret_cannot_login(journalist_app, otp_secret): + # Create a user with whitespace at the end of the username + with journalist_app.app_context(): + new_username = 'badotp' + otp_secret + user, password = utils.db_helper.init_journalist(is_admin=False) + user.otp_secret = otp_secret + user.username = new_username + db.session.add(user) + db.session.commit() + + # Verify that user is *not* able to login successfully + with journalist_app.test_client() as app: + _login_user(app, new_username, password, + otp_secret, success=False) def test_user_with_whitespace_in_username_can_login(journalist_app): From 8cc6582690b6965f6eaa499a95851169a8665987 Mon Sep 17 00:00:00 2001 From: Kevin O'Gorman Date: Wed, 8 Dec 2021 17:54:53 -0500 Subject: [PATCH 3/4] fixed 500 error when invalid HOTP secret chars were entered on the New User form --- securedrop/journalist_app/admin.py | 12 ++++++++++++ securedrop/journalist_app/utils.py | 2 +- securedrop/tests/test_journalist.py | 4 ++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/securedrop/journalist_app/admin.py b/securedrop/journalist_app/admin.py index c8feae9700..e23f98b310 100644 --- a/securedrop/journalist_app/admin.py +++ b/securedrop/journalist_app/admin.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import os +import binascii from typing import Optional from typing import Union @@ -132,6 +133,17 @@ def add_user() -> Union[str, werkzeug.Response]: 'There was an error with the autogenerated password. ' 'User not created. Please try again.'), 'error') form_valid = False + except (binascii.Error, TypeError) as e: + if "Non-hexadecimal digit found" in str(e): + flash(gettext( + "Invalid HOTP secret format: " + "please only submit letters A-F and numbers 0-9."), + "error") + else: + flash(gettext( + "An unexpected error occurred! " + "Please inform your admin."), "error") + form_valid = False except InvalidUsernameException as e: form_valid = False # Translators: Here, "{message}" explains the problem with the username. diff --git a/securedrop/journalist_app/utils.py b/securedrop/journalist_app/utils.py index 85c6766d70..c16fca4f83 100644 --- a/securedrop/journalist_app/utils.py +++ b/securedrop/journalist_app/utils.py @@ -158,7 +158,7 @@ def validate_hotp_secret(user: Journalist, otp_secret: str) -> bool: except (binascii.Error, TypeError) as e: if "Non-hexadecimal digit found" in str(e): flash(gettext( - "Invalid secret format: " + "Invalid HOTP secret format: " "please only submit letters A-F and numbers 0-9."), "error") return False diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index d3943f4ccb..ad08005a80 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -1048,7 +1048,7 @@ def test_admin_resets_user_hotp_format_non_hexa( assert journo.is_totp ins.assert_message_flashed( - "Invalid secret format: please only submit letters A-F and " + "Invalid HOTP secret format: please only submit letters A-F and " "numbers 0-9.", "error") @@ -1181,7 +1181,7 @@ def test_user_resets_user_hotp_format_non_hexa(journalist_app, test_journo): app.post(url_for('account.reset_two_factor_hotp'), data=dict(otp_secret=non_hexa_secret)) ins.assert_message_flashed( - "Invalid secret format: " + "Invalid HOTP secret format: " "please only submit letters A-F and numbers 0-9.", "error") # Re-fetch journalist to get fresh DB instance From 0acee7822148b80becfccb4f50a20a546fffda3c Mon Sep 17 00:00:00 2001 From: Kevin O'Gorman Date: Wed, 8 Dec 2021 18:03:48 -0500 Subject: [PATCH 4/4] Cleaned up the HOTP edit page template, added explanatory text. --- securedrop/journalist_templates/admin_edit_hotp_secret.html | 3 ++- securedrop/tests/test_journalist.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/securedrop/journalist_templates/admin_edit_hotp_secret.html b/securedrop/journalist_templates/admin_edit_hotp_secret.html index 397df7c7d9..9774f6d000 100644 --- a/securedrop/journalist_templates/admin_edit_hotp_secret.html +++ b/securedrop/journalist_templates/admin_edit_hotp_secret.html @@ -1,10 +1,11 @@ {% extends "base.html" %} {% block body %} +

{{ gettext('Change HOTP Secret')}}

+

{{ gettext("Enter a new HOTP secret formatted as a 40-digit hexadecimal string. Spaces will be ignored:") }}

-

diff --git a/securedrop/tests/test_journalist.py b/securedrop/tests/test_journalist.py index ad08005a80..6514905739 100644 --- a/securedrop/tests/test_journalist.py +++ b/securedrop/tests/test_journalist.py @@ -1275,7 +1275,7 @@ def test_admin_resets_hotp_with_missing_otp_secret_key(config, journalist_app, t ) assert page_language(resp.data) == language_tag(locale) - msgids = ['Change Secret'] + msgids = ['Change HOTP Secret'] with xfail_untranslated_messages(config, locale, msgids): assert gettext(msgids[0]) in resp.data.decode('utf-8')