Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove code which updates application_services_state.last_txn #12680

Merged
merged 3 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12680.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove code which updates unused database column `application_services_state.last_txn`.
47 changes: 24 additions & 23 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,29 @@ async def get_appservice_state(
"""Get the application service state.

Args:
service: The service whose state to set.
service: The service whose state to get.
Returns:
An ApplicationServiceState or none.
An ApplicationServiceState, or None if we have yet to attempt any
transactions to the AS.
"""
result = await self.db_pool.simple_select_one(
# if we have created transactions for this AS but not yet attempted to send
# them, we will have a row in the table with state=NULL (recording the stream
# positions we have processed up to).
#
# On the other hand, if we have yet to create any transactions for this AS at
# all, then there will be no row for the AS.
#
# In either case, we return None to indicate "we don't yet know the state of
# this AS".
result = await self.db_pool.simple_select_one_onecol(
"application_services_state",
{"as_id": service.id},
["state"],
retcol="state",
allow_none=True,
desc="get_appservice_state",
)
if result:
return ApplicationServiceState(result.get("state"))
return ApplicationServiceState(result)
Comment on lines +220 to +228
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this changes things so that if there is a row for the AS but it has state IS NULL, we return None rather than raising an exception.

And I think this is necessary because fixing the race in set_appservice_stream_type_pos now means that we hit this code at a point where we have a row with {stream_type}_stream_id but not yet a state. Previously, there would have (incorrectly) been no row at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this changes things so that if there is a row for the AS but it has state IS NULL, we return None rather than raising an exception.

It took me a second to understand that ApplicationServiceState is a enum, and so raises if given a None.

We appear to have changed this behaviour #11488, and previously we returned None if state was None.

return None

async def set_appservice_state(
Expand Down Expand Up @@ -296,14 +306,6 @@ async def complete_appservice_txn(
"""

def _complete_appservice_txn(txn: LoggingTransaction) -> None:
# Set current txn_id for AS to 'txn_id'
self.db_pool.simple_upsert_txn(
txn,
"application_services_state",
{"as_id": service.id},
{"last_txn": txn_id},
)

# Delete txn
self.db_pool.simple_delete_txn(
txn,
Expand Down Expand Up @@ -452,16 +454,15 @@ async def set_appservice_stream_type_pos(
% (stream_type,)
)

def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None:
stream_id_type = "%s_stream_id" % stream_type
txn.execute(
"UPDATE application_services_state SET %s = ? WHERE as_id=?"
% stream_id_type,
(pos, service.id),
)

await self.db_pool.runInteraction(
"set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn
# this may be the first time that we're recording any state for this AS, so
# we don't yet know if a row for it exists; hence we have to upsert here.
await self.db_pool.simple_upsert(
table="application_services_state",
keyvalues={"as_id": service.id},
values={f"{stream_type}_stream_id": pos},
# no need to lock when emulating upsert: as_id is a unique key
lock=False,
desc="set_appservice_stream_type_pos",
)
Comment on lines -455 to 466
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is actually fixing a previously-existing race. Since the row for service.id wasn't added until the transaction was successfully sent - which happened asynchronously, if at all - there was no guarantee that such a row actually existed at this point.

Of course, removing the code to add the row when the txn was sent meant that the row never got added at all, so the race became a consistent failure.

TL;DR: I assert this should have been an upsert all along.



Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,16 @@

Changes in SCHEMA_VERSION = 69:
- We now write to `device_lists_changes_in_room` table.
- Use sequence to generate future `application_services_txns.txn_id`s
- We now use a PostgreSQL sequence to generate future txn_ids for
`application_services_txns`. `application_services_state.last_txn` is no longer
updated.
"""


SCHEMA_COMPAT_VERSION = (
# We now assume that `device_lists_changes_in_room` has been filled out for
# recent device_list_updates.
# ... and that `application_services_state.last_txn` is not used.
69
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
Expand Down
10 changes: 0 additions & 10 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,16 +434,6 @@ def test_sending_read_receipt_batches_to_application_services(self):
},
)

# "Complete" a transaction.
# All this really does for us is make an entry in the application_services_state
# database table, which tracks the current stream_token per stream ID per AS.
self.get_success(
self.hs.get_datastores().main.complete_appservice_txn(
0,
interested_appservice,
)
)

# Now, pretend that we receive a large burst of read receipts (300 total) that
# all come in at once.
for i in range(300):
Expand Down
27 changes: 6 additions & 21 deletions tests/storage/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import json
import os
import tempfile
from typing import List, Optional, cast
from typing import List, cast
from unittest.mock import Mock

import yaml
Expand Down Expand Up @@ -149,15 +149,12 @@ def _add_service(self, url, as_token, id) -> None:
outfile.write(yaml.dump(as_yaml))
self.as_yaml_files.append(as_token)

def _set_state(
self, id: str, state: ApplicationServiceState, txn: Optional[int] = None
):
def _set_state(self, id: str, state: ApplicationServiceState):
return self.db_pool.runOperation(
self.engine.convert_param_style(
"INSERT INTO application_services_state(as_id, state, last_txn) "
"VALUES(?,?,?)"
"INSERT INTO application_services_state(as_id, state) VALUES(?,?)"
),
(id, state.value, txn),
(id, state.value),
)

def _insert_txn(self, as_id, txn_id, events):
Expand Down Expand Up @@ -280,17 +277,6 @@ def test_complete_appservice_txn_first_txn(
self.store.complete_appservice_txn(txn_id=txn_id, service=service)
)

res = self.get_success(
self.db_pool.runQuery(
self.engine.convert_param_style(
"SELECT last_txn FROM application_services_state WHERE as_id=?"
),
(service.id,),
)
)
self.assertEqual(1, len(res))
self.assertEqual(txn_id, res[0][0])

res = self.get_success(
self.db_pool.runQuery(
self.engine.convert_param_style(
Expand All @@ -316,14 +302,13 @@ def test_complete_appservice_txn_updates_last_txn_state(
res = self.get_success(
self.db_pool.runQuery(
self.engine.convert_param_style(
"SELECT last_txn, state FROM application_services_state WHERE as_id=?"
"SELECT state FROM application_services_state WHERE as_id=?"
),
(service.id,),
)
)
self.assertEqual(1, len(res))
self.assertEqual(txn_id, res[0][0])
self.assertEqual(ApplicationServiceState.UP.value, res[0][1])
self.assertEqual(ApplicationServiceState.UP.value, res[0][0])

res = self.get_success(
self.db_pool.runQuery(
Expand Down