Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prevent account_data content from being sent over TCP replication #6333

Merged
merged 4 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6333.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent account data syncs getting lost across TCP replication.
7 changes: 3 additions & 4 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream",
("user_id", "room_id", "data_type", "data"), # str # str # str # dict
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
Expand Down Expand Up @@ -421,8 +420,8 @@ def update_function(self, from_token, to_token, limit):

results = list(room_results)
results.extend(
(stream_id, user_id, None, account_data_type, content)
for stream_id, user_id, account_data_type, content in global_results
(stream_id, user_id, None, account_data_type)
for stream_id, user_id, account_data_type in global_results
)

return results
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/data_stores/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,22 @@ def get_all_updated_account_data(
current_id(int): The position to fetch up to.
Returns:
A deferred pair of lists of tuples of stream_id int, user_id string,
room_id string, type string, and content string.
room_id string, and type string.
"""
if last_room_id == current_id and last_global_id == current_id:
return defer.succeed(([], []))

def get_updated_account_data_txn(txn):
sql = (
"SELECT stream_id, user_id, account_data_type, content"
"SELECT stream_id, user_id, account_data_type"
" FROM account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
txn.execute(sql, (last_global_id, current_id, limit))
global_results = txn.fetchall()

sql = (
"SELECT stream_id, user_id, room_id, account_data_type, content"
"SELECT stream_id, user_id, room_id, account_data_type"
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
Expand Down