Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Catch-up after Federation Outage (bonus): Catch-up on Synapse Startup #8322

Merged
merged 20 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8230.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8230.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8247.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8247.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8258.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
1 change: 0 additions & 1 deletion changelog.d/8258.misc

This file was deleted.

1 change: 1 addition & 0 deletions changelog.d/8322.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
51 changes: 51 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
"Total number of PDUs queued for sending across all destinations",
)

# Time (in s) after Synapse's startup that we will begin to wake up destinations
# that have catch-up outstanding.
CATCH_UP_STARTUP_DELAY_SEC = 15

# Time (in s) to wait in between waking up each destination, i.e. one destination
# will be woken up every <x> seconds after Synapse's startup until we have woken
# every destination that has outstanding catch-up.
CATCH_UP_STARTUP_INTERVAL_SEC = 5


class FederationSender:
def __init__(self, hs: "synapse.server.HomeServer"):
Expand Down Expand Up @@ -125,6 +134,14 @@ def __init__(self, hs: "synapse.server.HomeServer"):
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
)

# wake up destinations that have outstanding PDUs to be caught up
self._catchup_after_startup_timer = self.clock.call_later(
CATCH_UP_STARTUP_DELAY_SEC,
run_as_background_process,
"wake_destinations_needing_catchup",
self._wake_destinations_needing_catchup,
)

def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination

Expand Down Expand Up @@ -560,3 +577,37 @@ async def get_replication_rows(
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return [], 0, False

async def _wake_destinations_needing_catchup(self) -> None:
    """
    Wakes up destinations that need catch-up and are not currently being
    backed off from.

    Runs once, shortly after startup (scheduled in __init__ with
    CATCH_UP_STARTUP_DELAY_SEC). In order to reduce load spikes, adds a
    delay of CATCH_UP_STARTUP_INTERVAL_SEC between each destination.
    """

    # Pagination token: the store returns only destinations lexicographically
    # greater than this. None means "start from the beginning".
    last_processed = None  # type: Optional[str]

    while True:
        destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
            last_processed
        )

        if not destinations_to_wake:
            # finished waking all destinations!
            self._catchup_after_startup_timer = None
            break

        # Advance the pagination token *before* filtering by shard config.
        # If an entire batch belongs to other federation-sender instances,
        # the filtered list is empty and the `for` loop below never runs;
        # relying on the loop variable to advance the token (as the original
        # code did) would refetch the same batch forever.
        last_processed = destinations_to_wake[-1]

        # only wake destinations that this federation-sender instance is
        # responsible for
        destinations_to_wake = [
            d
            for d in destinations_to_wake
            if self._federation_shard_config.should_handle(self._instance_name, d)
        ]

        for destination in destinations_to_wake:
            logger.info(
                "Destination %s has outstanding catch-up, waking up.",
                destination,
            )
            self.wake_destination(destination)
            # space the wake-ups out to avoid a thundering herd at startup
            await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
66 changes: 64 additions & 2 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def _set_destination_retry_timings(
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
clokep marked this conversation as resolved.
Show resolved Hide resolved
OR destinations.retry_interval < EXCLUDED.retry_interval
"""

Expand Down Expand Up @@ -249,7 +250,11 @@ def _set_destination_retry_timings(
"retry_interval": retry_interval,
},
)
elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
elif (
retry_interval == 0
or prev_row["retry_interval"] is None
clokep marked this conversation as resolved.
Show resolved Hide resolved
or prev_row["retry_interval"] < retry_interval
):
self.db_pool.simple_update_one_txn(
txn,
"destinations",
Expand Down Expand Up @@ -397,7 +402,7 @@ async def get_catch_up_room_event_ids(

@staticmethod
def _get_catch_up_room_event_ids_txn(
txn, destination: str, last_successful_stream_ordering: int,
txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
Expand All @@ -412,3 +417,60 @@ def _get_catch_up_room_event_ids_txn(
)
event_ids = [row[0] for row in txn]
return event_ids

async def get_catch_up_outstanding_destinations(
    self, after_destination: Optional[str]
) -> List[str]:
    """
    Fetches a batch of destinations which have outstanding PDUs awaiting
    catch-up and are not currently being backed off from.

    Args:
        after_destination:
            if supplied, only destinations sorting strictly after this one
            (lexicographically) are considered.

    Returns:
        up to 25 destinations with outstanding catch-up, in lexicographic
        order: the first such destinations greater than after_destination
        (when provided).
    """
    now = self.hs.get_clock().time_msec()

    return await self.db_pool.runInteraction(
        "get_catch_up_outstanding_destinations",
        self._get_catch_up_outstanding_destinations_txn,
        now,
        after_destination,
    )

@staticmethod
def _get_catch_up_outstanding_destinations_txn(
txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
) -> List[str]:
q = """
SELECT destination FROM destinations
WHERE destination IN (
SELECT destination FROM destination_rooms
WHERE destination_rooms.stream_ordering >
destinations.last_successful_stream_ordering
)
AND destination > ?
AND (
retry_last_ts IS NULL OR
retry_last_ts + retry_interval < ?
Comment on lines +459 to +460
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This attempts to find destinations which have never been backed off from or which are beyond their retry interval?

Does this mean it will just poke all servers when it wakes up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this part means have never been backed off from or have an expired backoff.

                WHERE destination IN (
                    SELECT destination FROM destination_rooms
                        WHERE destination_rooms.stream_ordering >
                            destinations.last_successful_stream_ordering
                )

restricts this to only destinations with catch-up needed.

)
ORDER BY destination
LIMIT 25
"""
txn.execute(
q,
(
# everything is lexicographically greater than "" so this gives
# us the first batch of up to 25.
after_destination or "",
now_time_ms,
),
)

destinations = [row[0] for row in txn]
return destinations
99 changes: 99 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,102 @@ def test_catch_up_loop(self):
per_dest_queue._last_successful_stream_ordering,
event_5.internal_metadata.stream_ordering,
)

@override_config({"send_federation": True})
def test_catch_up_on_synapse_startup(self):
    """
    Tests the behaviour of get_catch_up_outstanding_destinations and
    _wake_destinations_needing_catchup.
    """

    # list of sorted server names (note that there are more servers than the batch
    # size used in get_catch_up_outstanding_destinations).
    # "zzzerver" sorts lexicographically after every "serverNN" name, which the
    # batch-ordering assertions below rely on.
    server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]

    # ARRANGE:
    # - a local user (u1)
    # - a room which u1 is joined to (and remote users @user:serverXX are
    #   joined to)

    # mark the remotes as online
    self.is_online = True

    self.register_user("u1", "you the one")
    u1_token = self.login("u1", "you the one")
    room_id = self.helper.create_room_as("u1", tok=u1_token)

    # join one remote user per remote server, so every server becomes a
    # federation destination for this room
    for server_name in server_names:
        self.get_success(
            event_injection.inject_member_event(
                self.hs, room_id, "@user:%s" % server_name, "join"
            )
        )

    # create an event
    self.helper.send(room_id, "deary me!", tok=u1_token)

    # ASSERT:
    # - All servers are up to date so none should have outstanding catch-up
    outstanding_when_successful = self.get_success(
        self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
    )
    self.assertEqual(outstanding_when_successful, [])

    # ACT:
    # - Make the remote servers unreachable
    self.is_online = False

    # - Mark zzzerver as being backed-off from
    now = self.clock.time_msec()
    self.get_success(
        self.hs.get_datastore().set_destination_retry_timings(
            "zzzerver", now, now, 24 * 60 * 60 * 1000  # retry in 1 day
        )
    )

    # - Send an event
    # (this fails to reach the remotes, so every destination now has
    # catch-up outstanding)
    self.helper.send(room_id, "can anyone hear me?", tok=u1_token)

    # ASSERT (get_catch_up_outstanding_destinations):
    # - all remotes are outstanding
    # - they are returned in batches of 25, in order
    outstanding_1 = self.get_success(
        self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
    )

    self.assertEqual(len(outstanding_1), 25)
    self.assertEqual(outstanding_1, server_names[0:25])

    # second batch: paginate using the last destination of the first batch
    outstanding_2 = self.get_success(
        self.hs.get_datastore().get_catch_up_outstanding_destinations(
            outstanding_1[-1]
        )
    )
    # zzzerver is being backed off from, so it must be excluded
    self.assertNotIn("zzzerver", outstanding_2)
    self.assertEqual(len(outstanding_2), 17)
    self.assertEqual(outstanding_2, server_names[25:-1])

    # ACT: call _wake_destinations_needing_catchup

    # patch wake_destination to just count the destinations instead
    woken = []

    def wake_destination_track(destination):
        woken.append(destination)

    self.hs.get_federation_sender().wake_destination = wake_destination_track

    # cancel the pre-existing timer for _wake_destinations_needing_catchup
    # this is because we are calling it manually rather than waiting for it
    # to be called automatically
    self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()

    # NOTE(review): by=5.0 presumably advances the test reactor far enough
    # for the CATCH_UP_STARTUP_INTERVAL_SEC sleeps between destinations to
    # complete — confirm against the test harness's get_success semantics.
    self.get_success(
        self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
    )

    # ASSERT (_wake_destinations_needing_catchup):
    # - all remotes are woken up, save for zzzerver
    self.assertNotIn("zzzerver", woken)
    # - all destinations are woken exactly once; they appear once in woken.
    self.assertCountEqual(woken, server_names[:-1])