Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up room stats background update #5294

Merged
merged 6 commits into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5294.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance problems with the rooms stats background update.
7 changes: 6 additions & 1 deletion synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,12 @@ def _get_total_state_event_counts_txn(self, txn, room_id):
"""
See get_total_state_event_counts.
"""
sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
# We join against the events table as that has an index on room_id
sql = """
SELECT COUNT(*) FROM state_events
INNER JOIN events USING (room_id, event_id)
WHERE room_id=?
"""
txn.execute(sql, (room_id,))
row = txn.fetchone()
return row[0] if row else 0
Expand Down
33 changes: 11 additions & 22 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,9 @@ def _get_room_summary_txn(txn):

return self.runInteraction("get_room_summary", _get_room_summary_txn)

def _get_user_count_in_room_txn(self, txn, room_id, membership):
def _get_user_counts_in_room_txn(self, txn, room_id):
"""
See get_user_count_in_room.
"""
sql = (
"SELECT count(*) FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id "
" AND m.room_id = c.room_id "
" AND m.user_id = c.state_key"
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)

txn.execute(sql, (room_id, membership))
row = txn.fetchone()
return row[0]

def get_user_count_in_room(self, room_id, membership):
"""
Get the user count in a room with a particular membership.
Get the user count in a room by membership.

Args:
room_id (str)
Expand All @@ -170,9 +153,15 @@ def get_user_count_in_room(self, room_id, membership):
Returns:
Deferred[int]
"""
return self.runInteraction(
"get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
)
sql = """
SELECT m.membership, count(*) FROM room_memberships as m
INNER JOIN current_state_events as c USING(event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""

txn.execute(sql, (room_id,))
return {row[0]: row[1] for row in txn}

@cached()
def get_invited_rooms_for_user(self, user_id):
Expand Down
28 changes: 28 additions & 0 deletions synapse/storage/schema/delta/54/stats2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This delta file gets run after `54/stats.sql` delta.

-- We want to add some indices to the temporary stats table, so we re-insert
-- 'populate_stats_createtables' if we are still processing the rooms update.
INSERT INTO background_updates (update_name, progress_json)
SELECT 'populate_stats_createtables', '{}'
WHERE
'populate_stats_process_rooms' IN (
SELECT update_name FROM background_updates
)
AND 'populate_stats_createtables' NOT IN ( -- don't insert if already exists
SELECT update_name FROM background_updates
);
130 changes: 66 additions & 64 deletions synapse/storage/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached

Expand Down Expand Up @@ -69,12 +70,25 @@ def _populate_stats_createtables(self, progress, batch_size):

# Get all the rooms that we want to process.
def _make_staging_area(txn):
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)
# Create the temporary tables
stmts = get_statements("""
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;

CREATE TABLE IF NOT EXISTS {temp}_rooms(
room_id TEXT NOT NULL,
events BIGINT NOT NULL
);

CREATE INDEX {temp}_rooms_events
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
""".format(temp=TEMP_TABLE).splitlines())

for statement in stmts:
txn.execute(statement)

sql = (
"CREATE TABLE IF NOT EXISTS "
Expand All @@ -83,15 +97,16 @@ def _make_staging_area(txn):
)
txn.execute(sql)

# Get rooms we want to process from the database
# Get rooms we want to process from the database, only adding
# those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
SELECT room_id, count(*) FROM current_state_events
GROUP BY room_id
"""
INSERT INTO %s_rooms (room_id, events)
SELECT c.room_id, count(*) FROM current_state_events AS c
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
""" % (TEMP_TABLE,)
txn.execute(sql)
rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
del rooms

new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
Expand Down Expand Up @@ -179,46 +194,39 @@ def _get_next_batch(txn):

current_state_ids = yield self.get_current_state_ids(room_id)

join_rules = yield self.get_event(
current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
)
history_visibility = yield self.get_event(
current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
allow_none=True,
)
encryption = yield self.get_event(
current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
)
name = yield self.get_event(
current_state_ids.get((EventTypes.Name, "")), allow_none=True
)
topic = yield self.get_event(
current_state_ids.get((EventTypes.Topic, "")), allow_none=True
)
avatar = yield self.get_event(
current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
history_visibility_id = current_state_ids.get(
(EventTypes.RoomHistoryVisibility, "")
)
canonical_alias = yield self.get_event(
current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
)

def _or_none(x, arg):
if x:
return x.content.get(arg)
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
name_id = current_state_ids.get((EventTypes.Name, ""))
topic_id = current_state_ids.get((EventTypes.Topic, ""))
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))

state_events = yield self.get_events([
join_rules_id, history_visibility_id, encryption_id, name_id,
topic_id, avatar_id, canonical_alias_id,
])

def _get_or_none(event_id, arg):
event = state_events.get(event_id)
if event:
return event.content.get(arg)
return None

yield self.update_room_state(
room_id,
{
"join_rules": _or_none(join_rules, "join_rule"),
"history_visibility": _or_none(
history_visibility, "history_visibility"
"join_rules": _get_or_none(join_rules_id, "join_rule"),
"history_visibility": _get_or_none(
history_visibility_id, "history_visibility"
),
"encryption": _or_none(encryption, "algorithm"),
"name": _or_none(name, "name"),
"topic": _or_none(topic, "topic"),
"avatar": _or_none(avatar, "url"),
"canonical_alias": _or_none(canonical_alias, "alias"),
"encryption": _get_or_none(encryption_id, "algorithm"),
"name": _get_or_none(name_id, "name"),
"topic": _get_or_none(topic_id, "topic"),
"avatar": _get_or_none(avatar_id, "url"),
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
},
)

Expand All @@ -233,18 +241,9 @@ def _fetch_data(txn):
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)

current_state_events = len(current_state_ids)
joined_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.JOIN
)
invited_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.INVITE
)
left_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.LEAVE
)
banned_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.BAN
)

membership_counts = self._get_user_counts_in_room_txn(txn, room_id)

total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
Expand All @@ -257,10 +256,10 @@ def _fetch_data(txn):
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": joined_members,
"invited_members": invited_members,
"left_members": left_members,
"banned_members": banned_members,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
Expand All @@ -270,10 +269,13 @@ def _fetch_data(txn):
{"room_id": room_id, "token": current_token},
)

# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
)

yield self.runInteraction("update_room_stats", _fetch_data)

# We've finished a room. Delete it from the table.
yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
Expand Down