Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove option to skip locking of tables during emulated upserts #14469

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14469.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove option to skip locking of tables when performing emulated upserts, to avoid a class of bugs in future.
56 changes: 18 additions & 38 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,6 @@ async def simple_upsert(
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
lock: bool = True,
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.

Expand All @@ -1154,21 +1153,12 @@ async def simple_upsert(
requiring that a unique index exist on the column names used to detect a
conflict (i.e. `keyvalues.keys()`).

If there is no such index, we can "emulate" an upsert with a SELECT followed
by either an INSERT or an UPDATE. This is unsafe: we cannot make the same
atomicity guarantees that a native upsert can and are very vulnerable to races
and crashes. Therefore if we wish to upsert without an appropriate unique index,
we must either:

1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
2. VERY CAREFULLY ensure that we are the only thread and worker which will be
writing to this table, in which case we can proceed without a lock
(`lock=False`).

Generally speaking, you should use `lock=True`. If the table in question has a
unique index[*], this class will use a native upsert (which is atomic and so can
ignore the `lock` argument). Otherwise this class will use an emulated upsert,
in which case we want the safer option unless we been VERY CAREFUL.
If there is no such index yet[*], we can "emulate" an upsert with a SELECT
followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters
run at the SERIALIZABLE isolation level: we cannot make the same atomicity
guarantees that a native upsert can and are very vulnerable to races and
crashes. Therefore to upsert without an appropriate unique index, we acquire a
table-level lock before the emulated upsert.

[*]: Some tables have unique indices added to them in the background. Those
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
Expand All @@ -1189,7 +1179,6 @@ async def simple_upsert(
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
desc: description of the transaction, for logging and metrics
lock: True to lock the table when doing the upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
Expand All @@ -1209,7 +1198,6 @@ async def simple_upsert(
keyvalues,
values,
insertion_values,
lock=lock,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
Expand All @@ -1232,7 +1220,6 @@ def simple_upsert_txn(
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None,
lock: bool = True,
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
Expand All @@ -1245,8 +1232,6 @@ def simple_upsert_txn(
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
Expand All @@ -1270,7 +1255,6 @@ def simple_upsert_txn(
values,
insertion_values=insertion_values,
where_clause=where_clause,
lock=lock,
)

def simple_upsert_txn_emulated(
Expand All @@ -1291,14 +1275,15 @@ def simple_upsert_txn_emulated(
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert.
Must not be False unless the table has already been locked.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}

# We need to lock the table :(, unless we're *really* careful
if lock:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we keeping this one conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to support simple_upsert_many_txn_emulated, which does its own locking beforehand.

if lock:
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, "user_ips")
for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
_vals = {x: y for x, y in zip(value_names, valv)}
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)

# We need to lock the table :(
self.engine.lock_table(txn, table)
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

def _getwhere(key: str) -> str:
Expand Down Expand Up @@ -1406,7 +1391,6 @@ async def simple_upsert_many(
value_names: Collection[str],
value_values: Collection[Collection[Any]],
desc: str,
lock: bool = True,
) -> None:
"""
Upsert, many times.
Expand All @@ -1418,8 +1402,6 @@ async def simple_upsert_many(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""

# We can autocommit if it safe to upsert
Expand All @@ -1433,7 +1415,6 @@ async def simple_upsert_many(
key_values,
value_names,
value_values,
lock=lock,
db_autocommit=autocommit,
)

Expand All @@ -1445,7 +1426,6 @@ def simple_upsert_many_txn(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times.
Expand All @@ -1457,16 +1437,19 @@ def simple_upsert_many_txn(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
"""
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
else:
return self.simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values, lock=lock
txn,
table,
key_names,
key_values,
value_names,
value_values,
)

def simple_upsert_many_txn_emulated(
Expand All @@ -1477,7 +1460,6 @@ def simple_upsert_many_txn_emulated(
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
Expand All @@ -1489,18 +1471,16 @@ def simple_upsert_many_txn_emulated(
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]

if lock:
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, "user_ips")
# Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction.
self.engine.lock_table(txn, "user_ips")

for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
Expand Down
8 changes: 0 additions & 8 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,6 @@ async def add_account_data_to_room(
content_json = json_encoder.encode(content)

async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
await self.db_pool.simple_upsert(
desc="add_room_account_data",
table="room_account_data",
Expand All @@ -471,7 +468,6 @@ async def add_account_data_to_room(
"account_data_type": account_data_type,
},
values={"stream_id": next_id, "content": content_json},
lock=False,
)

self._account_data_stream_cache.entity_has_changed(user_id, next_id)
Expand Down Expand Up @@ -527,15 +523,11 @@ def _add_account_data_for_user(
) -> None:
content_json = json_encoder.encode(content)

# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
self.db_pool.simple_upsert_txn(
txn,
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
lock=False,
)

# Ignored users get denormalized into a separate table as an optimisation.
Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,6 @@ async def set_appservice_stream_type_pos(
table="application_services_state",
keyvalues={"as_id": service.id},
values={f"{stream_type}_stream_id": pos},
# no need to lock when emulating upsert: as_id is a unique key
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
lock=False,
desc="set_appservice_stream_type_pos",
)

Expand Down
9 changes: 0 additions & 9 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1747,9 +1747,6 @@ def _update_remote_device_list_cache_entry_txn(
table="device_lists_remote_cache",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"content": json_encoder.encode(content)},
# we don't need to lock, because we assume we are the only thread
# updating this user's devices.
lock=False,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
)

txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
Expand All @@ -1763,9 +1760,6 @@ def _update_remote_device_list_cache_entry_txn(
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
# again, we can assume we are the only thread updating this user's
# extremity.
lock=False,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
)

async def update_remote_device_list_cache(
Expand Down Expand Up @@ -1818,9 +1812,6 @@ def _update_remote_device_list_cache_txn(
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
# we don't need to lock, because we can assume we are the only thread
# updating this user's extremity.
lock=False,
)

async def add_device_change_to_streams(
Expand Down
1 change: 0 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,6 @@ async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
},
insertion_values={},
desc="insert_insertion_extremity",
lock=False,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
)

async def insert_received_event_to_staging(
Expand Down
6 changes: 0 additions & 6 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,11 @@ async def get_throttle_params_by_room(
async def set_throttle_params(
self, pusher_id: str, room_id: str, params: ThrottleParams
) -> None:
# no need to lock because `pusher_throttle` has a primary key on
# (pusher, room_id) so simple_upsert will retry
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
await self.db_pool.simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
desc="set_throttle_params",
lock=False,
)

async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
Expand Down Expand Up @@ -595,8 +592,6 @@ async def add_pusher(
device_id: Optional[str] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so simple_upsert will retry
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
Expand All @@ -615,7 +610,6 @@ async def add_pusher(
"device_id": device_id,
},
desc="add_pusher",
lock=False,
)

user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
Expand Down
6 changes: 0 additions & 6 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1843,9 +1843,6 @@ async def upsert_room_on_join(
"creator": room_creator,
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
lock=False,
)

async def store_partial_state_room(
Expand Down Expand Up @@ -1966,9 +1963,6 @@ async def maybe_store_room_on_outlier_membership(
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
)

async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
Expand Down
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,4 @@ async def store_state_group_id_for_event_id(
table="event_to_state_groups",
keyvalues={"event_id": event_id},
values={"state_group": state_group_id, "event_id": event_id},
# Unique constraint on event_id so we don't have to lock
lock=False,
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
)
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
lock=False, # We're only inserter
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
)

if isinstance(self.database_engine, PostgresEngine):
Expand Down Expand Up @@ -511,7 +510,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
lock=False, # We're only inserter
Comment on lines 510 to -514
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unique index exists here for postgres:

CREATE UNIQUE INDEX user_directory_search_user_idx ON user_directory_search(user_id);

... but not for sqlite. It's marked as unsafe to upsert into on sqlite here:

# We add the user_directory_search table to the blacklist on SQLite
# because the existing search table does not have an index, making it
# unsafe to use native upserts.
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")

Does that mean that with this change we're now going to start locking user_directory_search on sqlite deployments? Ahh, but locking a table is a no-op on sqlite, presumably because there's at most one worker writing?

def lock_table(self, txn: Cursor, table: str) -> None:
return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot with user_directory_search. I wasn't expecting that. Thankfully the no-op locking saves us.

But if locking on sqlite has never done anything, then the rationale for lock=False cannot have been because sqlite had no native upserts. Which means it must have been added as an optimization for the background update case on postgres?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if locking on sqlite has never done anything, then the rationale for lock=False cannot have been because sqlite had no native upserts.

Oops, sorry. That's probably me jumping to conclusions.

Which means it must have been added as an optimization for the background update case on postgres?

I suppose so! But to check: I don't think that suddenly makes it unsafe to remove the footgun?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it's not unsafe, it just means this PR is rolling back a previous optimization we decided to add.

)
else:
# This should be unreachable.
Expand Down