Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track unconverted device list outbound pokes using a position instead #14516

Merged
merged 7 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14516.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor conversion of device list changes in room to outbound pokes to track unconverted rows using a `(stream ID, room ID)` position instead of updating the `converted_to_destinations` flag on every row.
30 changes: 27 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,33 @@ async def _handle_new_device_update_async(self) -> None:
hosts_already_sent_to: Set[str] = set()

try:
stream_id, room_id = await self.store.get_device_change_last_converted_pos()

while True:
self._handle_new_device_update_new_data = False
rows = await self.store.get_uncoverted_outbound_room_pokes()
max_stream_id = self.store.get_device_stream_token()
rows = await self.store.get_uncoverted_outbound_room_pokes(
stream_id, room_id
)
if not rows:
# If the DB returned nothing then there is nothing left to
# do, *unless* a new device list update happened during the
# DB query.

# Advance `(stream_id, room_id)`.
# `max_stream_id` comes from *before* the query for unconverted
# rows, which means that any unconverted rows must have a larger
# stream ID.
if max_stream_id > stream_id:
stream_id, room_id = max_stream_id, ""
await self.store.set_device_change_last_converted_pos(
stream_id, room_id
)
else:
assert max_stream_id == stream_id
# Avoid moving `room_id` backwards.
pass

if self._handle_new_device_update_new_data:
continue
else:
Expand Down Expand Up @@ -718,7 +738,6 @@ async def _handle_new_device_update_async(self) -> None:
user_id=user_id,
device_id=device_id,
room_id=room_id,
stream_id=stream_id,
hosts=hosts,
context=opentracing_context,
)
Expand Down Expand Up @@ -752,6 +771,12 @@ async def _handle_new_device_update_async(self) -> None:
hosts_already_sent_to.update(hosts)
current_stream_id = stream_id

# Advance `(stream_id, room_id)`.
_, _, room_id, stream_id, _ = rows[-1]
await self.store.set_device_change_last_converted_pos(
stream_id, room_id
)

finally:
self._handle_new_device_update_is_processing = False

Expand Down Expand Up @@ -834,7 +859,6 @@ async def handle_room_un_partial_stated(self, room_id: str) -> None:
user_id=user_id,
device_id=device_id,
room_id=room_id,
stream_id=None,
hosts=potentially_changed_hosts,
context=None,
)
Expand Down
13 changes: 7 additions & 6 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2075,13 +2075,14 @@ def simple_select_one_txn(
retcols: Collection[str],
allow_none: bool = False,
) -> Optional[Dict[str, Any]]:
select_sql = "SELECT %s FROM %s WHERE %s" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k,) for k in keyvalues),
)
select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table)

if keyvalues:
select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),)
txn.execute(select_sql, list(keyvalues.values()))
else:
txn.execute(select_sql)

txn.execute(select_sql, list(keyvalues.values()))
row = txn.fetchone()

if not row:
Expand Down
107 changes: 69 additions & 38 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2008,27 +2008,48 @@ def _add_device_outbound_room_poke_txn(
)

async def get_uncoverted_outbound_room_pokes(
self, limit: int = 10
self, start_stream_id: int, start_room_id: str, limit: int = 10
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
"""Get device list changes by room that have not yet been handled and
written to `device_lists_outbound_pokes`.

Args:
start_stream_id: Together with `start_room_id`, indicates the position after
which to return device list changes.
start_room_id: Together with `start_stream_id`, indicates the position after
which to return device list changes.
limit: The maximum number of device list changes to return.

Returns:
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
A list of user ID, device ID, room ID, stream ID and optional opentracing
context, in order of ascending (stream ID, room ID).
"""

sql = """
SELECT user_id, device_id, room_id, stream_id, opentracing_context
FROM device_lists_changes_in_room
WHERE NOT converted_to_destinations
ORDER BY stream_id
WHERE
(stream_id, room_id) > (?, ?) AND
stream_id <= ? AND
NOT converted_to_destinations
ORDER BY stream_id ASC, room_id ASC
LIMIT ?
"""

def get_uncoverted_outbound_room_pokes_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
txn.execute(sql, (limit,))
txn.execute(
sql,
(
start_stream_id,
start_room_id,
# Avoid returning rows if there may be uncommitted device list
# changes with smaller stream IDs.
self._device_list_id_gen.get_current_token(),
limit,
),
)

return [
(
Expand All @@ -2050,49 +2071,25 @@ async def add_device_list_outbound_pokes(
user_id: str,
device_id: str,
room_id: str,
stream_id: Optional[int],
hosts: Collection[str],
context: Optional[Dict[str, str]],
) -> None:
"""Queue the device update to be sent to the given set of hosts,
calculated from the room ID.

Marks the associated row in `device_lists_changes_in_room` as handled,
if `stream_id` is provided.
"""
if not hosts:
return

def add_device_list_outbound_pokes_txn(
txn: LoggingTransaction, stream_ids: List[int]
) -> None:
if hosts:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing these? Is these baths no longer used?

Copy link
Contributor Author

@squahtx squahtx Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, pretty much. After removing the update to converted_to_destinations, we have

    async def add_device_list_outbound_pokes(
        self,
        user_id: str,
        device_id: str,
        room_id: str,
        hosts: Collection[str],
        context: Optional[Dict[str, str]],
    ) -> None:
        """Queue the device update to be sent to the given set of hosts,
        calculated from the room ID.
        """

        def add_device_list_outbound_pokes_txn(
            txn: LoggingTransaction, stream_ids: List[int]
        ) -> None:
            if hosts:
                self._add_device_outbound_poke_to_stream_txn(
                    txn,
                    user_id=user_id,
                    device_id=device_id,
                    hosts=hosts,
                    stream_ids=stream_ids,
                    context=context,
                )

        if not hosts:
            # If there are no hosts then we don't try and generate stream IDs.
            return await self.db_pool.runInteraction(
                "add_device_list_outbound_pokes",
                add_device_list_outbound_pokes_txn,
                [],
            )

        async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
            return await self.db_pool.runInteraction(
                "add_device_list_outbound_pokes",
                add_device_list_outbound_pokes_txn,
                stream_ids,
            )

The if not hosts branch does nothing inside the transaction, so I removed it entirely.

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
self._add_device_outbound_poke_to_stream_txn(
txn,
user_id=user_id,
device_id=device_id,
hosts=hosts,
stream_ids=stream_ids,
context=context,
)

if stream_id:
self.db_pool.simple_update_txn(
txn,
table="device_lists_changes_in_room",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"stream_id": stream_id,
"room_id": room_id,
},
updatevalues={"converted_to_destinations": True},
)

if not hosts:
# If there are no hosts then we don't try and generate stream IDs.
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
[],
self._add_device_outbound_poke_to_stream_txn(
txn,
user_id=user_id,
device_id=device_id,
hosts=hosts,
stream_ids=stream_ids,
context=context,
)

async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
Expand Down Expand Up @@ -2156,3 +2153,37 @@ def get_pending_remote_device_list_updates_for_room_txn(
"get_pending_remote_device_list_updates_for_room",
get_pending_remote_device_list_updates_for_room_txn,
)

async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
"""
Get the position of the last row in `device_list_changes_in_room` that has been
converted to `device_lists_outbound_pokes`.

Rows with a strictly greater position where `converted_to_destinations` is
`FALSE` have not been converted.
"""

row = await self.db_pool.simple_select_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
retcols=["stream_id", "room_id"],
desc="get_device_change_last_converted_pos",
)
return row["stream_id"], row["room_id"]

async def set_device_change_last_converted_pos(
self,
stream_id: int,
room_id: str,
) -> None:
"""
Set the position of the last row in `device_list_changes_in_room` that has been
converted to `device_lists_outbound_pokes`.
"""

await self.db_pool.simple_update_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
updatevalues={"stream_id": stream_id, "room_id": room_id},
desc="set_device_change_last_converted_pos",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Prior to this schema delta, we tracked the set of unconverted rows in
-- `device_lists_changes_in_room` using the `converted_to_destinations` flag. When rows
-- were converted to `device_lists_outbound_pokes`, the `converted_to_destinations` flag
-- would be set.
--
-- After this schema delta, the `converted_to_destinations` is still populated like
-- before, but the set of unconverted rows is determined by the `stream_id` in the new
-- `device_lists_changes_converted_stream_position` table.
--
-- If rolled back, Synapse will re-send all device list changes that happened since the
-- schema delta.

CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position(
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
-- The (stream id, room id) of the last row in `device_lists_changes_in_room` that
-- has been converted to `device_lists_outbound_pokes`. Rows with a strictly larger
-- (stream id, room id) where `converted_to_destinations` is `FALSE` have not been
-- converted.
stream_id BIGINT NOT NULL,
-- `room_id` may be an empty string, which compares less than all valid room IDs.
room_id TEXT NOT NULL,
CHECK (Lock='X')
);

INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES (
(
SELECT COALESCE(
-- The last converted stream id is the smallest unconverted stream id minus
-- one.
MIN(stream_id) - 1,
-- If there is no unconverted stream id, the last converted stream id is the
-- largest stream id.
-- Otherwise, pick 1, since stream ids start at 2.
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room)
) FROM device_lists_changes_in_room WHERE NOT converted_to_destinations
),
''
);
3 changes: 1 addition & 2 deletions tests/storage/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def add_device_change(self, user_id, device_ids, host):
"""

for device_id in device_ids:
stream_id = self.get_success(
self.get_success(
self.store.add_device_change_to_streams(
user_id, [device_id], ["!some:room"]
)
Expand All @@ -39,7 +39,6 @@ def add_device_change(self, user_id, device_ids, host):
user_id=user_id,
device_id=device_id,
room_id="!some:room",
stream_id=stream_id,
hosts=[host],
context={},
)
Expand Down