Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #5294 from matrix-org/erikj/speed_up_room_stats
Browse files Browse the repository at this point in the history
Speed up room stats background update
  • Loading branch information
erikjohnston authored May 31, 2019
2 parents 3e1af51 + 39bbf6a commit 31d44ec
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 87 deletions.
1 change: 1 addition & 0 deletions changelog.d/5294.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix performance problems with the rooms stats background update.
7 changes: 6 additions & 1 deletion synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,12 @@ def _get_total_state_event_counts_txn(self, txn, room_id):
"""
See get_total_state_event_counts.
"""
sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
# We join against the events table as that has an index on room_id
sql = """
SELECT COUNT(*) FROM state_events
INNER JOIN events USING (room_id, event_id)
WHERE room_id=?
"""
txn.execute(sql, (room_id,))
row = txn.fetchone()
return row[0] if row else 0
Expand Down
33 changes: 11 additions & 22 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,9 @@ def _get_room_summary_txn(txn):

return self.runInteraction("get_room_summary", _get_room_summary_txn)

def _get_user_count_in_room_txn(self, txn, room_id, membership):
def _get_user_counts_in_room_txn(self, txn, room_id):
"""
See get_user_count_in_room.
"""
sql = (
"SELECT count(*) FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id "
" AND m.room_id = c.room_id "
" AND m.user_id = c.state_key"
" WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
)

txn.execute(sql, (room_id, membership))
row = txn.fetchone()
return row[0]

def get_user_count_in_room(self, room_id, membership):
"""
Get the user count in a room with a particular membership.
Get the user count in a room by membership.
Args:
room_id (str)
Expand All @@ -170,9 +153,15 @@ def get_user_count_in_room(self, room_id, membership):
Returns:
Deferred[int]
"""
return self.runInteraction(
"get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
)
sql = """
SELECT m.membership, count(*) FROM room_memberships as m
INNER JOIN current_state_events as c USING(event_id)
WHERE c.type = 'm.room.member' AND c.room_id = ?
GROUP BY m.membership
"""

txn.execute(sql, (room_id,))
return {row[0]: row[1] for row in txn}

@cached()
def get_invited_rooms_for_user(self, user_id):
Expand Down
28 changes: 28 additions & 0 deletions synapse/storage/schema/delta/54/stats2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- This delta file gets run after `54/stats.sql` delta.

-- We want to add some indices to the temporary stats table, so we re-insert
-- 'populate_stats_createtables' if we are still processing the rooms update.
INSERT INTO background_updates (update_name, progress_json)
SELECT 'populate_stats_createtables', '{}'
WHERE
'populate_stats_process_rooms' IN (
SELECT update_name FROM background_updates
)
AND 'populate_stats_createtables' NOT IN ( -- don't insert if already exists
SELECT update_name FROM background_updates
);
130 changes: 66 additions & 64 deletions synapse/storage/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes, Membership
from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached

Expand Down Expand Up @@ -69,12 +70,25 @@ def _populate_stats_createtables(self, progress, batch_size):

# Get all the rooms that we want to process.
def _make_staging_area(txn):
sql = (
"CREATE TABLE IF NOT EXISTS "
+ TEMP_TABLE
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
)
txn.execute(sql)
# Create the temporary tables
stmts = get_statements("""
-- We just recreate the table, we'll be reinserting the
-- correct entries again later anyway.
DROP TABLE IF EXISTS {temp}_rooms;
CREATE TABLE IF NOT EXISTS {temp}_rooms(
room_id TEXT NOT NULL,
events BIGINT NOT NULL
);
CREATE INDEX {temp}_rooms_events
ON {temp}_rooms(events);
CREATE INDEX {temp}_rooms_id
ON {temp}_rooms(room_id);
""".format(temp=TEMP_TABLE).splitlines())

for statement in stmts:
txn.execute(statement)

sql = (
"CREATE TABLE IF NOT EXISTS "
Expand All @@ -83,15 +97,16 @@ def _make_staging_area(txn):
)
txn.execute(sql)

# Get rooms we want to process from the database
# Get rooms we want to process from the database, only adding
# those that we haven't (i.e. those not in room_stats_earliest_token)
sql = """
SELECT room_id, count(*) FROM current_state_events
GROUP BY room_id
"""
INSERT INTO %s_rooms (room_id, events)
SELECT c.room_id, count(*) FROM current_state_events AS c
LEFT JOIN room_stats_earliest_token AS t USING (room_id)
WHERE t.room_id IS NULL
GROUP BY c.room_id
""" % (TEMP_TABLE,)
txn.execute(sql)
rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
del rooms

new_pos = yield self.get_max_stream_id_in_current_state_deltas()
yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
Expand Down Expand Up @@ -179,46 +194,39 @@ def _get_next_batch(txn):

current_state_ids = yield self.get_current_state_ids(room_id)

join_rules = yield self.get_event(
current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
)
history_visibility = yield self.get_event(
current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
allow_none=True,
)
encryption = yield self.get_event(
current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
)
name = yield self.get_event(
current_state_ids.get((EventTypes.Name, "")), allow_none=True
)
topic = yield self.get_event(
current_state_ids.get((EventTypes.Topic, "")), allow_none=True
)
avatar = yield self.get_event(
current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
history_visibility_id = current_state_ids.get(
(EventTypes.RoomHistoryVisibility, "")
)
canonical_alias = yield self.get_event(
current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
)

def _or_none(x, arg):
if x:
return x.content.get(arg)
encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
name_id = current_state_ids.get((EventTypes.Name, ""))
topic_id = current_state_ids.get((EventTypes.Topic, ""))
avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))

state_events = yield self.get_events([
join_rules_id, history_visibility_id, encryption_id, name_id,
topic_id, avatar_id, canonical_alias_id,
])

def _get_or_none(event_id, arg):
event = state_events.get(event_id)
if event:
return event.content.get(arg)
return None

yield self.update_room_state(
room_id,
{
"join_rules": _or_none(join_rules, "join_rule"),
"history_visibility": _or_none(
history_visibility, "history_visibility"
"join_rules": _get_or_none(join_rules_id, "join_rule"),
"history_visibility": _get_or_none(
history_visibility_id, "history_visibility"
),
"encryption": _or_none(encryption, "algorithm"),
"name": _or_none(name, "name"),
"topic": _or_none(topic, "topic"),
"avatar": _or_none(avatar, "url"),
"canonical_alias": _or_none(canonical_alias, "alias"),
"encryption": _get_or_none(encryption_id, "algorithm"),
"name": _get_or_none(name_id, "name"),
"topic": _get_or_none(topic_id, "topic"),
"avatar": _get_or_none(avatar_id, "url"),
"canonical_alias": _get_or_none(canonical_alias_id, "alias"),
},
)

Expand All @@ -233,18 +241,9 @@ def _fetch_data(txn):
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)

current_state_events = len(current_state_ids)
joined_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.JOIN
)
invited_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.INVITE
)
left_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.LEAVE
)
banned_members = self._get_user_count_in_room_txn(
txn, room_id, Membership.BAN
)

membership_counts = self._get_user_counts_in_room_txn(txn, room_id)

total_state_events = self._get_total_state_event_counts_txn(
txn, room_id
)
Expand All @@ -257,10 +256,10 @@ def _fetch_data(txn):
{
"bucket_size": self.stats_bucket_size,
"current_state_events": current_state_events,
"joined_members": joined_members,
"invited_members": invited_members,
"left_members": left_members,
"banned_members": banned_members,
"joined_members": membership_counts.get(Membership.JOIN, 0),
"invited_members": membership_counts.get(Membership.INVITE, 0),
"left_members": membership_counts.get(Membership.LEAVE, 0),
"banned_members": membership_counts.get(Membership.BAN, 0),
"state_events": total_state_events,
},
)
Expand All @@ -270,10 +269,13 @@ def _fetch_data(txn):
{"room_id": room_id, "token": current_token},
)

# We've finished a room. Delete it from the table.
self._simple_delete_one_txn(
txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
)

yield self.runInteraction("update_room_stats", _fetch_data)

# We've finished a room. Delete it from the table.
yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
# Update the remaining counter.
progress["remaining"] -= 1
yield self.runInteraction(
Expand Down

0 comments on commit 31d44ec

Please sign in to comment.