
Catch-up after Federation Outage (split, 4): catch-up loop (#8272)
reivilibre authored Sep 15, 2020
1 parent aec294e commit 576bc37
Showing 5 changed files with 338 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/8272.bugfix
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
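To make the intent of the change concrete before reading the diff, here is a minimal, hedged sketch of the idea in plain synchronous Python (hypothetical names, not Synapse's real classes): while a destination is in catch-up mode, newly arriving PDUs are not queued and only the highest skipped stream ordering is remembered; once the destination is reachable again, everything newer than the last successfully-sent stream ordering is re-read from the store and retransmitted, after which normal sending resumes.

# Minimal illustration of the catch-up idea (hypothetical names; this is not
# Synapse code). Events are (stream_ordering, event_id) pairs.
from typing import List, Optional, Tuple

Event = Tuple[int, str]


class ToyDestinationQueue:
    def __init__(self, store: List[Event]) -> None:
        self.store = store                      # every event, oldest first
        self.catching_up = True                 # start in catch-up mode
        self.last_skipped = 0                   # newest stream_ordering we skipped
        self.last_successful: Optional[int] = None
        self.sent: List[str] = []               # what "reached" the remote server

    def send_pdu(self, event: Event) -> None:
        if self.catching_up and self.last_successful is not None:
            # don't queue new events while catching up; just remember that
            # something newer exists
            self.last_skipped = event[0]
        else:
            self.sent.append(event[1])
            self.last_successful = event[0]

    def catch_up(self) -> None:
        # resend everything newer than the last successfully-sent event
        assert self.last_successful is not None
        for stream_ordering, event_id in self.store:
            if stream_ordering > self.last_successful:
                self.sent.append(event_id)
                self.last_successful = stream_ordering
        self.catching_up = False


store = [(1, "$a"), (2, "$b"), (3, "$c")]
queue = ToyDestinationQueue(store)
queue.last_successful = 1     # pretend "$a" was delivered before the outage
queue.send_pdu((2, "$b"))     # skipped: the destination is still catching up
queue.send_pdu((3, "$c"))     # skipped as well
queue.catch_up()              # re-reads the store and resends "$b" and "$c"
print(queue.sent)             # ['$b', '$c']

The real implementation in the diff below does the same thing asynchronously, fetches missing events in batches of at most 50, and persists the watermark in the database.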
129 changes: 125 additions & 4 deletions synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast

from prometheus_client import Counter

@@ -92,6 +92,21 @@ def __init__(
self._destination = destination
self.transmission_loop_running = False

# True whilst we are sending events that the remote homeserver missed
# because it was unreachable. We start in this state so we can perform
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
self._catching_up = True # type: bool

# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode.
self._catchup_last_skipped = 0 # type: int

# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
self._last_successful_stream_ordering = None # type: Optional[int]

# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]

@@ -138,7 +153,13 @@ def send_pdu(self, pdu: EventBase) -> None:
Args:
pdu: pdu to send
"""
-self._pending_pdus.append(pdu)
if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None)
self._pending_pdus.append(pdu)
else:
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering

self.attempt_new_transaction()

def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -218,6 +239,13 @@ async def _transaction_transmission_loop(self) -> None:
# hence why we throw the result away.
await get_retry_limiter(self._destination, self._clock, self._store)

if self._catching_up:
# we potentially need to catch-up first
await self._catch_up_transmission_loop()
if self._catching_up:
# not caught up yet
return

pending_pdus = []
while True:
# We have to keep 2 free slots for presence and rr_edus
@@ -351,8 +379,9 @@ async def _transaction_transmission_loop(self) -> None:
if e.retry_interval > 60 * 60 * 1000:
# we won't retry for another hour!
# (this suggests a significant outage)
-# We drop pending PDUs and EDUs because otherwise they will
+# We drop pending EDUs because otherwise they will
# rack up indefinitely.
# (Dropping PDUs is already performed by `_start_catching_up`.)
# Note that:
# - the EDUs that are being dropped here are those that we can
# afford to drop (specifically, only typing notifications,
@@ -364,11 +393,12 @@ async def _transaction_transmission_loop(self) -> None:

# dropping read receipts is a bit sad but should be solved
# through another mechanism, because this is all volatile!
-self._pending_pdus = []
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_rrs = {}

self._start_catching_up()
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -378,6 +408,8 @@ async def _transaction_transmission_loop(self) -> None:
e.code,
e,
)

self._start_catching_up()
except RequestSendFailed as e:
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
@@ -387,16 +419,96 @@ async def _transaction_transmission_loop(self) -> None:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)

self._start_catching_up()
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)

self._start_catching_up()
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False

async def _catch_up_transmission_loop(self) -> None:
first_catch_up_check = self._last_successful_stream_ordering is None

if first_catch_up_check:
# first catchup so get last_successful_stream_ordering from database
self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
self._destination
)

if self._last_successful_stream_ordering is None:
# if it's still None, then this means we don't have the information
# in our database - we haven't successfully sent a PDU to this server
# (at least since the introduction of the feature tracking
# last_successful_stream_ordering).
# Sadly, this means we can't do anything here as we don't know what
# needs catching up — so catching up is futile; let's stop.
self._catching_up = False
return

# get at most 50 catchup room/PDUs
while True:
event_ids = await self._store.get_catch_up_room_event_ids(
self._destination, self._last_successful_stream_ordering,
)

if not event_ids:
# No more events to catch up on, but we can't ignore the chance
# of a race condition, so we check that no new events have been
# skipped due to us being in catch-up mode

if self._catchup_last_skipped > self._last_successful_stream_ordering:
# another event has been skipped because we were in catch-up mode
continue

# we are done catching up!
self._catching_up = False
break

if first_catch_up_check:
# as this is our check for needing catch-up, we may have PDUs in
# the queue from before we *knew* we had to do catch-up, so
# clear those out now.
self._start_catching_up()

# fetch the relevant events from the event store
# - redacted behaviour of REDACT is fine, since we only send metadata
# of redacted events to the destination.
# - don't need to worry about rejected events as we do not actively
# forward received events over federation.
catchup_pdus = await self._store.get_events_as_list(event_ids)
if not catchup_pdus:
raise AssertionError(
"No events retrieved when we asked for %r. "
"This should not happen." % event_ids
)

if logger.isEnabledFor(logging.INFO):
rooms = {p.room_id for p in catchup_pdus}
logger.info("Catching up rooms to %s: %r", self._destination, rooms)

success = await self._transaction_manager.send_new_transaction(
self._destination, catchup_pdus, []
)

if not success:
return

sent_transactions_counter.inc()
final_pdu = catchup_pdus[-1]
self._last_successful_stream_ordering = cast(
int, final_pdu.internal_metadata.stream_ordering
)
await self._store.set_destination_last_successful_stream_ordering(
self._destination, self._last_successful_stream_ordering
)

def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
@@ -457,3 +569,12 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
]

return (edus, stream_id)

def _start_catching_up(self) -> None:
"""
Marks this destination as being in catch-up mode.
This throws away the PDU queue.
"""
self._catching_up = True
self._pending_pdus = []
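Two details of `_catch_up_transmission_loop` above are worth restating: the loop only leaves catch-up mode when a query for missing events comes back empty and nothing has been skipped since the current watermark, and each successfully-sent batch advances (and persists) the last successful stream ordering so that a restart resumes where it left off. A stripped-down, synchronous sketch of that control flow, with hypothetical callables standing in for the store and the transaction manager, might look like:

# Stripped-down sketch of the catch-up control flow (hypothetical helpers;
# synchronous and in-memory, unlike the real async, database-backed method).
from typing import Callable, List, Tuple

Event = Tuple[int, str]  # (stream_ordering, event_id)


def run_catch_up(
    query_after: Callable[[int], List[Event]],  # stand-in for get_catch_up_room_event_ids
    send_batch: Callable[[List[str]], bool],    # stand-in for send_new_transaction
    last_successful: int,
    last_skipped: int,
) -> Tuple[bool, int]:
    """Return (caught_up, new_watermark)."""
    while True:
        batch = query_after(last_successful)[:50]
        if not batch:
            if last_skipped > last_successful:
                # a PDU was skipped while we were draining the backlog,
                # so query again rather than declaring catch-up finished
                continue
            return True, last_successful
        if not send_batch([event_id for _, event_id in batch]):
            # transmission failed; stay in catch-up mode and retry later
            return False, last_successful
        # advance the watermark (the real code also persists it)
        last_successful = batch[-1][0]


events = [(5, "$e5"), (7, "$e7"), (9, "$e9")]
caught_up, watermark = run_catch_up(
    query_after=lambda after: [e for e in events if e[0] > after],
    send_batch=lambda ids: True,
    last_successful=5,
    last_skipped=0,
)
print(caught_up, watermark)  # True 9

This is only an illustration of the control flow; the real method is asynchronous, caps batches via the SQL query's LIMIT, and writes the watermark back with set_destination_last_successful_stream_ordering.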
43 changes: 42 additions & 1 deletion synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@

import logging
from collections import namedtuple
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple

from canonicaljson import encode_canonical_json

@@ -371,3 +371,44 @@ async def set_destination_last_successful_stream_ordering(
values={"last_successful_stream_ordering": last_successful_stream_ordering},
desc="set_last_successful_stream_ordering",
)

async def get_catch_up_room_event_ids(
self, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
"""
Returns at most 50 event IDs, corresponding to the oldest events that
have not yet been sent to the destination.
Args:
destination: the destination in question
last_successful_stream_ordering: the stream_ordering of the
most-recently successfully-transmitted event to the destination
Returns:
list of event_ids
"""
return await self.db_pool.runInteraction(
"get_catch_up_room_event_ids",
self._get_catch_up_room_event_ids_txn,
destination,
last_successful_stream_ordering,
)

@staticmethod
def _get_catch_up_room_event_ids_txn(
txn, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
JOIN events USING (stream_ordering)
WHERE destination = ?
AND stream_ordering > ?
ORDER BY stream_ordering
LIMIT 50
"""
txn.execute(
q, (destination, last_successful_stream_ordering),
)
event_ids = [row[0] for row in txn]
return event_ids
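The semantics of this query are easy to check in isolation. Below is a hedged, self-contained reproduction using Python's sqlite3 module with cut-down table definitions (the real destination_rooms and events schemas carry more columns); it shows that only rows for the requested destination with a stream ordering strictly greater than the watermark come back, oldest first:

# Minimal, self-contained reproduction of the catch-up query using sqlite3.
# The schemas here are cut down to the columns the query touches.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE destination_rooms (
        destination TEXT NOT NULL,
        room_id TEXT NOT NULL,
        stream_ordering INTEGER NOT NULL
    );
    CREATE TABLE events (
        stream_ordering INTEGER NOT NULL,
        event_id TEXT NOT NULL
    );
    """
)
# destination_rooms tracks, per (destination, room), the stream ordering of
# the latest event that destination should receive; events maps stream
# orderings back to event IDs.
conn.executemany(
    "INSERT INTO destination_rooms VALUES (?, ?, ?)",
    [
        ("remote.example.com", "!roomA", 3),
        ("remote.example.com", "!roomB", 7),
        ("other.example.com", "!roomA", 9),
    ],
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(3, "$eventA"), (7, "$eventB"), (9, "$eventC")],
)

rows = conn.execute(
    """
    SELECT event_id FROM destination_rooms
    JOIN events USING (stream_ordering)
    WHERE destination = ?
      AND stream_ordering > ?
    ORDER BY stream_ordering
    LIMIT 50
    """,
    ("remote.example.com", 3),
).fetchall()
print([r[0] for r in rows])  # ['$eventB']: only rows newer than ordering 3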