-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Remove option to skip locking of tables during emulated upserts #14469
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Remove option to skip locking of tables when performing emulated upserts, to avoid a class of bugs in future. |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -1129,7 +1129,6 @@ async def simple_upsert( | |||||||||||||||||||||||
values: Dict[str, Any], | ||||||||||||||||||||||||
insertion_values: Optional[Dict[str, Any]] = None, | ||||||||||||||||||||||||
desc: str = "simple_upsert", | ||||||||||||||||||||||||
lock: bool = True, | ||||||||||||||||||||||||
) -> bool: | ||||||||||||||||||||||||
"""Insert a row with values + insertion_values; on conflict, update with values. | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
@@ -1154,21 +1153,11 @@ async def simple_upsert( | |||||||||||||||||||||||
requiring that a unique index exist on the column names used to detect a | ||||||||||||||||||||||||
conflict (i.e. `keyvalues.keys()`). | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
If there is no such index, we can "emulate" an upsert with a SELECT followed | ||||||||||||||||||||||||
by either an INSERT or an UPDATE. This is unsafe: we cannot make the same | ||||||||||||||||||||||||
atomicity guarantees that a native upsert can and are very vulnerable to races | ||||||||||||||||||||||||
and crashes. Therefore if we wish to upsert without an appropriate unique index, | ||||||||||||||||||||||||
we must either: | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
1. Acquire a table-level lock before the emulated upsert (`lock=True`), or | ||||||||||||||||||||||||
2. VERY CAREFULLY ensure that we are the only thread and worker which will be | ||||||||||||||||||||||||
writing to this table, in which case we can proceed without a lock | ||||||||||||||||||||||||
(`lock=False`). | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
Generally speaking, you should use `lock=True`. If the table in question has a | ||||||||||||||||||||||||
unique index[*], this class will use a native upsert (which is atomic and so can | ||||||||||||||||||||||||
ignore the `lock` argument). Otherwise this class will use an emulated upsert, | ||||||||||||||||||||||||
in which case we want the safer option unless we been VERY CAREFUL. | ||||||||||||||||||||||||
If there is no such index yet[*], we can "emulate" an upsert with a SELECT | ||||||||||||||||||||||||
followed by either an INSERT or an UPDATE. This is unsafe: we cannot make the | ||||||||||||||||||||||||
same atomicity guarantees that a native upsert can and are very vulnerable to | ||||||||||||||||||||||||
races and crashes. Therefore if we wish to upsert without an appropriate unique | ||||||||||||||||||||||||
index, we must acquire a table-level lock before the emulated upsert. | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
[*]: Some tables have unique indices added to them in the background. Those | ||||||||||||||||||||||||
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES, | ||||||||||||||||||||||||
|
@@ -1189,7 +1178,6 @@ async def simple_upsert( | |||||||||||||||||||||||
values: The nonunique columns and their new values | ||||||||||||||||||||||||
insertion_values: additional key/values to use only when inserting | ||||||||||||||||||||||||
desc: description of the transaction, for logging and metrics | ||||||||||||||||||||||||
lock: True to lock the table when doing the upsert. | ||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||
Returns True if a row was inserted or updated (i.e. if `values` is | ||||||||||||||||||||||||
not empty then this always returns True) | ||||||||||||||||||||||||
|
@@ -1209,7 +1197,6 @@ async def simple_upsert( | |||||||||||||||||||||||
keyvalues, | ||||||||||||||||||||||||
values, | ||||||||||||||||||||||||
insertion_values, | ||||||||||||||||||||||||
lock=lock, | ||||||||||||||||||||||||
db_autocommit=autocommit, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
except self.engine.module.IntegrityError as e: | ||||||||||||||||||||||||
|
@@ -1232,7 +1219,6 @@ def simple_upsert_txn( | |||||||||||||||||||||||
values: Dict[str, Any], | ||||||||||||||||||||||||
insertion_values: Optional[Dict[str, Any]] = None, | ||||||||||||||||||||||||
where_clause: Optional[str] = None, | ||||||||||||||||||||||||
lock: bool = True, | ||||||||||||||||||||||||
) -> bool: | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
Pick the UPSERT method which works best on the platform. Either the | ||||||||||||||||||||||||
|
@@ -1245,8 +1231,6 @@ def simple_upsert_txn( | |||||||||||||||||||||||
values: The nonunique columns and their new values | ||||||||||||||||||||||||
insertion_values: additional key/values to use only when inserting | ||||||||||||||||||||||||
where_clause: An index predicate to apply to the upsert. | ||||||||||||||||||||||||
lock: True to lock the table when doing the upsert. Unused when performing | ||||||||||||||||||||||||
a native upsert. | ||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||
Returns True if a row was inserted or updated (i.e. if `values` is | ||||||||||||||||||||||||
not empty then this always returns True) | ||||||||||||||||||||||||
|
@@ -1270,7 +1254,6 @@ def simple_upsert_txn( | |||||||||||||||||||||||
values, | ||||||||||||||||||||||||
insertion_values=insertion_values, | ||||||||||||||||||||||||
where_clause=where_clause, | ||||||||||||||||||||||||
lock=lock, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
def simple_upsert_txn_emulated( | ||||||||||||||||||||||||
|
@@ -1291,14 +1274,15 @@ def simple_upsert_txn_emulated( | |||||||||||||||||||||||
insertion_values: additional key/values to use only when inserting | ||||||||||||||||||||||||
where_clause: An index predicate to apply to the upsert. | ||||||||||||||||||||||||
lock: True to lock the table when doing the upsert. | ||||||||||||||||||||||||
Must not be False unless the table has already been locked. | ||||||||||||||||||||||||
Returns: | ||||||||||||||||||||||||
Returns True if a row was inserted or updated (i.e. if `values` is | ||||||||||||||||||||||||
not empty then this always returns True) | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
insertion_values = insertion_values or {} | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
# We need to lock the table :(, unless we're *really* careful | ||||||||||||||||||||||||
if lock: | ||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we keeping this one conditional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's to support synapse/synapse/storage/database.py Lines 1499 to 1509 in 9cae44f
|
||||||||||||||||||||||||
# We need to lock the table :( | ||||||||||||||||||||||||
self.engine.lock_table(txn, table) | ||||||||||||||||||||||||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||
|
||||||||||||||||||||||||
def _getwhere(key: str) -> str: | ||||||||||||||||||||||||
|
@@ -1406,7 +1390,6 @@ async def simple_upsert_many( | |||||||||||||||||||||||
value_names: Collection[str], | ||||||||||||||||||||||||
value_values: Collection[Collection[Any]], | ||||||||||||||||||||||||
desc: str, | ||||||||||||||||||||||||
lock: bool = True, | ||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
Upsert, many times. | ||||||||||||||||||||||||
|
@@ -1418,8 +1401,6 @@ async def simple_upsert_many( | |||||||||||||||||||||||
value_names: The value column names | ||||||||||||||||||||||||
value_values: A list of each row's value column values. | ||||||||||||||||||||||||
Ignored if value_names is empty. | ||||||||||||||||||||||||
lock: True to lock the table when doing the upsert. Unused when performing | ||||||||||||||||||||||||
a native upsert. | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
# We can autocommit if it safe to upsert | ||||||||||||||||||||||||
|
@@ -1433,7 +1414,6 @@ async def simple_upsert_many( | |||||||||||||||||||||||
key_values, | ||||||||||||||||||||||||
value_names, | ||||||||||||||||||||||||
value_values, | ||||||||||||||||||||||||
lock=lock, | ||||||||||||||||||||||||
db_autocommit=autocommit, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
|
@@ -1445,7 +1425,6 @@ def simple_upsert_many_txn( | |||||||||||||||||||||||
key_values: Collection[Iterable[Any]], | ||||||||||||||||||||||||
value_names: Collection[str], | ||||||||||||||||||||||||
value_values: Iterable[Iterable[Any]], | ||||||||||||||||||||||||
lock: bool = True, | ||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
Upsert, many times. | ||||||||||||||||||||||||
|
@@ -1457,16 +1436,19 @@ def simple_upsert_many_txn( | |||||||||||||||||||||||
value_names: The value column names | ||||||||||||||||||||||||
value_values: A list of each row's value column values. | ||||||||||||||||||||||||
Ignored if value_names is empty. | ||||||||||||||||||||||||
lock: True to lock the table when doing the upsert. Unused when performing | ||||||||||||||||||||||||
a native upsert. | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
if table not in self._unsafe_to_upsert_tables: | ||||||||||||||||||||||||
return self.simple_upsert_many_txn_native_upsert( | ||||||||||||||||||||||||
txn, table, key_names, key_values, value_names, value_values | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
else: | ||||||||||||||||||||||||
return self.simple_upsert_many_txn_emulated( | ||||||||||||||||||||||||
txn, table, key_names, key_values, value_names, value_values, lock=lock | ||||||||||||||||||||||||
txn, | ||||||||||||||||||||||||
table, | ||||||||||||||||||||||||
key_names, | ||||||||||||||||||||||||
key_values, | ||||||||||||||||||||||||
value_names, | ||||||||||||||||||||||||
value_values, | ||||||||||||||||||||||||
) | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
def simple_upsert_many_txn_emulated( | ||||||||||||||||||||||||
|
@@ -1477,7 +1459,6 @@ def simple_upsert_many_txn_emulated( | |||||||||||||||||||||||
key_values: Collection[Iterable[Any]], | ||||||||||||||||||||||||
value_names: Collection[str], | ||||||||||||||||||||||||
value_values: Iterable[Iterable[Any]], | ||||||||||||||||||||||||
lock: bool = True, | ||||||||||||||||||||||||
) -> None: | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
Upsert, many times, but without native UPSERT support or batching. | ||||||||||||||||||||||||
|
@@ -1489,18 +1470,16 @@ def simple_upsert_many_txn_emulated( | |||||||||||||||||||||||
value_names: The value column names | ||||||||||||||||||||||||
value_values: A list of each row's value column values. | ||||||||||||||||||||||||
Ignored if value_names is empty. | ||||||||||||||||||||||||
lock: True to lock the table when doing the upsert. | ||||||||||||||||||||||||
""" | ||||||||||||||||||||||||
# No value columns, therefore make a blank list so that the following | ||||||||||||||||||||||||
# zip() works correctly. | ||||||||||||||||||||||||
if not value_names: | ||||||||||||||||||||||||
value_values = [() for x in range(len(key_values))] | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
if lock: | ||||||||||||||||||||||||
# Lock the table just once, to prevent it being done once per row. | ||||||||||||||||||||||||
# Note that, according to Postgres' documentation, once obtained, | ||||||||||||||||||||||||
# the lock is held for the remainder of the current transaction. | ||||||||||||||||||||||||
self.engine.lock_table(txn, "user_ips") | ||||||||||||||||||||||||
# Lock the table just once, to prevent it being done once per row. | ||||||||||||||||||||||||
# Note that, according to Postgres' documentation, once obtained, | ||||||||||||||||||||||||
# the lock is held for the remainder of the current transaction. | ||||||||||||||||||||||||
self.engine.lock_table(txn, "user_ips") | ||||||||||||||||||||||||
|
||||||||||||||||||||||||
for keyv, valv in zip(key_values, value_values): | ||||||||||||||||||||||||
_keys = {x: y for x, y in zip(key_names, keyv)} | ||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -481,7 +481,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None: | |||||||||||||||||
table="user_directory", | ||||||||||||||||||
keyvalues={"user_id": user_id}, | ||||||||||||||||||
values={"display_name": display_name, "avatar_url": avatar_url}, | ||||||||||||||||||
lock=False, # We're only inserter | ||||||||||||||||||
DMRobertson marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
if isinstance(self.database_engine, PostgresEngine): | ||||||||||||||||||
|
@@ -511,7 +510,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None: | |||||||||||||||||
table="user_directory_search", | ||||||||||||||||||
keyvalues={"user_id": user_id}, | ||||||||||||||||||
values={"value": value}, | ||||||||||||||||||
lock=False, # We're only inserter | ||||||||||||||||||
Comment on lines
510
to
-514
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unique index exists here for postgres:
... but not for sqlite. It's marked as unsafe to upsert into on sqlite here: synapse/synapse/storage/database.py Lines 534 to 538 in c3a4780
Does that mean that with this change we're now going to start locking synapse/synapse/storage/engines/sqlite.py Lines 103 to 104 in 8c94dd3
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good spot with But if locking on sqlite has never done anything, then the rationale for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oops, sorry. That's probably me jumping to conclusions.
I suppose so! But to check: I don't think that suddenly makes it unsafe to remove the footgun? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, it's not unsafe, it just means this PR is rolling back a previous optimization we decided to add. |
||||||||||||||||||
) | ||||||||||||||||||
else: | ||||||||||||||||||
# This should be unreachable. | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if my phrasing was poor here. Maybe we can make those guarantees if we're using a sufficiently high isolation level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to reword it a little. Suggestions welcome.