Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Change pushers to use the event_actions table #705

Merged
merged 22 commits into from
Apr 11, 2016
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion synapse/handlers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from synapse.types import UserID, RoomAlias, Requester
from synapse.push.action_generator import ActionGenerator

from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn

import logging

Expand Down Expand Up @@ -406,6 +406,12 @@ def is_inviter_member_event(e):
event, context=context
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)

destinations = set()
for k, s in context.current_state.items():
try:
Expand Down
8 changes: 7 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
Expand Down Expand Up @@ -1097,6 +1097,12 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None):
context=context,
)

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
event_stream_id, max_stream_id
)

defer.returnValue((context, event_stream_id, max_stream_id))

@defer.inlineCallbacks
Expand Down
21 changes: 17 additions & 4 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def _received_remote_receipt(self, origin, content):
def _handle_new_receipts(self, receipts):
"""Takes a list of receipts, stores them and informs the notifier.
"""
min_batch_id = None
max_batch_id = None

for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
Expand All @@ -97,10 +100,20 @@ def _handle_new_receipts(self, receipts):

stream_id, max_persisted_id = res

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_persisted_id, rooms=[room_id]
)
if min_batch_id is None or stream_id < min_batch_id:
min_batch_id = stream_id
if max_batch_id is None or max_persisted_id > max_batch_id:
max_batch_id = max_persisted_id

affected_room_ids = list(set([r["room_id"] for r in receipts]))

with PreserveLoggingContext():
self.notifier.on_new_event(
"receipt_key", max_batch_id, rooms=affected_room_ids
)
self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids
)

defer.returnValue(True)

Expand Down
Loading