Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove more remaining pieces of groups code. #12966

Merged
merged 7 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12966.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove support for the non-standard groups/communities feature from Synapse.
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
"groups",
"local_group_membership",
"local_group_updates",
"remote_profile_cache",
}


Expand Down
83 changes: 1 addition & 82 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,7 @@
StoreError,
SynapseError,
)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import (
JsonDict,
Requester,
UserID,
create_requester,
get_domain_from_id,
)
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util.caches.descriptors import cached
from synapse.util.stringutils import parse_and_validate_mxc_uri

Expand All @@ -50,9 +43,6 @@ class ProfileHandler:
delegate to master when necessary.
"""

PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000

def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
Expand All @@ -73,11 +63,6 @@ def __init__(self, hs: "HomeServer"):

self._third_party_rules = hs.get_third_party_event_rules()

if hs.config.worker.run_background_tasks:
self.clock.looping_call(
self._update_remote_profile_cache, self.PROFILE_UPDATE_MS
)

async def get_profile(self, user_id: str) -> JsonDict:
target_user = UserID.from_string(user_id)

Expand Down Expand Up @@ -116,30 +101,6 @@ async def get_profile(self, user_id: str) -> JsonDict:
raise SynapseError(502, "Failed to fetch profile")
raise e.to_synapse_error()

async def get_profile_from_cache(self, user_id: str) -> JsonDict:
"""Get the profile information from our local cache. If the user is
ours then the profile information will always be correct. Otherwise,
it may be out of date/missing.
"""
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
try:
displayname = await self.store.get_profile_displayname(
target_user.localpart
)
avatar_url = await self.store.get_profile_avatar_url(
target_user.localpart
)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
raise

return {"displayname": displayname, "avatar_url": avatar_url}
else:
profile = await self.store.get_from_remote_profile_cache(user_id)
return profile or {}

async def get_displayname(self, target_user: UserID) -> Optional[str]:
if self.hs.is_mine(target_user):
try:
Expand Down Expand Up @@ -509,45 +470,3 @@ async def check_profile_query_allowed(
# so we act as if we couldn't find the profile.
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
raise

@wrap_as_background_process("Update remote profile")
async def _update_remote_profile_cache(self) -> None:
"""Called periodically to check profiles of remote users we haven't
checked in a while.
"""
entries = await self.store.get_remote_profile_cache_entries_that_expire(
last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS
)

for user_id, displayname, avatar_url in entries:
is_subscribed = await self.store.is_subscribed_remote_profile_for_user(
user_id
)
if not is_subscribed:
await self.store.maybe_delete_remote_profile_cache(user_id)
continue

try:
profile = await self.federation.make_query(
destination=get_domain_from_id(user_id),
query_type="profile",
args={"user_id": user_id},
ignore_backoff=True,
)
except Exception:
logger.exception("Failed to get avatar_url")

await self.store.update_remote_profile_cache(
user_id, displayname, avatar_url
)
continue

new_name = profile.get("displayname")
if not isinstance(new_name, str):
new_name = None
new_avatar = profile.get("avatar_url")
if not isinstance(new_avatar, str):
new_avatar = None

# We always hit update to update the last_check timestamp
await self.store.update_remote_profile_cache(user_id, new_name, new_avatar)
18 changes: 0 additions & 18 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,6 @@ def __init__(
],
)

self._group_updates_id_gen = StreamIdGenerator(
db_conn, "local_group_updates", "stream_id"
)

self._cache_id_gen: Optional[MultiWriterIdGenerator]
if isinstance(self.database_engine, PostgresEngine):
# We set the `writers` to an empty list here as we don't care about
Expand Down Expand Up @@ -197,20 +193,6 @@ def __init__(
prefilled_cache=curr_state_delta_prefill,
)

_group_updates_prefill, min_group_updates_id = self.db_pool.get_cache_dict(
db_conn,
"local_group_updates",
entity_column="user_id",
stream_column="stream_id",
max_value=self._group_updates_id_gen.get_current_token(),
limit=1000,
)
self._group_updates_stream_cache = StreamChangeCache(
"_group_updates_stream_cache",
min_group_updates_id,
prefilled_cache=_group_updates_prefill,
)

self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()

Expand Down
107 changes: 2 additions & 105 deletions synapse/storage/databases/main/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, Optional
from typing import Optional

from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.roommember import ProfileInfo


Expand Down Expand Up @@ -55,17 +54,6 @@ async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
desc="get_profile_avatar_url",
)

async def get_from_remote_profile_cache(
self, user_id: str
) -> Optional[Dict[str, Any]]:
return await self.db_pool.simple_select_one(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
retcols=("displayname", "avatar_url"),
allow_none=True,
desc="get_from_remote_profile_cache",
)

async def create_profile(self, user_localpart: str) -> None:
await self.db_pool.simple_insert(
table="profiles", values={"user_id": user_localpart}, desc="create_profile"
Expand All @@ -91,97 +79,6 @@ async def set_profile_avatar_url(
desc="set_profile_avatar_url",
)

async def update_remote_profile_cache(
self, user_id: str, displayname: Optional[str], avatar_url: Optional[str]
) -> int:
return await self.db_pool.simple_update(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
updatevalues={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="update_remote_profile_cache",
)

async def maybe_delete_remote_profile_cache(self, user_id: str) -> None:
"""Check if we still care about the remote user's profile, and if we
don't then remove their profile from the cache
"""
subscribed = await self.is_subscribed_remote_profile_for_user(user_id)
if not subscribed:
await self.db_pool.simple_delete(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
desc="delete_remote_profile_cache",
)

async def is_subscribed_remote_profile_for_user(self, user_id: str) -> bool:
"""Check whether we are interested in a remote user's profile."""
res: Optional[str] = await self.db_pool.simple_select_one_onecol(
table="group_users",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True

res = await self.db_pool.simple_select_one_onecol(
table="group_invites",
keyvalues={"user_id": user_id},
retcol="user_id",
allow_none=True,
desc="should_update_remote_profile_cache_for_user",
)

if res:
return True
return False

async def get_remote_profile_cache_entries_that_expire(
self, last_checked: int
) -> List[Dict[str, str]]:
"""Get all users who haven't been checked since `last_checked`"""

def _get_remote_profile_cache_entries_that_expire_txn(
txn: LoggingTransaction,
) -> List[Dict[str, str]]:
sql = """
SELECT user_id, displayname, avatar_url
FROM remote_profile_cache
WHERE last_check < ?
"""

txn.execute(sql, (last_checked,))

return self.db_pool.cursor_to_dict(txn)

return await self.db_pool.runInteraction(
"get_remote_profile_cache_entries_that_expire",
_get_remote_profile_cache_entries_that_expire_txn,
)


class ProfileStore(ProfileWorkerStore):
async def add_remote_profile_cache(
self, user_id: str, displayname: str, avatar_url: str
) -> None:
"""Ensure we are caching the remote user's profiles.

This should only be called when `is_subscribed_remote_profile_for_user`
would return true for the user.
"""
await self.db_pool.simple_upsert(
table="remote_profile_cache",
keyvalues={"user_id": user_id},
values={
"displayname": displayname,
"avatar_url": avatar_url,
"last_check": self._clock.time_msec(),
},
desc="add_remote_profile_cache",
)
pass
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
"partial_state_events",
"events",
"federation_inbound_events_staging",
"group_rooms",
"local_current_membership",
"partial_state_rooms_servers",
"partial_state_rooms",
Expand All @@ -413,7 +412,6 @@ def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
"group_summary_rooms",
"room_account_data",
"room_tags",
# "rooms" happens last, to keep the foreign keys in the other tables
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@

Changes in SCHEMA_VERSION = 71:
- event_edges.room_id is no longer read from.
- Tables related to groups are no longer accessed.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the correct note to make? In the future we'll set the SCHEMA_COMPAT_VERSION to 71 to enforce this?

"""


Expand Down
2 changes: 0 additions & 2 deletions tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -2467,7 +2467,6 @@ def _block_room(self, room_id: str) -> None:
"event_push_actions",
"event_search",
"events",
"group_rooms",
"receipts_graph",
"receipts_linearized",
"room_aliases",
Expand All @@ -2484,7 +2483,6 @@ def _block_room(self, room_id: str) -> None:
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
"group_summary_rooms",
"room_account_data",
"room_tags",
# "state_groups", # Current impl leaves orphaned state groups around.
Expand Down