Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Catch-up after Federation Outage (split, 1) (#8230)
Browse files Browse the repository at this point in the history
Signed-off-by: Olivier Wilkinson (reivilibre) <[email protected]>
  • Loading branch information
reivilibre authored Sep 4, 2020
1 parent e351298 commit 58f61f1
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/8230.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track the latest event for every destination and room for catch-up after federation outage.
11 changes: 9 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async def handle_event(event: EventBase) -> None:
logger.debug("Sending %s to %r", event, destinations)

if destinations:
self._send_pdu(event, destinations)
await self._send_pdu(event, destinations)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
Expand Down Expand Up @@ -265,7 +265,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
finally:
self._is_processing = False

def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.
Expand All @@ -280,6 +280,13 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()

# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
)

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu)

Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ def simple_upsert_many_txn(
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[str]],
value_values: Iterable[Iterable[Any]],
) -> None:
"""
Upsert, many times.
Expand Down Expand Up @@ -981,7 +981,7 @@ def simple_upsert_many_txn_emulated(
key_names: Iterable[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[str]],
value_values: Iterable[Iterable[Any]],
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
# room_depth
# state_groups
# state_groups_state
# destination_rooms

# we will build a temporary table listing the events so that we don't
# have to keep shovelling the list back and forth across the
Expand Down Expand Up @@ -336,6 +337,7 @@ def _purge_room_txn(self, txn, room_id):
# and finally, the tables with an index on room_id (or no useful index)
for table in (
"current_state_events",
"destination_rooms",
"event_backward_extremities",
"event_forward_extremities",
"event_json",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This schema delta alters the schema to enable 'catching up' remote homeservers
-- after there has been a connectivity problem for any reason.

-- This stores, for each (destination, room) pair, the stream_ordering of the
-- latest event for that destination.
CREATE TABLE IF NOT EXISTS destination_rooms (
-- the destination in question.
destination TEXT NOT NULL REFERENCES destinations (destination),
-- the ID of the room in question
room_id TEXT NOT NULL REFERENCES rooms (room_id),
-- the stream_ordering of the event
stream_ordering INTEGER NOT NULL,
PRIMARY KEY (destination, room_id)
-- We don't declare a foreign key on stream_ordering here because that'd mean
-- we'd need to either maintain an index (expensive) or do a table scan of
-- destination_rooms whenever we delete an event (also potentially expensive).
-- In addition to that, a foreign key on stream_ordering would be redundant
-- as this row doesn't need to refer to a specific event; if the event gets
-- deleted then it doesn't affect the validity of the stream_ordering here.
);

-- This index is needed to make it so that a deletion of a room (in the rooms
-- table) can be efficient, as otherwise a table scan would need to be performed
-- to check that no destination_rooms rows point to the room to be deleted.
-- Also: it makes it efficient to delete all the entries for a given room ID,
-- such as when purging a room.
CREATE INDEX IF NOT EXISTS destination_rooms_room_id
ON destination_rooms (room_id);
66 changes: 63 additions & 3 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@

import logging
from collections import namedtuple
from typing import Optional, Tuple
from typing import Iterable, Optional, Tuple

from canonicaljson import encode_canonical_json

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache

Expand Down Expand Up @@ -164,7 +165,9 @@ def _get_destination_retry_timings(self, txn, destination):
allow_none=True,
)

if result and result["retry_last_ts"] > 0:
# check we have a row and retry_last_ts is not null or zero
# (retry_last_ts can't be negative)
if result and result["retry_last_ts"]:
return result
else:
return None
Expand Down Expand Up @@ -273,3 +276,60 @@ def _cleanup_transactions_txn(txn):
await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)

async def store_destination_rooms_entries(
self, destinations: Iterable[str], room_id: str, stream_ordering: int,
) -> None:
"""
Updates or creates `destination_rooms` entries in batch for a single event.
Args:
destinations: list of destinations
room_id: the room_id of the event
stream_ordering: the stream_ordering of the event
"""

return await self.db_pool.runInteraction(
"store_destination_rooms_entries",
self._store_destination_rooms_entries_txn,
destinations,
room_id,
stream_ordering,
)

def _store_destination_rooms_entries_txn(
self,
txn: LoggingTransaction,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:

# ensure we have a `destinations` row for this destination, as there is
# a foreign key constraint.
if isinstance(self.database_engine, PostgresEngine):
q = """
INSERT INTO destinations (destination)
VALUES (?)
ON CONFLICT DO NOTHING;
"""
elif isinstance(self.database_engine, Sqlite3Engine):
q = """
INSERT OR IGNORE INTO destinations (destination)
VALUES (?);
"""
else:
raise RuntimeError("Unknown database engine")

txn.execute_batch(q, ((destination,) for destination in destinations))

rows = [(destination, room_id) for destination in destinations]

self.db_pool.simple_upsert_many_txn(
txn,
"destination_rooms",
["destination", "room_id"],
rows,
["stream_ordering"],
[(stream_ordering,)] * len(rows),
)
82 changes: 82 additions & 0 deletions tests/federation/test_federation_catch_up.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from mock import Mock

from synapse.rest import admin
from synapse.rest.client.v1 import login, room

from tests.test_utils import event_injection, make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config


class FederationCatchUpTestCases(FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]

def make_homeserver(self, reactor, clock):
return self.setup_test_homeserver(
federation_transport_client=Mock(spec=["send_transaction"]),
)

def prepare(self, reactor, clock, hs):
# stub out get_current_hosts_in_room
state_handler = hs.get_state_handler()

# This mock is crucial for destination_rooms to be populated.
state_handler.get_current_hosts_in_room = Mock(
return_value=make_awaitable(["test", "host2"])
)

def get_destination_room(self, room: str, destination: str = "host2") -> dict:
"""
Gets the destination_rooms entry for a (destination, room_id) pair.
Args:
room: room ID
destination: what destination, default is "host2"
Returns:
Dictionary of { event_id: str, stream_ordering: int }
"""
event_id, stream_ordering = self.get_success(
self.hs.get_datastore().db_pool.execute(
"test:get_destination_rooms",
None,
"""
SELECT event_id, stream_ordering
FROM destination_rooms dr
JOIN events USING (stream_ordering)
WHERE dr.destination = ? AND dr.room_id = ?
""",
destination,
room,
)
)[0]
return {"event_id": event_id, "stream_ordering": stream_ordering}

@override_config({"send_federation": True})
def test_catch_up_destination_rooms_tracking(self):
"""
Tests that we populate the `destination_rooms` table as needed.
"""
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room = self.helper.create_room_as("u1", tok=u1_token)

self.get_success(
event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
)

event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]

row_1 = self.get_destination_room(room)

event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]

row_2 = self.get_destination_room(room)

# check: events correctly registered in order
self.assertEqual(row_1["event_id"], event_id_1)
self.assertEqual(row_2["event_id"], event_id_2)
self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)

0 comments on commit 58f61f1

Please sign in to comment.