diff --git a/securedrop/source_app/main.py b/securedrop/source_app/main.py index cd52fe42f8..e3dd66b4c5 100644 --- a/securedrop/source_app/main.py +++ b/securedrop/source_app/main.py @@ -212,8 +212,9 @@ def delete(): history. """ - query = Reply.query.filter( - Reply.filename == request.form['reply_filename']) + query = Reply.query.filter_by( + filename=request.form['reply_filename'], + source_id=g.source.id) reply = get_one_or_else(query, current_app.logger, abort) reply.deleted_by_source = True db.session.add(reply) diff --git a/securedrop/tests/test_source.py b/securedrop/tests/test_source.py index 7bae4c710f..dd8d9527dd 100644 --- a/securedrop/tests/test_source.py +++ b/securedrop/tests/test_source.py @@ -5,7 +5,7 @@ import subprocess from cStringIO import StringIO -from flask import session, escape, current_app, url_for +from flask import session, escape, current_app, url_for, g from mock import patch, ANY import crypto_util @@ -647,3 +647,47 @@ def test_csrf_error_page(config, source_app): resp = app.post(url_for('main.create'), follow_redirects=True) text = resp.data.decode('utf-8') assert 'Your session timed out due to inactivity' in text + + +def test_source_can_only_delete_own_replies(source_app): + '''This test checks for a bug an authenticated source A could delete + replies send to source B by "guessing" the filename. + ''' + source0, codename0 = utils.db_helper.init_source() + source1, codename1 = utils.db_helper.init_source() + journalist, _ = utils.db_helper.init_journalist() + replies = utils.db_helper.reply(journalist, source0, 1) + filename = replies[0].filename + confirmation_msg = 'Reply deleted' + + with source_app.test_client() as app: + resp = app.post(url_for('main.login'), + data={'codename': codename1}, + follow_redirects=True) + assert resp.status_code == 200 + assert g.source.id == source1.id + + resp = app.post(url_for('main.delete'), + data={'reply_filename': filename}, + follow_redirects=True) + assert resp.status_code == 404 + assert confirmation_msg not in resp.data.decode('utf-8') + + reply = Reply.query.filter_by(filename=filename).one() + assert not reply.deleted_by_source + + with source_app.test_client() as app: + resp = app.post(url_for('main.login'), + data={'codename': codename0}, + follow_redirects=True) + assert resp.status_code == 200 + assert g.source.id == source0.id + + resp = app.post(url_for('main.delete'), + data={'reply_filename': filename}, + follow_redirects=True) + assert resp.status_code == 200 + assert confirmation_msg in resp.data.decode('utf-8') + + reply = Reply.query.filter_by(filename=filename).one() + assert reply.deleted_by_source