Merge pull request #1486 from freedomofpress/1457-pending-replies
fix: limit and track processing of `SendReplyJob`s around session and network partitions
gonzalo-bulnes authored Dec 22, 2022
2 parents 8cd7682 + f0040bc commit 4ef79d8
Showing 12 changed files with 254 additions and 61 deletions.
27 changes: 27 additions & 0 deletions (new Alembic migration; file name not shown)
@@ -0,0 +1,27 @@
"""DraftReply: add column for sending PID
Revision ID: 414627c04463
Revises: d7c8af95bc8e
Create Date: 2022-05-24 20:48:28.581857
"""
import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision = "414627c04463"
down_revision = "d7c8af95bc8e"
branch_labels = None
depends_on = None


def upgrade():
op.add_column("draftreplies", sa.Column("sending_pid", sa.Integer(), nullable=True))


def downgrade():
# #457: batch_op.drop_column() is necessary instead of the op.drop_column()
# automatically generated by Alembic.
with op.batch_alter_table("draftreplies", schema=None) as batch_op:
batch_op.drop_column("sending_pid")
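
The upgrade only adds a nullable integer column, so it is cheap on existing databases; the downgrade drops it again in batch mode, which SQLite needs for column removal. As a quick sanity check, here is a sketch (not part of this diff; the database URL is an assumption) that inspects the table after running `alembic upgrade head`:

import sqlalchemy as sa

# Hypothetical database location; point this at the client's actual svs.sqlite.
engine = sa.create_engine("sqlite:///svs.sqlite")
columns = [col["name"] for col in sa.inspect(engine).get_columns("draftreplies")]
assert "sending_pid" in columns  # added by revision 414627c04463
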
4 changes: 4 additions & 0 deletions securedrop_client/api_jobs/base.py
@@ -41,6 +41,10 @@ def __lt__(self, other: QueueJobType) -> bool:
return self.order_number < other.order_number


class ClearQueueJob(QueueJob):
pass


class PauseQueueJob(QueueJob):
def __init__(self) -> None:
super().__init__()
3 changes: 3 additions & 0 deletions securedrop_client/api_jobs/uploads.py
@@ -1,4 +1,5 @@
import logging
import os

import sdclientapi
from sdclientapi import API, RequestTimeoutError, ServerConnectionError
@@ -52,6 +53,8 @@ def call_api(self, api_client: API, session: Session) -> str:
)
if not draft_reply_db_object:
raise Exception("Draft reply {} does not exist".format(self.reply_uuid))
draft_reply_db_object.sending_pid = os.getpid()
session.commit()

# If the source was deleted locally then do not send the message and delete the draft.
source = session.query(Source).filter_by(uuid=self.source_uuid).one_or_none()
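Stamping the draft with the sending process's PID, and committing that stamp before the network request is attempted, means the marker survives even if the client is killed mid-send. On the next launch (a different PID), such a draft can be recognized as leftover. A rough sketch of that check, applied per pending draft (illustrative only; the real logic lives in storage.mark_all_pending_drafts_as_failed and runs in SQL):

import os

def is_stale(draft) -> bool:
    # A pending draft that this process is not actively sending, whether it was
    # stamped by an earlier session or never picked up at all (sending_pid still
    # None), should be shown to the user as failed.
    return draft.sending_pid != os.getpid()
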
1 change: 1 addition & 0 deletions securedrop_client/db.py
@@ -595,6 +595,7 @@ class DraftReply(Base):
# This tracks the sending status of the reply.
send_status_id = Column(Integer, ForeignKey("replysendstatuses.id"))
send_status = relationship("ReplySendStatus")
sending_pid = Column(Integer)

def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
1 change: 1 addition & 0 deletions securedrop_client/gui/widgets.py
@@ -2037,6 +2037,7 @@ def __init__( # type: ignore [no-untyped-def]

self.check_mark.show()

update_signal.connect(self._on_reply_success)
message_succeeded_signal.connect(self._on_reply_success)
message_failed_signal.connect(self._on_reply_failure)
self.controller.update_authenticated_user.connect(self._on_update_authenticated_user)
22 changes: 16 additions & 6 deletions securedrop_client/logic.py
@@ -382,6 +382,7 @@ def __init__( # type: ignore [no-untyped-def]
self.api_job_queue = ApiJobQueue(
self.api, self.session_maker, self.main_queue_thread, self.file_download_queue_thread
)
self.api_job_queue.cleared.connect(self.on_queue_cleared)
self.api_job_queue.paused.connect(self.on_queue_paused)
self.api_job_queue.main_queue_updated.connect(self._on_main_queue_updated)
self.add_job.connect(self.api_job_queue.enqueue)
@@ -492,6 +493,9 @@ def call_api( # type: ignore [no-untyped-def]
# Start the thread and related activity.
new_api_thread.start()

def on_queue_cleared(self) -> None:
self.update_failed_replies()

def on_queue_paused(self) -> None:
self.gui.update_error_status(
_("The SecureDrop server cannot be reached. Trying to reconnect..."), duration=0
@@ -537,7 +541,6 @@ def login(self, username: str, password: str, totp: str) -> None:
default_request_timeout for Queue API requests in ApiJobQueue in order to display errors
faster.
"""
storage.mark_all_pending_drafts_as_failed(self.session)
self.api = sdclientapi.API(
self.hostname, username, password, totp, self.proxy, default_request_timeout=60
)
@@ -607,7 +610,6 @@ def login_offline_mode(self) -> None:
# may have attempted online mode login, then switched to offline)
self.gui.clear_clipboard()
self.gui.show_main_window()
storage.mark_all_pending_drafts_as_failed(self.session)
self.update_sources()
self.show_last_sync()
self.show_last_sync_timer.start(TIME_BETWEEN_SHOWING_LAST_SYNC_MS)
@@ -660,6 +662,7 @@ def on_sync_success(self) -> None:
self.gui.refresh_current_source_conversation()
self.download_new_messages()
self.download_new_replies()
self.update_failed_replies()
self.sync_succeeded.emit()

try:
@@ -803,10 +806,6 @@ def logout(self) -> None:

self.invalidate_token()

failed_replies = storage.mark_all_pending_drafts_as_failed(self.session)
for failed_reply in failed_replies:
self.reply_failed.emit(failed_reply.uuid)

self.api_sync.stop()
self.api_job_queue.stop()
self.gui.logout()
@@ -1137,3 +1136,14 @@ def on_logout_success(self, result: Exception) -> None:

def on_logout_failure(self, result: Exception) -> None:
logging.info("Client logout failure")

def update_failed_replies(self) -> None:
"""
Emit an explicit `reply_failed` signal for each pending reply marked as failed by
the storage layer rather than the API job responsible for sending it (for example,
if the application quit mid-job or mid-queue). Without this signal, the reply
won't be shown as failed in the GUI until the application is restarted.
"""
failed_replies = storage.mark_all_pending_drafts_as_failed(self.session)
for failed_reply in failed_replies:
self.reply_failed.emit(failed_reply.uuid)
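
Taken together, the logic.py changes move the bulk "mark pending as failed" work away from login and logout and into two places: after every successful sync and after the job queues are cleared. Sketched signal flow, using only names that appear in this diff:

ApiJobQueue.stop()
  -> enqueues ClearQueueJob
  -> RunnableQueue._clear() emits cleared
  -> ApiJobQueue.cleared
  -> Controller.on_queue_cleared()
  -> update_failed_replies()
  -> storage.mark_all_pending_drafts_as_failed(session)
  -> reply_failed.emit(uuid) for each draft stamped by another session

Controller.on_sync_success() calls update_failed_replies() as well, so the GUI
also catches up after each successful sync.
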
60 changes: 55 additions & 5 deletions securedrop_client/queue.py
@@ -12,6 +12,7 @@
DEFAULT_NUM_ATTEMPTS,
ApiInaccessibleError,
ApiJob,
ClearQueueJob,
PauseQueueJob,
QueueJob,
)
@@ -85,6 +86,7 @@ class RunnableQueue(QObject):

# These are the priorities for processing jobs. Lower numbers correspond to a higher priority.
JOB_PRIORITIES = {
ClearQueueJob: 0, # Must preempt all other jobs
PauseQueueJob: 11,
FileDownloadJob: 13, # File downloads processed in separate queue
DeleteSourceJob: 14,
@@ -96,7 +98,10 @@
SeenJob: 18,
}

# Signal that is emitted when processing stops
# Signal that is emitted when processing is stopped and queued jobs are cleared
cleared = pyqtSignal()

# Signal that is emitted when processing is paused
paused = pyqtSignal()

# Signal that is emitted to resume processing jobs
@@ -138,6 +143,16 @@ def _check_for_duplicate_jobs(self, job: QueueJob) -> bool:
return True
return False

def _clear(self) -> None:
"""
Reinstantiate the PriorityQueue, rather than trying to clear it via undocumented methods.[1]
[1]: https://stackoverflow.com/a/38560911
"""
with self.condition_add_or_remove_job:
self.queue = PriorityQueue()
self.cleared.emit()

def add_job(self, job: QueueJob) -> None:
"""
Add the job with its priority to the queue after assigning it the next order_number.
@@ -176,6 +191,9 @@ def process(self) -> None:
"""
Process the next job in the queue.
If the job is a ClearQueueJob, call _clear() and return from the processing loop so that
the processing thread can quit.
If the job is a PauseQueueJob, emit the paused signal and return from the processing loop so
that no more jobs are processed until the queue resumes.
@@ -196,7 +214,13 @@
self.condition_add_or_remove_job.wait_for(lambda: not self.queue.empty())
priority, self.current_job = self.queue.get(block=False)

if isinstance(self.current_job, PauseQueueJob):
if isinstance(self.current_job, ClearQueueJob):
with self.condition_add_or_remove_job:
self.current_job = None
self._clear()
return

if isinstance(self.current_job, PauseQueueJob): # type: ignore
self.paused.emit()
with self.condition_add_or_remove_job:
self.current_job = None
@@ -238,6 +262,9 @@ class ApiJobQueue(QObject):
from the Controller.
"""

# Signal that is emitted after a queue is cleared.
cleared = pyqtSignal()

# Signal that is emitted after a queue is paused.
paused = pyqtSignal()

@@ -270,6 +297,9 @@ def __init__(
self.main_queue.paused.connect(self.on_main_queue_paused)
self.download_file_queue.paused.connect(self.on_file_download_queue_paused)

self.main_queue.cleared.connect(self.on_main_queue_cleared)
self.download_file_queue.cleared.connect(self.on_file_download_queue_cleared)

def start(self, api_client: API) -> None:
"""
Start the queues whenever a new api token is provided.
@@ -287,15 +317,19 @@ def start(self, api_client: API) -> None:

def stop(self) -> None:
"""
Stop the queues.
Inject a ClearQueueJob into each queue and quit its processing thread. To keep this
method non-blocking, we do NOT wait() for the thread to return, which will happen only
when RunnableQueue.process() reaches the ClearQueueJob and returns from its loop.
"""
if self.main_thread.isRunning():
self.main_queue.add_job(ClearQueueJob())
self.main_thread.quit()
logger.debug("Stopped main queue")
logger.debug("Asked main queue thread to quit")

if self.download_file_thread.isRunning():
self.download_file_queue.add_job(ClearQueueJob())
self.download_file_thread.quit()
logger.debug("Stopped file download queue")
logger.debug("Asked file-download queue thread to quit")

@pyqtSlot()
def on_main_queue_paused(self) -> None:
@@ -313,6 +347,22 @@ def on_file_download_queue_paused(self) -> None:
logger.debug("Paused file download queue")
self.paused.emit()

@pyqtSlot()
def on_main_queue_cleared(self) -> None:
"""
Emit the "cleared" signal when the main RunnableQueue is cleared.
"""
logger.debug("Cleared main queue")
self.cleared.emit()

@pyqtSlot()
def on_file_download_queue_cleared(self) -> None:
"""
Emit the "cleared" signal when the file-download RunnableQueue is cleared.
"""
logger.debug("Cleared file download queue")
self.cleared.emit()

def resume_queues(self) -> None:
"""
Emit the resume signal to the queues if they are running.
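For readers unfamiliar with the pattern, here is a minimal, self-contained sketch (not SecureDrop code; all names are illustrative) of how a highest-priority sentinel lets a priority-queue worker drop its backlog and exit, which is what ClearQueueJob now does for RunnableQueue:

from itertools import count
from queue import PriorityQueue

CLEAR, REPLY = 0, 15      # the sentinel outranks every other job type
order = count()           # tie-breaker, like QueueJob.order_number

def worker(jobs: PriorityQueue) -> None:
    while True:
        priority, _, job = jobs.get()
        if priority == CLEAR:
            # Like RunnableQueue._clear(): discard the backlog and leave the
            # processing loop so the worker thread can finish. (The real code
            # reinstantiates its PriorityQueue instead of draining it.)
            while not jobs.empty():
                jobs.get_nowait()
            return
        job()

if __name__ == "__main__":
    jobs: PriorityQueue = PriorityQueue()
    jobs.put((REPLY, next(order), lambda: print("sending reply")))
    jobs.put((CLEAR, next(order), None))  # what ApiJobQueue.stop() now enqueues
    worker(jobs)  # prints nothing: the sentinel preempts the reply queued before it
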
12 changes: 10 additions & 2 deletions securedrop_client/storage.py
@@ -1080,7 +1080,7 @@ def get_reply(session: Session, uuid: str) -> Reply:

def mark_all_pending_drafts_as_failed(session: Session) -> List[DraftReply]:
"""
When we login (offline or online) or logout, we need to set all the pending replies as failed.
Mark as failed those pending replies that originate from other sessions (PIDs).
"""
pending_status = (
session.query(ReplySendStatus).filter_by(name=ReplySendStatusCodes.PENDING.value).one()
@@ -1089,7 +1089,15 @@ def mark_all_pending_drafts_as_failed(session: Session) -> List[DraftReply]:
session.query(ReplySendStatus).filter_by(name=ReplySendStatusCodes.FAILED.value).one()
)

pending_drafts = session.query(DraftReply).filter_by(send_status=pending_status).all()
pending_drafts = (
session.query(DraftReply)
.filter(
DraftReply.send_status == pending_status,
DraftReply.sending_pid.isnot(os.getpid()),
)
.all()
)
logger.debug(f"Found {len(pending_drafts)} pending replies not being processed in this session")
for pending_draft in pending_drafts:
pending_draft.send_status = failed_status

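One subtlety in the filter above: isnot() compiles to SQL "IS NOT", which, unlike "!=", also matches rows whose sending_pid is NULL, i.e. drafts that were queued but never picked up by a SendReplyJob, so those are marked as failed too. A throwaway sketch (not part of the diff) showing roughly what the clause renders to under the SQLite dialect:

import os

from sqlalchemy.dialects import sqlite

from securedrop_client.db import DraftReply

clause = DraftReply.sending_pid.isnot(os.getpid())
print(clause.compile(dialect=sqlite.dialect()))
# Expected output, roughly: draftreplies.sending_pid IS NOT ?
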
1 change: 1 addition & 0 deletions tests/gui/test_widgets.py
@@ -4518,6 +4518,7 @@ def test_ReplyWidget_success_failure_slots(mocker):
)

# ensure we have connected the slots
mock_update_signal.connect.assert_called_with(widget._on_reply_success)
mock_success_signal.connect.assert_called_once_with(widget._on_reply_success)
mock_failure_signal.connect.assert_called_once_with(widget._on_reply_failure)
assert mock_update_signal.connect.called # to ensure no stale mocks
(The remaining three changed files are not shown.)
