diff --git a/changelog.d/8096.bugfix b/changelog.d/8096.bugfix
new file mode 100644
index 000000000000..bda882866f13
--- /dev/null
+++ b/changelog.d/8096.bugfix
@@ -0,0 +1 @@
+Send remote homeservers any events they may have missed in rooms while they were unreachable.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 4662008bfdbb..6035fc6eeb57 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -211,7 +211,7 @@ async def handle_event(event: EventBase) -> None:
             logger.debug("Sending %s to %r", event, destinations)

             if destinations:
-                self._send_pdu(event, destinations)
+                await self._send_pdu(event, destinations)

                 now = self.clock.time_msec()
                 ts = await self.store.get_received_ts(event.event_id)
@@ -267,7 +267,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
         finally:
             self._is_processing = False

-    def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
@@ -285,6 +285,16 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()

+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations,
+            pdu.room_id,
+            pdu.event_id,
+            pdu.internal_metadata.stream_ordering,
+        )
+
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu, order)
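Reviewer's note: the hunk above is the write side of catch-up tracking. Before a PDU is handed to the per-destination queues, every destination gets a (room, latest stream ordering) record, which is what later lets the sender work out what a server missed. A minimal in-memory sketch of that bookkeeping, with hypothetical names standing in for the `destination_rooms` storage introduced further down in this patch:

```python
# Sketch only: an in-memory stand-in for the destination_rooms table.
from typing import Dict, Iterable

# destination -> room_id -> stream_ordering of the latest event we tried to send
destination_rooms: Dict[str, Dict[str, int]] = {}


def track_outgoing_pdu(
    destinations: Iterable[str], room_id: str, stream_ordering: int
) -> None:
    for destination in destinations:
        # a later event for the same room simply overwrites the earlier record,
        # mirroring the upsert into the destination_rooms table
        destination_rooms.setdefault(destination, {})[room_id] = stream_ordering


track_outgoing_pdu(["host2"], "!room1:test", 1)
track_outgoing_pdu(["host2"], "!room1:test", 5)
track_outgoing_pdu(["host2"], "!room2:test", 3)
assert destination_rooms["host2"] == {"!room1:test": 5, "!room2:test": 3}
```

Only the newest stream ordering per room survives, so catch-up later needs to send at most one event per room.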
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c09ffcaf4cce..6f9fa27942af 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast

 from prometheus_client import Counter

@@ -92,6 +92,20 @@ def __init__(
         self._destination = destination
         self.transmission_loop_running = False

+        # True whilst we are sending events that the remote homeserver missed
+        # because it was unreachable.
+        # New events will only be sent once this is finished, at which point
+        # _catching_up is flipped to False.
+        self._catching_up = True
+
+        # the maximum stream ordering to catch up to (PDUs after this are
+        # expected to be in the main transmission queue), inclusive
+        self._catch_up_max_stream_order = None  # type: Optional[int]
+
+        # Cache of the last successfully-transmitted stream ordering for this
+        # destination (we are the only updater so this is safe)
+        self._last_successful_stream_order = None  # type: Optional[int]
+
         # a list of tuples of (pending pdu, order)
         self._pending_pdus = []  # type: List[Tuple[EventBase, int]]
@@ -137,8 +151,15 @@ def send_pdu(self, pdu: EventBase, order: int) -> None:

         Args:
             pdu: pdu to send
-            order
+            order: an arbitrary order for the PDU (NOT the stream ordering)
         """
+        if (
+            self._catch_up_max_stream_order is not None
+            and pdu.internal_metadata.stream_ordering <= self._catch_up_max_stream_order
+        ):
+            # we are in catch-up mode and this PDU is already scheduled to be
+            # sent as part of the catch-up
+            return
         self._pending_pdus.append((pdu, order))
         self.attempt_new_transaction()
@@ -219,6 +240,16 @@ async def _transaction_transmission_loop(self) -> None:
             # hence why we throw the result away.
             await get_retry_limiter(self._destination, self._clock, self._store)

+            if self._catching_up:
+                # we are catching up, so we should send old events instead.
+                # In this case, we don't send anything from the new queue;
+                # this keeps the catch-up logic simple.
+                await self._catch_up_transmission_loop()
+                if self._catching_up:
+                    # if we aren't actually caught up yet, we shouldn't carry
+                    # on to the main loop
+                    return
+
             pending_pdus = []
             while True:
                 # We have to keep 2 free slots for presence and rr_edus
@@ -326,6 +357,15 @@ async def _transaction_transmission_loop(self) -> None:
                         self._last_device_stream_id = device_stream_id
                         self._last_device_list_stream_id = dev_list_id
+
+                        if pending_pdus:
+                            final_pdu, _ = pending_pdus[-1]
+                            self._last_successful_stream_order = (
+                                final_pdu.internal_metadata.stream_ordering
+                            )
+                            await self._store.set_destination_last_successful_stream_ordering(
+                                self._destination, self._last_successful_stream_order
+                            )
                     else:
                         break
         except NotRetryingDestination as e:
@@ -359,7 +399,12 @@ async def _transaction_transmission_loop(self) -> None:
             self._pending_edus_keyed = {}
             self._pending_presence = {}
             self._pending_rrs = {}
+
+            self._catching_up = True
+            # reset the max catch-up, since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except FederationDeniedError as e:
+            # remote server is not in our federation whitelist
             logger.info(e)
         except HttpResponseException as e:
             logger.warning(
@@ -368,6 +413,10 @@ async def _transaction_transmission_loop(self) -> None:
                 e.code,
                 e,
             )
+
+            self._catching_up = True
+            # reset the max catch-up, since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
@@ -377,16 +426,122 @@ async def _transaction_transmission_loop(self) -> None:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._catching_up = True
+            # reset the max catch-up, since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
             for p, _ in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._catching_up = True
+            # reset the max catch-up, since we have dropped PDUs here
+            self._catch_up_max_stream_order = None
         finally:
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False

+    async def _catch_up_transmission_loop(self) -> None:
+        if self._last_successful_stream_order is None:
+            # first catch-up, so get from database
+            self._last_successful_stream_order = await self._store.get_destination_last_successful_stream_ordering(
+                self._destination
+            )
+
+        if self._last_successful_stream_order is None:
+            # if it's still None, then this means we don't have the information
+            # in our database (oh, the perils of being a new feature).
+            # So we can't actually do anything here, and in this case, we don't
+            # know what to catch up, sadly.
+            # Trying to catch up right now is futile, so let's stop.
+            self._catching_up = False
+            return
+
+        if self._catch_up_max_stream_order is None:
+            # this is our first catch-up, so we need to determine how much we
+            # want to catch up
+            if self._pending_pdus:
+                # we have PDUs already in the main queue so no need to ask the
+                # database
+                first_non_catch_up_pdu, _ = self._pending_pdus[0]
+                # -1 because we wish to exclude that one: we don't need to
+                # catch it up, as it's already in our main queue
+                self._catch_up_max_stream_order = (
+                    first_non_catch_up_pdu.internal_metadata.stream_ordering - 1
+                )
+            else:
+                # we don't have any PDUs in the main queue so instead find out
+                # the largest stream order that we know of that has, once upon a
+                # time, been queued for this destination (i.e. this is what we
+                # *should* have sent if the remote server was reachable)
+                self._catch_up_max_stream_order = await self._store.get_largest_destination_rooms_stream_order(
+                    self._destination
+                )
+            if self._catch_up_max_stream_order is None:
+                # not enough info to catch up
+                self._catching_up = False
+                return
+
+        # get at most 50 catch-up rooms/PDUs at a time
+        while self._last_successful_stream_order < self._catch_up_max_stream_order:
+            event_ids = await self._store.get_catch_up_room_event_ids(
+                self._destination,
+                self._last_successful_stream_order,
+                self._catch_up_max_stream_order,
+            )
+
+            if not event_ids:
+                # I don't believe this *should* happen unless someone has been
+                # tinkering with the database, but I also have limited foresight,
+                # so let's handle this properly
+                logger.warning(
+                    "Unexpectedly, no event IDs were found for catch-up: "
+                    "last successful = %d, max catch up = %d",
+                    self._last_successful_stream_order,
+                    self._catch_up_max_stream_order,
+                )
+                self._catching_up = False
+                break
+
+            # fetch the relevant events from the event store
+            # - redacted behaviour of REDACT is fine, since we only send metadata
+            #   of redacted events to the destination.
+            # - don't need to worry about rejected events as we do not actively
+            #   forward received events over federation.
+            events = await self._store.get_events_as_list(event_ids)
+            if not events:
+                raise AssertionError(
+                    "No events retrieved when we asked for %r. "
+                    "This should not happen." % event_ids
+                )
+
+            # zip them together with their stream orderings
+            catch_up_pdus = [
+                (event, event.internal_metadata.stream_ordering) for event in events
+            ]
+
+            success = await self._transaction_manager.send_new_transaction(
+                self._destination, catch_up_pdus, []
+            )
+
+            if not success:
+                return
+
+            sent_transactions_counter.inc()
+            final_pdu, _ = catch_up_pdus[-1]
+            self._last_successful_stream_order = cast(
+                int, final_pdu.internal_metadata.stream_ordering
+            )
+            await self._store.set_destination_last_successful_stream_ordering(
+                self._destination, self._last_successful_stream_order
+            )
+
+        # once we have reached this point, catch-up is done!
+        self._catching_up = False
+
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
             return
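Reviewer's note: the subtle part of the file above is the hand-off between `send_pdu` and `_catch_up_transmission_loop`. Once `_catch_up_max_stream_order` is fixed, PDUs at or below it are deliberately dropped from the main queue, because the catch-up pass will re-fetch them from the database. A rough, self-contained sketch of that invariant (hypothetical names; it ignores the latest-event-per-room optimisation the real loop gets from `destination_rooms`):

```python
# Sketch: once a catch-up window (last_successful, catch_up_max] is chosen,
# main-queue submissions inside the window are dropped, because the catch-up
# pass fetches them from the database instead: nothing is sent twice or lost.
from typing import List, Optional


class QueueSketch:
    def __init__(self, last_successful: int, catch_up_max: Optional[int]):
        self.last_successful = last_successful
        self.catch_up_max = catch_up_max
        self.main_queue: List[int] = []

    def send_pdu(self, stream_ordering: int) -> None:
        if self.catch_up_max is not None and stream_ordering <= self.catch_up_max:
            return  # already covered by the catch-up window
        self.main_queue.append(stream_ordering)

    def catch_up(self, database: List[int]) -> List[int]:
        # the database holds everything we ever tried to send
        window = [
            so for so in database if self.last_successful < so <= self.catch_up_max
        ]
        self.last_successful = self.catch_up_max
        return window


queue = QueueSketch(last_successful=2, catch_up_max=5)
queue.send_pdu(4)  # dropped: inside the window
queue.send_pdu(6)  # kept: newer than the window
assert queue.catch_up(database=[1, 2, 3, 4, 5]) == [3, 4, 5]
assert queue.main_queue == [6]
```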
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 7ab370efef15..698841bb29a8 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1092,7 +1092,7 @@ async def simple_select_one_onecol(
         self,
         table: str,
         keyvalues: Dict[str, Any],
-        retcol: Iterable[str],
+        retcol: str,
         allow_none: bool = False,
         desc: str = "simple_select_one_onecol",
     ) -> Optional[Any]:
@@ -1122,7 +1122,7 @@ def simple_select_one_onecol_txn(
         txn: LoggingTransaction,
         table: str,
         keyvalues: Dict[str, Any],
-        retcol: Iterable[str],
+        retcol: str,
         allow_none: bool = False,
     ) -> Optional[Any]:
         ret = cls.simple_select_onecol_txn(
@@ -1139,10 +1139,7 @@ def simple_select_one_onecol_txn(

     @staticmethod
     def simple_select_onecol_txn(
-        txn: LoggingTransaction,
-        table: str,
-        keyvalues: Dict[str, Any],
-        retcol: Iterable[str],
+        txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any], retcol: str,
     ) -> List[Any]:
         sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
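Reviewer's note: the annotation fix above reflects how `simple_select_one_onecol` is actually called: `retcol` is a single column name, not a list of columns. A small sketch of the call shape as this patch uses it in `transactions.py` below (`db_pool` is assumed to be the usual datastore pool object):

```python
from typing import Optional


async def last_successful_for(db_pool, destination: str) -> Optional[int]:
    # retcol is one column name (a str); allow_none=True maps "no row" to None
    # rather than raising
    return await db_pool.simple_select_one_onecol(
        table="destinations",
        keyvalues={"destination": destination},
        retcol="last_successful_stream_ordering",
        allow_none=True,
        desc="last_successful_for",
    )
```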
diff --git a/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
new file mode 100644
index 000000000000..4b6e49487a88
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/11recovery_after_outage.sql
@@ -0,0 +1,42 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This schema delta alters the schema to enable 'catching up' remote homeservers
+-- after there has been a connectivity problem for any reason.
+
+-- This stores, for each (destination, room) pair, the stream_ordering of the
+-- latest event for that destination.
+CREATE TABLE IF NOT EXISTS destination_rooms (
+  -- the destination in question.
+  -- Can not be a foreign key because rows in the `destinations` table will
+  -- only be created when we back off or when we successfully send a
+  -- transaction.
+  destination TEXT NOT NULL,
+  -- the ID of the room in question
+  room_id TEXT NOT NULL,
+  -- the stream_ordering of the event
+  stream_ordering INTEGER NOT NULL,
+  PRIMARY KEY (destination, room_id),
+  FOREIGN KEY (room_id) REFERENCES rooms (room_id)
+    ON DELETE CASCADE,
+  FOREIGN KEY (stream_ordering) REFERENCES events (stream_ordering)
+    ON DELETE CASCADE
+);
+
+-- This column tracks the stream_ordering of the event that was most recently
+-- successfully transmitted to the destination.
+-- A value of NULL means that we have not sent an event successfully yet
+-- (at least, not since the introduction of this column).
+ALTER TABLE destinations
+    ADD COLUMN last_successful_stream_ordering INTEGER;
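Reviewer's note: the patch writes to `destination_rooms` via Synapse's `simple_upsert_many_txn` helper; the intended row semantics can be shown with plain SQL. A self-contained sqlite3 sketch (this is illustrative, not the statement Synapse issues, and it assumes SQLite 3.24+ for the ON CONFLICT clause):

```python
# Illustration of the destination_rooms upsert semantics: one row per
# (destination, room), always holding the latest stream_ordering.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE destination_rooms (
        destination TEXT NOT NULL,
        room_id TEXT NOT NULL,
        stream_ordering INTEGER NOT NULL,
        PRIMARY KEY (destination, room_id)
    )
    """
)


def track_pdu(destination: str, room_id: str, stream_ordering: int) -> None:
    # a newer event simply overwrites the row for that (destination, room) pair
    conn.execute(
        """
        INSERT INTO destination_rooms (destination, room_id, stream_ordering)
        VALUES (?, ?, ?)
        ON CONFLICT (destination, room_id) DO UPDATE SET
            stream_ordering = excluded.stream_ordering
        """,
        (destination, room_id, stream_ordering),
    )


track_pdu("host2", "!room1", 1)
track_pdu("host2", "!room1", 5)  # replaces the stream_ordering=1 row
track_pdu("host2", "!room2", 3)

# catch-up only ever sees the most recent event per room:
print(conn.execute("SELECT * FROM destination_rooms").fetchall())
# [('host2', '!room1', 5), ('host2', '!room2', 3)]
```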
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 5b31aab700f9..2a94ed8ad170 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -12,10 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 from collections import namedtuple
-from typing import Optional, Tuple
+from typing import Iterable, List, Optional, Tuple

 from canonicaljson import encode_canonical_json

@@ -164,7 +163,7 @@ def _get_destination_retry_timings(self, txn, destination):
             allow_none=True,
         )

-        if result and result["retry_last_ts"] > 0:
+        if result and result["retry_last_ts"]:
             return result
         else:
             return None
@@ -273,3 +272,151 @@ def _cleanup_transactions_txn(txn):
         await self.db_pool.runInteraction(
             "_cleanup_transactions", _cleanup_transactions_txn
         )
+
+    async def get_destination_last_successful_stream_ordering(
+        self, destination: str
+    ) -> Optional[int]:
+        """
+        Gets the stream ordering of the PDU most-recently successfully sent
+        to the specified destination.
+
+        Args:
+            destination: the destination we have successfully sent to
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            "destinations",
+            {"destination": destination},
+            "last_successful_stream_ordering",
+            allow_none=True,
+            desc="get_last_successful_stream_ordering",
+        )
+
+    async def set_destination_last_successful_stream_ordering(
+        self, destination: str, last_successful_stream_ordering: int
+    ) -> None:
+        """
+        Marks that we have successfully sent the PDUs up to and including the
+        one specified.
+
+        Args:
+            destination: the destination we have successfully sent to
+            last_successful_stream_ordering: the stream_ordering of the most
+                recent successfully-sent PDU
+        """
+        return await self.db_pool.simple_upsert(
+            "destinations",
+            keyvalues={"destination": destination},
+            values={"last_successful_stream_ordering": last_successful_stream_ordering},
+            desc="set_last_successful_stream_ordering",
+        )
+
+    async def get_catch_up_room_event_ids(
+        self,
+        destination: str,
+        last_successful_stream_ordering: int,
+        max_stream_order: int,
+    ) -> List[str]:
+        """
+        Returns at most 50 event IDs, oldest first, for events that have not
+        yet been sent to the destination.
+
+        Args:
+            destination: the destination in question
+            last_successful_stream_ordering: the stream_ordering of the
+                most-recently successfully-transmitted event to the destination
+            max_stream_order: an upper bound, inclusive, of the stream orderings
+                to return events for.
+
+        Returns:
+            list of event_ids
+        """
+        return await self.db_pool.runInteraction(
+            "get_catch_up_room_event_ids",
+            self._get_catch_up_room_event_ids_txn,
+            destination,
+            last_successful_stream_ordering,
+            max_stream_order,
+        )
+
+    @staticmethod
+    def _get_catch_up_room_event_ids_txn(
+        txn,
+        destination: str,
+        last_successful_stream_ordering: int,
+        max_stream_order: int,
+    ) -> List[str]:
+        q = """
+            SELECT event_id FROM destination_rooms
+            JOIN events USING (stream_ordering)
+            WHERE destination = ?
+                AND stream_ordering > ? AND stream_ordering <= ?
+            ORDER BY stream_ordering
+            LIMIT 50
+        """
+        txn.execute(
+            q, (destination, last_successful_stream_ordering, max_stream_order),
+        )
+        event_ids = [row[0] for row in txn]
+        return event_ids
+
+    async def get_largest_destination_rooms_stream_order(
+        self, destination: str
+    ) -> Optional[int]:
+        """
+        Returns the largest stream_ordering from the destination_rooms table
+        that corresponds to this destination.
+        """
+        return await self.db_pool.runInteraction(
+            "get_largest_destination_rooms_stream_order",
+            self._get_largest_destination_rooms_stream_order_txn,
+            destination,
+        )
+
+    @staticmethod
+    def _get_largest_destination_rooms_stream_order_txn(
+        txn, destination: str
+    ) -> Optional[int]:
+        txn.execute(
+            """
+            SELECT stream_ordering
+            FROM destination_rooms
+            WHERE destination = ?
+            ORDER BY stream_ordering DESC
+            LIMIT 1
+            """,
+            (destination,),
+        )
+        rows = [r[0] for r in txn]
+        if rows:
+            return rows[0]
+        return None
+
+    async def store_destination_rooms_entries(
+        self,
+        destinations: Iterable[str],
+        room_id: str,
+        event_id: str,
+        stream_ordering: int,
+    ) -> None:
+        """
+        Updates or creates `destination_rooms` entries in batch for a single event.
+
+        Args:
+            destinations: list of destinations
+            room_id: the room_id of the event
+            event_id: the ID of the event (not stored directly; the event can
+                be recovered by joining `events` on stream_ordering)
+            stream_ordering: the stream_ordering of the event
+        """
+        rows = [(destination, room_id) for destination in destinations]
+
+        return await self.db_pool.runInteraction(
+            "store_destination_rooms_entries",
+            self.db_pool.simple_upsert_many_txn,
+            "destination_rooms",
+            ["destination", "room_id"],
+            rows,
+            ["stream_ordering"],
+            [(stream_ordering,)] * len(rows),
+        )
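Reviewer's note: taken together, the new storage methods above support a simple read/send/advance cycle. An illustrative driver, using the method names from this patch (`store` and `send_transaction` are stand-ins for the datastore and transaction manager; the real loop in `per_destination_queue.py` also manages the `_catching_up` flag and window bounds):

```python
# Illustrative catch-up cycle over the new storage API (sketch, not Synapse code).
async def catch_up(store, send_transaction, destination: str) -> None:
    last = await store.get_destination_last_successful_stream_ordering(destination)
    if last is None:
        return  # nothing recorded yet; we cannot know what was missed
    upper = await store.get_largest_destination_rooms_stream_order(destination)
    if upper is None:
        return  # nothing was ever queued for this destination
    while last < upper:
        # at most 50 event IDs per batch, oldest first
        event_ids = await store.get_catch_up_room_event_ids(destination, last, upper)
        if not event_ids:
            break
        events = await store.get_events_as_list(event_ids)
        if not await send_transaction(destination, events):
            return  # still unreachable; try again on the next attempt
        # advance the high-water mark only after a successful send
        last = events[-1].internal_metadata.stream_ordering
        await store.set_destination_last_successful_stream_ordering(destination, last)
```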
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
new file mode 100644
index 000000000000..117a69389177
--- /dev/null
+++ b/tests/federation/test_federation_catch_up.py
@@ -0,0 +1,376 @@
+from typing import List, Tuple
+
+from mock import Mock
+
+from twisted.internet import defer
+
+from synapse.events import EventBase
+from synapse.federation.sender import PerDestinationQueue, TransactionManager
+from synapse.federation.units import Edu
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.test_utils import event_injection, make_awaitable
+from tests.unittest import FederatingHomeserverTestCase, override_config
+
+
+class FederationCatchUpTestCases(FederatingHomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor, clock):
+        return self.setup_test_homeserver(
+            federation_transport_client=Mock(spec=["send_transaction"]),
+        )
+
+    def prepare(self, reactor, clock, hs):
+        # stub out get_current_hosts_in_room
+        state_handler = hs.get_state_handler()
+
+        # This mock is crucial for destination_rooms to be populated.
+        state_handler.get_current_hosts_in_room = Mock(
+            return_value=make_awaitable(["test", "host2"])
+        )
+
+        # whenever send_transaction is called, record the pdu data
+        self.pdus = []
+        self.failed_pdus = []
+        self.is_online = True
+        self.hs.get_federation_transport_client().send_transaction.side_effect = (
+            self.record_transaction
+        )
+
+    def get_destination_room(self, room: str, destination: str = "host2") -> dict:
+        """
+        Gets the destination_rooms entry for a (destination, room_id) pair.
+
+        Args:
+            room: room ID
+            destination: the destination; defaults to "host2"
+
+        Returns:
+            Dictionary of {event_id: str, stream_ordering: int}
+        """
+        event_id, stream_ordering = self.get_success(
+            self.hs.get_datastore().db_pool.execute(
+                "test:get_destination_rooms",
+                None,
+                """
+                SELECT event_id, stream_ordering
+                    FROM destination_rooms dr
+                    JOIN events USING (stream_ordering)
+                    WHERE dr.destination = ? AND dr.room_id = ?
+                """,
+                destination,
+                room,
+            )
+        )[0]
+        return {"event_id": event_id, "stream_ordering": stream_ordering}
+
+    def make_fake_destination_queue(
+        self, destination: str = "host2"
+    ) -> Tuple[PerDestinationQueue, List[EventBase]]:
+        """
+        Makes a fake per-destination queue.
+        """
+        transaction_manager = TransactionManager(self.hs)
+        per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
+        results_list = []
+
+        async def fake_send(
+            destination_tm: str,
+            pending_pdus: List[Tuple[EventBase, int]],
+            _pending_edus: List[Edu],
+        ) -> bool:
+            assert destination == destination_tm
+            results_list.extend([row[0] for row in pending_pdus])
+            # report success, otherwise the catch-up loop gives up immediately
+            return True
+
+        transaction_manager.send_new_transaction = fake_send
+
+        return per_dest_queue, results_list
+
+    def record_transaction(self, txn, json_cb):
+        if self.is_online:
+            data = json_cb()
+            self.pdus.extend(data["pdus"])
+            return defer.succeed({})
+        else:
+            data = json_cb()
+            self.failed_pdus.extend(data["pdus"])
+            return defer.fail(IOError("Failed to connect because this is a test!"))
+
+    @override_config({"send_federation": True})  # critical (1) to federate
+    def test_catch_up_from_blank_state(self):
+        """
+        Runs an overall test of federation catch-up from scratch.
+        Further tests will focus on more narrow aspects and edge-cases, but I
+        hope to provide an overall view with this test.
+        """
+        # bring the other server online
+        self.is_online = True
+
+        # let's make some events for the other server to receive
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+
+        # also critical (2) to federate
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
+        )
+
+        # check: PDU received for topic event
+        self.assertEqual(len(self.pdus), 1)
+        self.assertEqual(self.pdus[0]["type"], "m.room.topic")
+
+        # take the remote offline
+        self.is_online = False
+
+        # send another event
+        self.helper.send(room_1, "hi user!", tok=u1_token)
+
+        # check: things didn't go well since the remote is down
+        self.assertEqual(len(self.failed_pdus), 1)
+        self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
+
+        # let's delete the federation transmission queue
+        # (this pretends we are starting up fresh.)
+        self.assertFalse(
+            self.hs.get_federation_sender()
+            ._per_destination_queues["host2"]
+            .transmission_loop_running
+        )
+        del self.hs.get_federation_sender()._per_destination_queues["host2"]
+
+        # let's also clear any backoffs
+        self.get_success(
+            self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
+        )
+
+        # bring the remote online and clear the received pdu list
+        self.is_online = True
+        self.pdus = []
+
+        # now we need to initiate a federation transaction somehow…
+        # to do that, let's send another event (because it's simple to do)
+        # (do it to another room, otherwise the catch-up logic decides it doesn't
+        # need to catch up room_1; something I overlooked when first writing
+        # this test)
+        self.helper.send(room_2, "wombats!", tok=u1_token)
+
+        # we should now have received both PDUs
+        self.assertEqual(len(self.pdus), 2)
+        self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
+        self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
+
+    @override_config({"send_federation": True})
+    def test_catch_up_destination_rooms_tracking(self):
+        """
+        Tests that we populate the `destination_rooms` table as needed.
+        """
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room = self.helper.create_room_as("u1", tok=u1_token)
+
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
+        )
+
+        event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
+
+        row_1 = self.get_destination_room(room)
+
+        event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
+
+        row_2 = self.get_destination_room(room)
+
+        # check: events correctly registered in order
+        self.assertEqual(row_1["event_id"], event_id_1)
+        self.assertEqual(row_2["event_id"], event_id_2)
+        self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
+
+    @override_config({"send_federation": True})
+    def test_catch_up_last_successful_stream_ordering_tracking(self):
+        """
+        Tests that we populate the `last_successful_stream_ordering` column of
+        the `destinations` table as needed.
+ """ + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room = self.helper.create_room_as("u1", tok=u1_token) + + # take the remote offline + self.is_online = False + + self.get_success( + event_injection.inject_member_event(self.hs, room, "@user:host2", "join") + ) + + self.helper.send(room, "wombats!", tok=u1_token)["event_id"] + + self.pump() + + lsso_1 = self.get_success( + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) + ) + + self.assertIsNone( + lsso_1, + "There should be no last successful stream ordering for an always-offline destination", + ) + + # bring the remote offline + self.is_online = True + + event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"] + + lsso_2 = self.get_success( + self.hs.get_datastore().get_destination_last_successful_stream_ordering( + "host2" + ) + ) + row_2 = self.get_destination_room(room) + + self.assertEqual( + self.pdus[0]["content"]["body"], + "rabbits!", + "Test fault: didn't receive the right PDU", + ) + self.assertEqual( + row_2["event_id"], + event_id_2, + "Test fault: destination_rooms not updated correctly", + ) + self.assertEqual( + lsso_2, + row_2["stream_ordering"], + "Send succeeded but not marked as last_successful_stream_ordering", + ) + + @override_config({"send_federation": True}) + def test_catch_up_loop_no_pdus_in_main_queue(self): + """ + Tests, somewhat more synthetically, behaviour of + _catch_up_transmission_loop when there aren't any PDUs in the main queue. + """ + + # ARRANGE + per_dest_queue, sent_pdus = self.make_fake_destination_queue() + + self.register_user("u1", "you the one") + u1_token = self.login("u1", "you the one") + room_1 = self.helper.create_room_as("u1", tok=u1_token) + room_2 = self.helper.create_room_as("u1", tok=u1_token) + room_3 = self.helper.create_room_as("u1", tok=u1_token) + self.get_success( + event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join") + ) + self.get_success( + event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join") + ) + self.get_success( + event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join") + ) + + # create some events to play with + + self.helper.send(room_1, "you hear me!!", tok=u1_token) + event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"] + self.helper.send(room_3, "Matrix!", tok=u1_token) + event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"] + event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"] + + # destination_rooms should already be populated, but let us pretend that we already + # delivered up to and including event id 2 + + event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2)) + + self.get_success( + self.hs.get_datastore().set_destination_last_successful_stream_ordering( + "host2", event_2.internal_metadata.stream_ordering + ) + ) + + # ACT + self.get_success(per_dest_queue._catch_up_transmission_loop()) + + # ASSERT, noticing in particular: + # - event 3 not sent out, because event 5 replaces it + # - order is least recent first, so event 5 comes after event 4 + self.assertEqual(len(sent_pdus), 2) + self.assertEqual(sent_pdus[0].event_id, event_id_4) + self.assertEqual(sent_pdus[1].event_id, event_id_5) + + @override_config({"send_federation": True}) + def test_catch_up_loop_with_pdus_in_main_queue(self): + """ + Tests, somewhat more synthetically, behaviour of + _catch_up_transmission_loop when there aren't any PDUs in the 
+        """
+
+        # ARRANGE
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+        room_3 = self.helper.create_room_as("u1", tok=u1_token)
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        )
+
+        # create some events to play with
+
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+        event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
+        self.helper.send(room_3, "Matrix!", tok=u1_token)
+        event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
+        event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
+
+        # put event 5 in the main queue: assume it's the cause of us triggering
+        # a catch-up (or is otherwise sent after the retry backoff ends).
+        # (Block the transmission loop from running by marking it as already
+        # running, because we manually invoke the catch-up loop for testing
+        # purposes.)
+        per_dest_queue.transmission_loop_running = True
+        event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
+        per_dest_queue.send_pdu(event_5, 1)
+
+        # destination_rooms should already be populated, but let us pretend that
+        # we already delivered up to and including event id 2
+
+        event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
+
+        self.get_success(
+            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+                "host2", event_2.internal_metadata.stream_ordering
+            )
+        )
+
+        # ACT
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # ASSERT, noticing in particular:
+        # - event 3 not sent out, because event 5 replaces it
+        # - event 5 is not sent out, because it's already in our main PDU queue
+        self.assertEqual(len(sent_pdus), 1)
+        self.assertEqual(sent_pdus[0].event_id, event_id_4)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 7bf15c4ba93f..5a1e0cb29932 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -76,6 +76,7 @@ def make_homeserver(self, reactor, clock):
                 "get_destination_retry_timings",
                 "get_devices_by_remote",
                 "maybe_store_room_on_invite",
+                "get_destination_last_successful_stream_ordering",
                 # Bits that user_directory needs
                 "get_user_directory_stream_pos",
                 "get_current_state_deltas",
@@ -162,6 +163,10 @@ def get_users_in_room(room_id):
             None
         )

+        self.datastore.get_destination_last_successful_stream_ordering = lambda *args, **kwargs: defer.succeed(
+            None
+        )
+
     def test_started_typing_local(self):
         self.room_members = [U_APPLE, U_BANANA]

diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index e66c9a4c4c6c..3486fbb34c2b 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -40,8 +40,12 @@ class RestHelper(object):
     auth_user_id = attr.ib()

     def create_room_as(
-        self, room_creator=None, is_public=True, tok=None, expect_code=200,
-    ):
+        self,
+        room_creator: str = None,
+        is_public: bool = True,
+        tok: str = None,
+        expect_code: int = 200,
+    ) -> str:
         temp_id = self.auth_user_id
         self.auth_user_id = room_creator
         path = "/_matrix/client/r0/createRoom"
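Reviewer's closing note on the loop tests above: because `destination_rooms` keeps only the newest stream ordering per room, catch-up sends at most one event per room inside the window. A toy model of the arithmetic behind `test_catch_up_loop_no_pdus_in_main_queue` (hypothetical data, not the test fixtures):

```python
# Toy model of the test's expectation: with events 1..5 across three rooms and
# event 2 already delivered, catch-up sends just events 4 and 5; event 3 was
# superseded by event 5 in the same room.
events = {
    1: "room_1",  # "you hear me!!"
    2: "room_2",  # "wombats!"
    3: "room_3",  # "Matrix!"
    4: "room_2",  # "rabbits!"
    5: "room_3",  # "Synapse!"
}

# latest stream_ordering per room, as destination_rooms would hold it
destination_rooms = {}
for stream_ordering, room in sorted(events.items()):
    destination_rooms[room] = stream_ordering

last_successful = 2  # pretend events up to 2 were delivered
to_send = sorted(so for so in destination_rooms.values() if so > last_successful)
assert to_send == [4, 5]  # matches the sent_pdus assertion in the test
```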