Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Purge chain cover tables when purging events. (#9498)
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep authored Mar 3, 2021
1 parent d790d0d commit 922788c
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
1 change: 1 addition & 0 deletions changelog.d/9498.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Properly purge the event chain cover index when purging history.
42 changes: 37 additions & 5 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
"""Deletes room history before a certain point
"""Deletes room history before a certain point.
Note that only a single purge can occur at once, this is guaranteed via
a higher level (in the PaginationHandler).
Args:
room_id:
Expand All @@ -52,7 +55,9 @@ async def purge_history(
delete_local_events,
)

def _purge_history_txn(self, txn, room_id, token, delete_local_events):
def _purge_history_txn(
self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
) -> Set[int]:
# Tables that should be pruned:
# event_auth
# event_backward_extremities
Expand Down Expand Up @@ -103,7 +108,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):
if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
# having any backwards extremities)
raise SynapseError(
400, "topological_ordering is greater than forward extremeties"
)
Expand Down Expand Up @@ -154,7 +159,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):

logger.info("[purge] Finding new backward extremities")

# We calculate the new entries for the backward extremeties by finding
# We calculate the new entries for the backward extremities by finding
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
Expand Down Expand Up @@ -296,7 +301,7 @@ async def purge_room(self, room_id: str) -> List[int]:
"purge_room", self._purge_room_txn, room_id
)

def _purge_room_txn(self, txn, room_id):
def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# First we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
Expand All @@ -310,6 +315,31 @@ def _purge_room_txn(self, txn, room_id):

state_groups = [row[0] for row in txn]

# Get all the auth chains that are referenced by events that are to be
# deleted.
txn.execute(
"""
SELECT chain_id, sequence_number FROM events
LEFT JOIN event_auth_chains USING (event_id)
WHERE room_id = ?
""",
(room_id,),
)
referenced_chain_id_tuples = list(txn)

logger.info("[purge] removing events from event_auth_chain_links")
txn.executemany(
"""
DELETE FROM event_auth_chain_links WHERE
(origin_chain_id = ? AND origin_sequence_number = ?) OR
(target_chain_id = ? AND target_sequence_number = ?)
""",
(
(chain_id, seq_num, chain_id, seq_num)
for (chain_id, seq_num) in referenced_chain_id_tuples
),
)

# Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
Expand All @@ -319,6 +349,8 @@ def _purge_room_txn(self, txn, room_id):
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"event_auth_chains",
"event_auth_chain_to_calculate",
"redactions",
"rejections",
"state_events",
Expand Down
5 changes: 0 additions & 5 deletions synapse/storage/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
Returns:
The set of state groups that can be deleted.
"""
# Graph of state group -> previous group
graph = {}

# Set of events that we have found to be referenced by events
referenced_groups = set()

Expand Down Expand Up @@ -111,8 +108,6 @@ async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
next_to_search |= prevs
state_groups_seen |= prevs

graph.update(edges)

to_delete = state_groups_seen - referenced_groups

return to_delete

0 comments on commit 922788c

Please sign in to comment.