From 248684001e0804492374a92fd42706b122d16200 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 09:33:02 +0100 Subject: [PATCH 01/20] Begin outstanding catch-ups on Synapse startup --- changelog.d/8322.bugfix | 2 + synapse/federation/sender/__init__.py | 43 ++++++++++++++ .../storage/databases/main/transactions.py | 58 ++++++++++++++++++- 3 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 changelog.d/8322.bugfix diff --git a/changelog.d/8322.bugfix b/changelog.d/8322.bugfix new file mode 100644 index 000000000000..b251dc15e849 --- /dev/null +++ b/changelog.d/8322.bugfix @@ -0,0 +1,2 @@ +Fix messages over federation being lost until an event is sent into the same room. + diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 41a726878dfa..16d9551d58e9 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -55,6 +55,15 @@ "Total number of PDUs queued for sending across all destinations", ) +# Time (in s) after Synapse's startup that we will begin to wake up destinations +# that have catch-up outstanding. +CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC = 15 + +# Time (in s) waited in between waking up each destination, i.e. one destination +# will be woken up every seconds after Synapse's startup until we have woken +# every destination has outstanding catch-up. +CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC = 5 + class FederationSender: def __init__(self, hs: "synapse.server.HomeServer"): @@ -125,6 +134,13 @@ def __init__(self, hs: "synapse.server.HomeServer"): 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) + self.clock.call_later( + CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC, + run_as_background_process, + "wake_destinations_needing_catchup", + self._wake_destinations_needing_catchup, + ) + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination @@ -560,3 +576,30 @@ async def get_replication_rows( # Dummy implementation for case where federation sender isn't offloaded # to a worker. return [], 0, False + + async def _wake_destinations_needing_catchup(self): + """ + Wakes up destinations that need catch-up and are not currently being + backed off from. + Does so in a slow way (one every 5 seconds) to reduce load spikes. + """ + + last_processed = None # type: Optional[str] + + while True: + destinations_to_wake = await self.store.get_catch_up_outstanding_destinations( + last_processed + ) + + if not destinations_to_wake: + # finished waking all destinations! + break + + for destination in destinations_to_wake: + if self._federation_shard_config.should_handle( + self._instance_name, destination + ): + last_processed = destination + logger.info("Destination %s has outstanding catch-up, waking up.", destination) + self.wake_destination(destination) + await self.clock.sleep(CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 091367006e17..d873b98faaeb 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -397,7 +397,7 @@ async def get_catch_up_room_event_ids( @staticmethod def _get_catch_up_room_event_ids_txn( - txn, destination: str, last_successful_stream_ordering: int, + txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int, ) -> List[str]: q = """ SELECT event_id FROM destination_rooms @@ -412,3 +412,59 @@ def _get_catch_up_room_event_ids_txn( ) event_ids = [row[0] for row in txn] return event_ids + + async def get_catch_up_outstanding_destinations( + self, after_destination: Optional[str] + ) -> List[str]: + """ + Gets at most 25 destinations which have outstanding PDUs to be caught up, + and are not being backed off from + Args: + after_destination: + If provided, all destinations must be lexicographically greater + than this one. + + Returns: + list of up to 25 destinations with outstanding catch-up. + These are the lexicographically first destinations which are + lexicographically greater than after_destination (if provided). + """ + time = self.hs.get_clock().time_msec() + + return await self.db_pool.runInteraction( + "get_catch_up_outstanding_destinations", + self._get_catch_up_outstanding_destinations_txn, + time, + after_destination, + ) + + @staticmethod + def _get_catch_up_outstanding_destinations_txn( + txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str] + ) -> List[str]: + q = """ + SELECT destination FROM destinations + WHERE destination IN ( + SELECT destination FROM destination_rooms + WHERE stream_ordering > last_successful_stream_ordering + ) + AND destination > ? + AND ( + retry_last_ts IS NULL OR + retry_last_ts + retry_interval < ? + ) + ORDER BY destination + LIMIT 25 + """ + txn.execute( + q, + ( + # everything is lexicographically greater than "" so this gives + # us the first batch of up to 25. + after_destination or "", + now_time_ms, + ), + ) + + destinations = [row[0] for row in txn] + return destinations From c36261e4883fb72ef29ea20563e5c6c7ea7fc95f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 09:34:54 +0100 Subject: [PATCH 02/20] Consolidate changelog entries --- changelog.d/8230.misc | 2 +- changelog.d/8247.misc | 2 +- changelog.d/8258.misc | 2 +- changelog.d/8322.bugfix | 1 - 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/changelog.d/8230.misc b/changelog.d/8230.misc index bf0ba767307d..532d0e22fefb 100644 --- a/changelog.d/8230.misc +++ b/changelog.d/8230.misc @@ -1 +1 @@ -Track the latest event for every destination and room for catch-up after federation outage. +Fix messages over federation being lost until an event is sent into the same room. diff --git a/changelog.d/8247.misc b/changelog.d/8247.misc index 3c27803be45f..532d0e22fefb 100644 --- a/changelog.d/8247.misc +++ b/changelog.d/8247.misc @@ -1 +1 @@ -Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage. +Fix messages over federation being lost until an event is sent into the same room. diff --git a/changelog.d/8258.misc b/changelog.d/8258.misc index 3c27803be45f..532d0e22fefb 100644 --- a/changelog.d/8258.misc +++ b/changelog.d/8258.misc @@ -1 +1 @@ -Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage. +Fix messages over federation being lost until an event is sent into the same room. diff --git a/changelog.d/8322.bugfix b/changelog.d/8322.bugfix index b251dc15e849..532d0e22fefb 100644 --- a/changelog.d/8322.bugfix +++ b/changelog.d/8322.bugfix @@ -1,2 +1 @@ Fix messages over federation being lost until an event is sent into the same room. - From 86d0b79e1997f65232d09814a933d7f3e8ba62ca Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 09:38:40 +0100 Subject: [PATCH 03/20] =?UTF-8?q?Consolidate=20misc=20=E2=86=92=20bugfix?= =?UTF-8?q?=20changelog=20entries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- changelog.d/{8230.misc => 8230.bugfix} | 0 changelog.d/{8247.misc => 8247.bugfix} | 0 changelog.d/{8258.misc => 8258.bugfix} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{8230.misc => 8230.bugfix} (100%) rename changelog.d/{8247.misc => 8247.bugfix} (100%) rename changelog.d/{8258.misc => 8258.bugfix} (100%) diff --git a/changelog.d/8230.misc b/changelog.d/8230.bugfix similarity index 100% rename from changelog.d/8230.misc rename to changelog.d/8230.bugfix diff --git a/changelog.d/8247.misc b/changelog.d/8247.bugfix similarity index 100% rename from changelog.d/8247.misc rename to changelog.d/8247.bugfix diff --git a/changelog.d/8258.misc b/changelog.d/8258.bugfix similarity index 100% rename from changelog.d/8258.misc rename to changelog.d/8258.bugfix From a2e6b36db53817a5bab3b5ed41fcfe14f5c81bd6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 09:40:06 +0100 Subject: [PATCH 04/20] Antilint Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/federation/sender/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 16d9551d58e9..aa587da83a31 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -600,6 +600,11 @@ async def _wake_destinations_needing_catchup(self): self._instance_name, destination ): last_processed = destination - logger.info("Destination %s has outstanding catch-up, waking up.", destination) + logger.info( + "Destination %s has outstanding catch-up, waking up.", + destination, + ) self.wake_destination(destination) - await self.clock.sleep(CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC) + await self.clock.sleep( + CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC + ) From 223ba3c1b7835c3797b609ef336dd38d7c7ecb72 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 13:45:38 +0100 Subject: [PATCH 05/20] Fix _set_destination_retry_timings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This came about because the code assumed that retry_interval could not be NULL — which has been challenged by catch-up. --- synapse/storage/databases/main/transactions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index d873b98faaeb..4e6c05030919 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -218,6 +218,7 @@ def _set_destination_retry_timings( retry_interval = EXCLUDED.retry_interval WHERE EXCLUDED.retry_interval = 0 + OR destinations.retry_interval IS NULL OR destinations.retry_interval < EXCLUDED.retry_interval """ @@ -249,7 +250,7 @@ def _set_destination_retry_timings( "retry_interval": retry_interval, }, ) - elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + elif not retry_interval or prev_row["retry_interval"] < retry_interval: self.db_pool.simple_update_one_txn( txn, "destinations", From 49bd80e16e558a882a9940634f2b714ec9a6fb1c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 15:41:12 +0100 Subject: [PATCH 06/20] Comment --- synapse/federation/sender/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index aa587da83a31..3553eab7a9cc 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -134,6 +134,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) + # wake up destinations that have outstanding PDUs to be caught up self.clock.call_later( CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC, run_as_background_process, From 918ef908245a5ad9cbaed99d4ac228bb954cd492 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 15:51:33 +0100 Subject: [PATCH 07/20] Make the timer cancellable --- synapse/federation/sender/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 3553eab7a9cc..d0c5b3ef3735 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -135,7 +135,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): ) # wake up destinations that have outstanding PDUs to be caught up - self.clock.call_later( + self._catchup_after_startup_timer = self.clock.call_later( CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC, run_as_background_process, "wake_destinations_needing_catchup", From 876b4d1037b2db8cf8e2c962681c9ed646dbcb17 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 15:51:52 +0100 Subject: [PATCH 08/20] Test catch-up on start-up --- tests/federation/test_federation_catch_up.py | 98 ++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index cc52c3dfac0a..8e329b73ed7e 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -1,3 +1,4 @@ +from collections import defaultdict from typing import List, Tuple from mock import Mock @@ -321,3 +322,100 @@ def test_catch_up_loop(self): per_dest_queue._last_successful_stream_ordering, event_5.internal_metadata.stream_ordering, ) + + @override_config({"send_federation": True}) + def test_catch_up_on_synapse_startup(self): + """ + Tests the behaviour of get_catch_up_outstanding_destinations and + _wake_destinations_needing_catchup. + """ + + # list of sorted server names + server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"] + + # ARRANGE: + # - a local user (u1) + # - a room which u1 is joined to (and remote users @user:serverXX are + # joined to) + + # mark the remotes as online + self.is_online = True + + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room_1 = self.helper.create_room_as("u1", tok=u1_token) + + for server_name in server_names: + self.get_success( + event_injection.inject_member_event( + self.hs, room_1, "@user:%s" % server_name, "join" + ) + ) + + # create an event + self.helper.send(room_1, "deary me!", tok=u1_token) + + # ASSERT: + # - All servers are up to date so none should have outstanding catch-up + outstanding_when_successful = self.get_success( + self.hs.get_datastore().get_catch_up_outstanding_destinations(None) + ) + self.assertEqual(outstanding_when_successful, []) + + # ACT: + # - Make the remote servers unreachable + self.is_online = False + + # - Mark zzzerver as being backed-off from + now = self.clock.time_msec() + self.get_success( + self.hs.get_datastore().set_destination_retry_timings( + "zzzerver", now, now, 24 * 60 * 60 * 1000 # retry in 1 day + ) + ) + + # - Send an event + self.helper.send(room_1, "can anyone hear me?", tok=u1_token) + + # ASSERT (get_catch_up_outstanding_destinations): + # - all remotes are outstanding + # - they are returned in batches of 25, in order + outstanding_1 = self.get_success( + self.hs.get_datastore().get_catch_up_outstanding_destinations(None) + ) + + self.assertEqual(len(outstanding_1), 25) + self.assertEqual(outstanding_1, server_names[0:25]) + + outstanding_2 = self.get_success( + self.hs.get_datastore().get_catch_up_outstanding_destinations( + server_names[24] + ) + ) + self.assertNotIn("zzzerver", outstanding_2) + self.assertEqual(len(outstanding_2), 17) + self.assertEqual(outstanding_2, server_names[25:-1]) + + # ACT: call _wake_destinations_needing_catchup + + # patch wake_destination to just count the destinations instead + woken = defaultdict(lambda: 0) + + def wake_destination_track(destination): + woken[destination] += 1 + + self.hs.get_federation_sender().wake_destination = wake_destination_track + + # cancel the pre-existing timer for _wake_destinations_needing_catchup + self.hs.get_federation_sender()._catchup_after_startup_timer.cancel() + + self.get_success( + self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0 + ) + + # ASSERT (_wake_destinations_needing_catchup): + # - all remotes are woken up, save for zzzerver + self.assertNotIn("zzzerver", woken) + self.assertEqual(set(woken.keys()), set(server_names[:-1])) + # check that all destinations were woken exactly once + self.assertEqual([value for value in woken.values() if value != 1], []) From 9b97702cf92423eceadc38cca0ba2981193ed2f4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 16 Sep 2020 16:28:39 +0100 Subject: [PATCH 09/20] Fix code path for old SQLite --- synapse/storage/databases/main/transactions.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 4e6c05030919..86bc07519d6a 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -250,7 +250,11 @@ def _set_destination_retry_timings( "retry_interval": retry_interval, }, ) - elif not retry_interval or prev_row["retry_interval"] < retry_interval: + elif ( + retry_interval == 0 + or prev_row["retry_interval"] is None + or prev_row["retry_interval"] < retry_interval + ): self.db_pool.simple_update_one_txn( txn, "destinations", From 3cafd45b702fd260b45f922b2d64f7a65d3c1d5b Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 18 Sep 2020 09:30:18 +0100 Subject: [PATCH 10/20] Update synapse/federation/sender/__init__.py Co-authored-by: Patrick Cloke --- synapse/federation/sender/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d0c5b3ef3735..01a43816b5b8 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -59,10 +59,10 @@ # that have catch-up outstanding. CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC = 15 -# Time (in s) waited in between waking up each destination, i.e. one destination +# Time (in s) to wait in between waking up each destination, i.e. one destination # will be woken up every seconds after Synapse's startup until we have woken # every destination has outstanding catch-up. -CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC = 5 +CATCH_UP_STARTUP_INTERVAL_SEC = 5 class FederationSender: From f27744dabd7b4fcc17449c1a4f83ba551e829614 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 18 Sep 2020 09:30:29 +0100 Subject: [PATCH 11/20] Update synapse/federation/sender/__init__.py Co-authored-by: Patrick Cloke --- synapse/federation/sender/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 01a43816b5b8..4194cb01f96f 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -57,7 +57,7 @@ # Time (in s) after Synapse's startup that we will begin to wake up destinations # that have catch-up outstanding. -CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC = 15 +CATCH_UP_STARTUP_DELAY_SEC = 15 # Time (in s) to wait in between waking up each destination, i.e. one destination # will be woken up every seconds after Synapse's startup until we have woken From c76209ae62b3e2b9e62aa08ad3c94d46acd6e3a8 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 18 Sep 2020 09:30:47 +0100 Subject: [PATCH 12/20] Update synapse/federation/sender/__init__.py Co-authored-by: Patrick Cloke --- synapse/federation/sender/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 4194cb01f96f..198a3b7daa5c 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -582,7 +582,8 @@ async def _wake_destinations_needing_catchup(self): """ Wakes up destinations that need catch-up and are not currently being backed off from. - Does so in a slow way (one every 5 seconds) to reduce load spikes. + + In order to reduce load spikes, add a delay between each destination. """ last_processed = None # type: Optional[str] From 47452c38877757d38ffe2217f543942cf05f9fc5 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 18 Sep 2020 10:04:47 +0100 Subject: [PATCH 13/20] Update tests/federation/test_federation_catch_up.py Co-authored-by: Patrick Cloke --- tests/federation/test_federation_catch_up.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 8e329b73ed7e..d4c3b60423c3 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -330,7 +330,7 @@ def test_catch_up_on_synapse_startup(self): _wake_destinations_needing_catchup. """ - # list of sorted server names + # list of sorted server names (note that there are more servers than the batch size used in get_catch_up_outstanding_destinations). server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"] # ARRANGE: From d464a4f147669ca69f5e45668d77d6d74c110c36 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Sep 2020 09:36:56 +0100 Subject: [PATCH 14/20] Make query clearer --- synapse/storage/databases/main/transactions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 86bc07519d6a..08929112ac06 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -451,7 +451,8 @@ def _get_catch_up_outstanding_destinations_txn( SELECT destination FROM destinations WHERE destination IN ( SELECT destination FROM destination_rooms - WHERE stream_ordering > last_successful_stream_ordering + WHERE destination_rooms.stream_ordering > + destinations.last_successful_stream_ordering ) AND destination > ? AND ( From 4b92775b76bf81e0772634d520db0784c1fea71e Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Sep 2020 09:49:56 +0100 Subject: [PATCH 15/20] Pre-filter destinations list --- synapse/federation/sender/__init__.py | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 198a3b7daa5c..e968d8a29936 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -136,7 +136,7 @@ def __init__(self, hs: "synapse.server.HomeServer"): # wake up destinations that have outstanding PDUs to be caught up self._catchup_after_startup_timer = self.clock.call_later( - CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_DELAY_SEC, + CATCH_UP_STARTUP_DELAY_SEC, run_as_background_process, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, @@ -583,7 +583,7 @@ async def _wake_destinations_needing_catchup(self): Wakes up destinations that need catch-up and are not currently being backed off from. - In order to reduce load spikes, add a delay between each destination. + In order to reduce load spikes, adds a delay between each destination. """ last_processed = None # type: Optional[str] @@ -597,16 +597,16 @@ async def _wake_destinations_needing_catchup(self): # finished waking all destinations! break + destinations_to_wake = [ + d + for d in destinations_to_wake + if self._federation_shard_config.should_handle(self._instance_name, d) + ] + for destination in destinations_to_wake: - if self._federation_shard_config.should_handle( - self._instance_name, destination - ): - last_processed = destination - logger.info( - "Destination %s has outstanding catch-up, waking up.", - destination, - ) - self.wake_destination(destination) - await self.clock.sleep( - CATCH_UP_WAKE_AFTER_SYNAPSE_STARTUP_INTERVAL_SEC - ) + last_processed = destination + logger.info( + "Destination %s has outstanding catch-up, waking up.", destination, + ) + self.wake_destination(destination) + await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) From 0ea50f3240bdeb3d87eb4fa9a6432d0f30a01395 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Sep 2020 09:51:25 +0100 Subject: [PATCH 16/20] Clear out timer here --- synapse/federation/sender/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index e968d8a29936..022fd144b5a4 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -595,6 +595,7 @@ async def _wake_destinations_needing_catchup(self): if not destinations_to_wake: # finished waking all destinations! + self._catchup_after_startup_timer = None break destinations_to_wake = [ From 9425bb4ea2ca472cb694972ccd9c5bf96e88269e Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Sep 2020 10:03:47 +0100 Subject: [PATCH 17/20] Clean-up test --- tests/federation/test_federation_catch_up.py | 25 ++++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index d4c3b60423c3..1e88dbe40f6b 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -343,17 +343,17 @@ def test_catch_up_on_synapse_startup(self): self.register_user("u1", "you the one") u1_token = self.login("u1", "you the one") - room_1 = self.helper.create_room_as("u1", tok=u1_token) + room_id = self.helper.create_room_as("u1", tok=u1_token) for server_name in server_names: self.get_success( event_injection.inject_member_event( - self.hs, room_1, "@user:%s" % server_name, "join" + self.hs, room_id, "@user:%s" % server_name, "join" ) ) # create an event - self.helper.send(room_1, "deary me!", tok=u1_token) + self.helper.send(room_id, "deary me!", tok=u1_token) # ASSERT: # - All servers are up to date so none should have outstanding catch-up @@ -375,7 +375,7 @@ def test_catch_up_on_synapse_startup(self): ) # - Send an event - self.helper.send(room_1, "can anyone hear me?", tok=u1_token) + self.helper.send(room_id, "can anyone hear me?", tok=u1_token) # ASSERT (get_catch_up_outstanding_destinations): # - all remotes are outstanding @@ -389,7 +389,7 @@ def test_catch_up_on_synapse_startup(self): outstanding_2 = self.get_success( self.hs.get_datastore().get_catch_up_outstanding_destinations( - server_names[24] + outstanding_1[-1] ) ) self.assertNotIn("zzzerver", outstanding_2) @@ -399,14 +399,16 @@ def test_catch_up_on_synapse_startup(self): # ACT: call _wake_destinations_needing_catchup # patch wake_destination to just count the destinations instead - woken = defaultdict(lambda: 0) + woken = [] def wake_destination_track(destination): - woken[destination] += 1 + woken.append(destination) self.hs.get_federation_sender().wake_destination = wake_destination_track # cancel the pre-existing timer for _wake_destinations_needing_catchup + # this is because we are calling it manually rather than waiting for it + # to be called automatically self.hs.get_federation_sender()._catchup_after_startup_timer.cancel() self.get_success( @@ -416,6 +418,9 @@ def wake_destination_track(destination): # ASSERT (_wake_destinations_needing_catchup): # - all remotes are woken up, save for zzzerver self.assertNotIn("zzzerver", woken) - self.assertEqual(set(woken.keys()), set(server_names[:-1])) - # check that all destinations were woken exactly once - self.assertEqual([value for value in woken.values() if value != 1], []) + # - all destinations are woken exactly once + # (assertCountEqual has a misleading name — it checks that the counts + # of each item in the collections are not the same. + # It does not merely check the lengths of the collections are equal, + # as the name implies.) + self.assertCountEqual(woken, server_names[:-1]) From 1f0bedf2c87afa693ea353f87e40bd9cfd15859c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Sep 2020 10:35:26 +0100 Subject: [PATCH 18/20] Antilint --- synapse/federation/sender/__init__.py | 2 +- tests/federation/test_federation_catch_up.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 022fd144b5a4..d4dfd9fca5b9 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -582,7 +582,7 @@ async def _wake_destinations_needing_catchup(self): """ Wakes up destinations that need catch-up and are not currently being backed off from. - + In order to reduce load spikes, adds a delay between each destination. """ diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 1e88dbe40f6b..b0b51c2ad338 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -1,4 +1,3 @@ -from collections import defaultdict from typing import List, Tuple from mock import Mock From 5186364873ae8dc08b3c26f3c68b1c72d329e420 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 18 Sep 2020 13:49:26 +0100 Subject: [PATCH 19/20] Update synapse/federation/sender/__init__.py Co-authored-by: Patrick Cloke --- synapse/federation/sender/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d4dfd9fca5b9..2f1ce706b049 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -604,10 +604,9 @@ async def _wake_destinations_needing_catchup(self): if self._federation_shard_config.should_handle(self._instance_name, d) ] - for destination in destinations_to_wake: - last_processed = destination + for last_processed in destinations_to_wake: logger.info( - "Destination %s has outstanding catch-up, waking up.", destination, + "Destination %s has outstanding catch-up, waking up.", last_processed, ) - self.wake_destination(destination) + self.wake_destination(last_processed) await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) From 7757cedd31c6d1b130fa4c8c0bc704a2606b7906 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Sep 2020 13:53:46 +0100 Subject: [PATCH 20/20] Comment changes --- synapse/federation/sender/__init__.py | 3 ++- tests/federation/test_federation_catch_up.py | 9 +++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 2f1ce706b049..8bb17b3a05d3 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -606,7 +606,8 @@ async def _wake_destinations_needing_catchup(self): for last_processed in destinations_to_wake: logger.info( - "Destination %s has outstanding catch-up, waking up.", last_processed, + "Destination %s has outstanding catch-up, waking up.", + last_processed, ) self.wake_destination(last_processed) await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index b0b51c2ad338..1a3ccb263dae 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -329,7 +329,8 @@ def test_catch_up_on_synapse_startup(self): _wake_destinations_needing_catchup. """ - # list of sorted server names (note that there are more servers than the batch size used in get_catch_up_outstanding_destinations). + # list of sorted server names (note that there are more servers than the batch + # size used in get_catch_up_outstanding_destinations). server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"] # ARRANGE: @@ -417,9 +418,5 @@ def wake_destination_track(destination): # ASSERT (_wake_destinations_needing_catchup): # - all remotes are woken up, save for zzzerver self.assertNotIn("zzzerver", woken) - # - all destinations are woken exactly once - # (assertCountEqual has a misleading name — it checks that the counts - # of each item in the collections are not the same. - # It does not merely check the lengths of the collections are equal, - # as the name implies.) + # - all destinations are woken exactly once; they appear once in woken. self.assertCountEqual(woken, server_names[:-1])