Skip to content

Commit

Permalink
Merge pull request #3731 from freedomofpress/pytest-2-electric-boogaloo
Browse files Browse the repository at this point in the history
Pytest 2 electric boogaloo
  • Loading branch information
redshiftzero authored Aug 24, 2018
2 parents 9c0c9f3 + bc07997 commit f922012
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 116 deletions.
3 changes: 2 additions & 1 deletion securedrop/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def test_source(journalist_app):
return {'source': source,
'codename': codename,
'filesystem_id': source.filesystem_id,
'uuid': source.uuid}
'uuid': source.uuid,
'id': source.id}


@pytest.fixture(scope='function')
Expand Down
282 changes: 167 additions & 115 deletions securedrop/tests/test_journalist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1364,163 +1364,215 @@ def test_journalist_reply_view(journalist_app, test_source, test_journo):
assert resp.status_code == 302


class TestJournalistApp(TestCase):
def test_too_long_user_password_change(journalist_app, test_journo):
overly_long_password = VALID_PASSWORD + \
'a' * (Journalist.MAX_PASSWORD_LEN - len(VALID_PASSWORD) + 1)

# A method required by flask_testing.TestCase
def create_app(self):
return journalist.app
with journalist_app.test_client() as app:
_login_user(app, test_journo['username'], test_journo['password'],
test_journo['otp_secret'])

def setUp(self):
utils.env.setup()
with InstrumentedApp(journalist_app) as ins:
app.post(url_for('account.new_password'),
data=dict(password=overly_long_password,
token=TOTP(test_journo['otp_secret']).now(),
current_password=test_journo['password']),
follow_redirects=True)

# Patch the two-factor verification to avoid intermittent errors
utils.db_helper.mock_verify_token(self)
ins.assert_message_flashed(
'You submitted a bad password! Password not changed.', 'error')

# Setup test users: user & admin
self.user, self.user_pw = utils.db_helper.init_journalist()
self.admin, self.admin_pw = utils.db_helper.init_journalist(
is_admin=True)

def tearDown(self):
utils.env.teardown()
def test_valid_user_password_change(journalist_app, test_journo):
with journalist_app.test_client() as app:
_login_user(app, test_journo['username'], test_journo['password'],
test_journo['otp_secret'])

# WARNING: we are purposely doing something that would not work in
# production in the _login_user and _login_admin methods. This is done as a
# reminder to the test developer that the flask_testing.TestCase only uses
# one request context per method (see
# https://github.com/freedomofpress/securedrop/issues/1444). By explicitly
# making a point of this, we hope to avoid the introduction of new tests,
# that do not truly prove their result because of this disconnect between
# request context in Flask Testing and production.
def _login_admin(self):
self._ctx.g.user = self.admin
resp = app.post(url_for('account.new_password'),
data=dict(password=VALID_PASSWORD_2,
token=TOTP(test_journo['otp_secret']).now(),
current_password=test_journo['password']),
follow_redirects=True)

def _login_user(self):
self._ctx.g.user = self.user
assert 'Password updated.' in resp.data.decode('utf-8')

def test_too_long_user_password_change(self):
self._login_user()

overly_long_password = VALID_PASSWORD + \
'a' * (Journalist.MAX_PASSWORD_LEN - len(VALID_PASSWORD) + 1)
def test_regenerate_totp(journalist_app, test_journo):
old_secret = test_journo['otp_secret']

self.client.post(url_for('account.new_password'),
data=dict(password=overly_long_password,
token='mocked',
current_password=self.user_pw),
follow_redirects=True)
with journalist_app.test_client() as app:
_login_user(app, test_journo['username'], test_journo['password'],
test_journo['otp_secret'])

self.assertMessageFlashed('You submitted a bad password! Password not '
'changed.', 'error')
with InstrumentedApp(journalist_app) as ins:
resp = app.post(url_for('account.reset_two_factor_totp'))

def test_valid_user_password_change(self):
self._login_user()
resp = self.client.post(
url_for('account.new_password'),
data=dict(password=VALID_PASSWORD_2,
token='mocked',
current_password=self.user_pw),
follow_redirects=True)
new_secret = Journalist.query.get(test_journo['id']).otp_secret

assert 'Password updated.' in \
resp.data.decode('utf-8')
# check that totp is different
assert new_secret != old_secret

def test_regenerate_totp(self):
self._login_user()
old_totp = self.user.totp
# should redirect to verification page
ins.assert_redirects(resp, url_for('account.new_two_factor'))

res = self.client.post(url_for('account.reset_two_factor_totp'))
new_totp = self.user.totp

# check that totp is different
self.assertNotEqual(old_totp.secret, new_totp.secret)
def test_edit_hotp(journalist_app, test_journo):
old_secret = test_journo['otp_secret']

# should redirect to verification page
self.assertRedirects(res, url_for('account.new_two_factor'))
with journalist_app.test_client() as app:
_login_user(app, test_journo['username'], test_journo['password'],
test_journo['otp_secret'])

def test_edit_hotp(self):
self._login_user()
old_hotp = self.user.otp_secret
with InstrumentedApp(journalist_app) as ins:
resp = app.post(url_for('account.reset_two_factor_hotp'),
data=dict(otp_secret=123456))

res = self.client.post(
url_for('account.reset_two_factor_hotp'),
data=dict(otp_secret=123456)
)
new_hotp = self.user.otp_secret
new_secret = Journalist.query.get(test_journo['id']).otp_secret

# check that hotp is different
self.assertNotEqual(old_hotp, new_hotp)
# check that totp is different
assert new_secret != old_secret

# should redirect to verification page
self.assertRedirects(res, url_for('account.new_two_factor'))
# should redirect to verification page
ins.assert_redirects(resp, url_for('account.new_two_factor'))


def test_delete_source_deletes_submissions(journalist_app,
test_journo,
test_source):
"""Verify that when a source is deleted, the submissions that
correspond to them are also deleted."""

def test_delete_source_deletes_submissions(self):
"""Verify that when a source is deleted, the submissions that
correspond to them are also deleted."""
with journalist_app.app_context():
source = Source.query.get(test_source['id'])
journo = Journalist.query.get(test_journo['id'])

utils.db_helper.submit(source, 2)
utils.db_helper.reply(journo, source, 2)

self._delete_collection_setup()
journalist_app_module.utils.delete_collection(
self.source.filesystem_id)

# Source should be gone
results = db.session.query(Source).filter(
Source.id == self.source.id).all()
self.assertEqual(results, [])

def _delete_collection_setup(self):
self.source, _ = utils.db_helper.init_source()
utils.db_helper.submit(self.source, 2)
utils.db_helper.reply(self.user, self.source, 2)

def test_delete_collection_updates_db(self):
"""Verify that when a source is deleted, their Source identity
record, as well as Reply & Submission records associated with
that record are purged from the database."""
self._delete_collection_setup()
test_source['filesystem_id'])

res = Source.query.filter_by(id=test_source['id']).one_or_none()
assert res is None


def test_delete_collection_updates_db(journalist_app,
test_journo,
test_source):
"""Verify that when a source is deleted, their Source identity
record, as well as Reply & Submission records associated with
that record are purged from the database."""

with journalist_app.app_context():
source = Source.query.get(test_source['id'])
journo = Journalist.query.get(test_journo['id'])

utils.db_helper.submit(source, 2)
utils.db_helper.reply(journo, source, 2)

journalist_app_module.utils.delete_collection(
self.source.filesystem_id)
results = Source.query.filter(Source.id == self.source.id).all()
self.assertEqual(results, [])
results = db.session.query(
Submission.source_id == self.source.id).all()
self.assertEqual(results, [])
results = db.session.query(Reply.source_id == self.source.id).all()
self.assertEqual(results, [])

def test_delete_source_deletes_source_key(self):
"""Verify that when a source is deleted, the PGP key that corresponds
to them is also deleted."""
self._delete_collection_setup()
test_source['filesystem_id'])
res = Source.query.filter_by(id=test_source['id']).one_or_none()
assert res is None

res = Submission.query.filter_by(source_id=test_source['id']) \
.one_or_none()
assert res is None

res = Reply.query.filter_by(source_id=test_source['id']) \
.one_or_none()
assert res is None


def test_delete_source_deletes_source_key(journalist_app,
test_source,
test_journo):
"""Verify that when a source is deleted, the PGP key that corresponds
to them is also deleted."""

with journalist_app.app_context():
source = Source.query.get(test_source['id'])
journo = Journalist.query.get(test_journo['id'])

utils.db_helper.submit(source, 2)
utils.db_helper.reply(journo, source, 2)

# Source key exists
source_key = current_app.crypto_util.getkey(self.source.filesystem_id)
self.assertNotEqual(source_key, None)
source_key = current_app.crypto_util.getkey(
test_source['filesystem_id'])
assert source_key is not None

journalist_app_module.utils.delete_collection(
self.source.filesystem_id)
test_source['filesystem_id'])

# Source key no longer exists
source_key = current_app.crypto_util.getkey(self.source.filesystem_id)
self.assertEqual(source_key, None)
source_key = current_app.crypto_util.getkey(
test_source['filesystem_id'])
assert source_key is None


def test_delete_source_deletes_docs_on_disk(self):
"""Verify that when a source is deleted, the encrypted documents that
exist on disk is also deleted."""
self._delete_collection_setup()
def test_delete_source_deletes_docs_on_disk(journalist_app,
test_source,
test_journo,
config):
"""Verify that when a source is deleted, the encrypted documents that
exist on disk is also deleted."""

with journalist_app.app_context():
source = Source.query.get(test_source['id'])
journo = Journalist.query.get(test_journo['id'])

utils.db_helper.submit(source, 2)
utils.db_helper.reply(journo, source, 2)

# Encrypted documents exists
dir_source_docs = os.path.join(config.STORE_DIR,
self.source.filesystem_id)
self.assertTrue(os.path.exists(dir_source_docs))
test_source['filesystem_id'])
assert os.path.exists(dir_source_docs)

job = journalist_app_module.utils.delete_collection(
self.source.filesystem_id)
test_source['filesystem_id'])

# Wait up to 5s to wait for Redis worker `srm` operation to complete
utils.async.wait_for_redis_worker(job)

# Encrypted documents no longer exist
self.assertFalse(os.path.exists(dir_source_docs))
assert not os.path.exists(dir_source_docs)


class TestJournalistApp(TestCase):

# A method required by flask_testing.TestCase
def create_app(self):
return journalist.app

def setUp(self):
utils.env.setup()

# Patch the two-factor verification to avoid intermittent errors
utils.db_helper.mock_verify_token(self)

# Setup test users: user & admin
self.user, self.user_pw = utils.db_helper.init_journalist()
self.admin, self.admin_pw = utils.db_helper.init_journalist(
is_admin=True)

def tearDown(self):
utils.env.teardown()

# WARNING: we are purposely doing something that would not work in
# production in the _login_user and _login_admin methods. This is done as a
# reminder to the test developer that the flask_testing.TestCase only uses
# one request context per method (see
# https://github.com/freedomofpress/securedrop/issues/1444). By explicitly
# making a point of this, we hope to avoid the introduction of new tests,
# that do not truly prove their result because of this disconnect between
# request context in Flask Testing and production.
def _login_admin(self):
self._ctx.g.user = self.admin

def _login_user(self):
self._ctx.g.user = self.user

def test_download_selected_submissions_from_source(self):
source, _ = utils.db_helper.init_source()
Expand Down

0 comments on commit f922012

Please sign in to comment.