From bba7c55c49d78c27063f97ca7a8a04f1936dfe29 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jul 2021 12:01:51 +0100 Subject: [PATCH 1/6] Change upsert return values to be more useful --- synapse/storage/database.py | 30 +++++---- .../databases/main/monthly_active_users.py | 4 +- .../storage/databases/main/user_directory.py | 66 ++++--------------- 3 files changed, 29 insertions(+), 71 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ccf9ac51ef8c..9c5f5001682d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -930,7 +930,7 @@ async def simple_upsert( insertion_values: Optional[Dict[str, Any]] = None, desc: str = "simple_upsert", lock: bool = True, - ) -> Optional[bool]: + ) -> bool: """ `lock` should generally be set to True (the default), but can be set @@ -951,8 +951,8 @@ async def simple_upsert( desc: description of the transaction, for logging and metrics lock: True to lock the table when doing the upsert. Returns: - Native upserts always return None. Emulated upserts return True if a - new entry was created, False if an existing one was updated. + Returns True if a row was inserted or updated (i.e. if `values` is + not empty then this always returns True) """ insertion_values = insertion_values or {} @@ -995,7 +995,7 @@ def simple_upsert_txn( values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, lock: bool = True, - ) -> Optional[bool]: + ) -> bool: """ Pick the UPSERT method which works best on the platform. Either the native one (Pg9.5+, recent SQLites), or fall back to an emulated method. @@ -1008,16 +1008,15 @@ def simple_upsert_txn( insertion_values: additional key/values to use only when inserting lock: True to lock the table when doing the upsert. Returns: - Native upserts always return None. Emulated upserts return True if a - new entry was created, False if an existing one was updated. + Returns True if a row was inserted or updated (i.e. if `values` is + not empty then this always returns True) """ insertion_values = insertion_values or {} if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: - self.simple_upsert_txn_native_upsert( + return self.simple_upsert_txn_native_upsert( txn, table, keyvalues, values, insertion_values=insertion_values ) - return None else: return self.simple_upsert_txn_emulated( txn, @@ -1045,8 +1044,8 @@ def simple_upsert_txn_emulated( insertion_values: additional key/values to use only when inserting lock: True to lock the table when doing the upsert. Returns: - Returns True if a new entry was created, False if an existing - one was updated. + Returns True if a row was inserted or updated (i.e. if `values` is + not empty then this always returns True) """ insertion_values = insertion_values or {} @@ -1086,8 +1085,7 @@ def _getwhere(key): txn.execute(sql, sqlargs) if txn.rowcount > 0: - # successfully updated at least one row. - return False + return True # We didn't find any existing rows, so insert a new one allvalues: Dict[str, Any] = {} @@ -1111,7 +1109,7 @@ def simple_upsert_txn_native_upsert( keyvalues: Dict[str, Any], values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, - ) -> None: + ) -> bool: """ Use the native UPSERT functionality in recent PostgreSQL versions. @@ -1120,6 +1118,10 @@ def simple_upsert_txn_native_upsert( keyvalues: The unique key tables and their new values values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting + + Returns: + Returns True if a row was inserted or updated (i.e. if `values` is + not empty then this always returns True) """ allvalues: Dict[str, Any] = {} allvalues.update(keyvalues) @@ -1140,6 +1142,8 @@ def simple_upsert_txn_native_upsert( ) txn.execute(sql, list(allvalues.values())) + return bool(txn.rowcount) + async def simple_upsert_many( self, table: str, diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index fe25638289cd..74ea9ea6ab76 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -307,7 +307,7 @@ def upsert_monthly_active_user_txn(self, txn, user_id): # never be a big table and alternative approaches (batching multiple # upserts into a single txn) introduced a lot of extra complexity. # See https://github.com/matrix-org/synapse/issues/3854 for more - is_insert = self.db_pool.simple_upsert_txn( + self.db_pool.simple_upsert_txn( txn, table="monthly_active_users", keyvalues={"user_id": user_id}, @@ -322,8 +322,6 @@ def upsert_monthly_active_user_txn(self, txn, user_id): txn, self.user_last_seen_monthly_active, (user_id,) ) - return is_insert - async def populate_monthly_active_users(self, user_id): """Checks on the state of monthly active user limits and optionally add the user to the monthly active tables diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index a6bfb4902a1c..9d28d69ac7d1 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -377,7 +377,7 @@ async def update_profile_in_user_dir( avatar_url = None def _update_profile_in_user_dir_txn(txn): - new_entry = self.db_pool.simple_upsert_txn( + self.db_pool.simple_upsert_txn( txn, table="user_directory", keyvalues={"user_id": user_id}, @@ -388,8 +388,7 @@ def _update_profile_in_user_dir_txn(txn): if isinstance(self.database_engine, PostgresEngine): # We weight the localpart most highly, then display name and finally # server name - if self.database_engine.can_native_upsert: - sql = """ + sql = """ INSERT INTO user_directory_search(user_id, vector) VALUES (?, setweight(to_tsvector('simple', ?), 'A') @@ -397,58 +396,15 @@ def _update_profile_in_user_dir_txn(txn): || setweight(to_tsvector('simple', COALESCE(?, '')), 'B') ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector """ - txn.execute( - sql, - ( - user_id, - get_localpart_from_id(user_id), - get_domain_from_id(user_id), - display_name, - ), - ) - else: - # TODO: Remove this code after we've bumped the minimum version - # of postgres to always support upserts, so we can get rid of - # `new_entry` usage - if new_entry is True: - sql = """ - INSERT INTO user_directory_search(user_id, vector) - VALUES (?, - setweight(to_tsvector('simple', ?), 'A') - || setweight(to_tsvector('simple', ?), 'D') - || setweight(to_tsvector('simple', COALESCE(?, '')), 'B') - ) - """ - txn.execute( - sql, - ( - user_id, - get_localpart_from_id(user_id), - get_domain_from_id(user_id), - display_name, - ), - ) - elif new_entry is False: - sql = """ - UPDATE user_directory_search - SET vector = setweight(to_tsvector('simple', ?), 'A') - || setweight(to_tsvector('simple', ?), 'D') - || setweight(to_tsvector('simple', COALESCE(?, '')), 'B') - WHERE user_id = ? - """ - txn.execute( - sql, - ( - get_localpart_from_id(user_id), - get_domain_from_id(user_id), - display_name, - user_id, - ), - ) - else: - raise RuntimeError( - "upsert returned None when 'can_native_upsert' is False" - ) + txn.execute( + sql, + ( + user_id, + get_localpart_from_id(user_id), + get_domain_from_id(user_id), + display_name, + ), + ) elif isinstance(self.database_engine, Sqlite3Engine): value = "%s %s" % (user_id, display_name) if display_name else user_id self.db_pool.simple_upsert_txn( From d94db6a7a0c5bb628d1b978e0c6ea934910a781b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jul 2021 12:03:26 +0100 Subject: [PATCH 2/6] Remove or_ignore param in simple_insert Now that we have `simple_upsert` that should be used in preference to trying to insert and looking for an exception. The main benefit is that we ERROR message don't get written to postgres logs. --- synapse/storage/database.py | 19 ++----------------- synapse/storage/databases/main/devices.py | 9 ++++++--- .../storage/databases/main/transactions.py | 7 ++++--- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 9c5f5001682d..e5a1c979794e 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -832,31 +832,16 @@ async def simple_insert( self, table: str, values: Dict[str, Any], - or_ignore: bool = False, desc: str = "simple_insert", - ) -> bool: + ): """Executes an INSERT query on the named table. Args: table: string giving the table name values: dict of new column names and values for them - or_ignore: bool stating whether an exception should be raised - when a conflicting row already exists. If True, False will be - returned by the function instead desc: description of the transaction, for logging and metrics - - Returns: - Whether the row was inserted or not. Only useful when `or_ignore` is True """ - try: - await self.runInteraction(desc, self.simple_insert_txn, table, values) - except self.engine.module.IntegrityError: - # We have to do or_ignore flag at this layer, since we can't reuse - # a cursor after we receive an error from the db. - if not or_ignore: - raise - return False - return True + await self.runInteraction(desc, self.simple_insert_txn, table, values) @staticmethod def simple_insert_txn( diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 18f07d96dcd0..3816a0ca5382 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1078,16 +1078,18 @@ async def store_device( return False try: - inserted = await self.db_pool.simple_insert( + inserted = await self.db_pool.simple_upsert( "devices", - values={ + keyvalues={ "user_id": user_id, "device_id": device_id, + }, + values={}, + insertion_values={ "display_name": initial_device_display_name, "hidden": False, }, desc="store_device", - or_ignore=True, ) if not inserted: # if the device already exists, check if it's a real device, or @@ -1099,6 +1101,7 @@ async def store_device( ) if hidden: raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN) + self.device_id_exists_cache.set(key, True) return inserted except StoreError: diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index d211c423b2ce..2579571d5829 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -134,16 +134,17 @@ async def set_received_txn_response( response_dict: The response, to be encoded into JSON. """ - await self.db_pool.simple_insert( + await self.db_pool.simple_upsert( table="received_transactions", - values={ + keyvalues={ "transaction_id": transaction_id, "origin": origin, + }, + values={ "response_code": code, "response_json": db_binary_type(encode_canonical_json(response_dict)), "ts": self._clock.time_msec(), }, - or_ignore=True, desc="set_received_txn_response", ) From fe19b14e8ff48bc8e89d8cac5caacfa39d6c5357 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jul 2021 12:07:19 +0100 Subject: [PATCH 3/6] Newsfile --- changelog.d/10442.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10442.misc diff --git a/changelog.d/10442.misc b/changelog.d/10442.misc new file mode 100644 index 000000000000..b8d412d73289 --- /dev/null +++ b/changelog.d/10442.misc @@ -0,0 +1 @@ +Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages. From 793da53b1f40e37fc94bb1f3cdad367244e9e66a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jul 2021 12:35:33 +0100 Subject: [PATCH 4/6] Code review --- synapse/storage/database.py | 2 +- synapse/storage/databases/main/monthly_active_users.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index e5a1c979794e..ffdb88985e7d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -833,7 +833,7 @@ async def simple_insert( table: str, values: Dict[str, Any], desc: str = "simple_insert", - ): + ) -> None: """Executes an INSERT query on the named table. Args: diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index 74ea9ea6ab76..d213b2670364 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -297,10 +297,6 @@ def upsert_monthly_active_user_txn(self, txn, user_id): Args: txn (cursor): user_id (str): user to add/update - - Returns: - bool: True if a new entry was created, False if an - existing one was updated. """ # Am consciously deciding to lock the table on the basis that is ought From eb48a24da7f73ee44df8dc55c487bd0fca0798b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jul 2021 10:34:47 +0100 Subject: [PATCH 5/6] Update synapse/storage/database.py Co-authored-by: Patrick Cloke --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ffdb88985e7d..4d4643619f68 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1096,7 +1096,7 @@ def simple_upsert_txn_native_upsert( insertion_values: Optional[Dict[str, Any]] = None, ) -> bool: """ - Use the native UPSERT functionality in recent PostgreSQL versions. + Use the native UPSERT functionality in PostgreSQL. Args: table: The table to upsert into From a55709245f21b2bd42d35869459782c3fcd4ee90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jul 2021 10:35:22 +0100 Subject: [PATCH 6/6] Update synapse/storage/databases/main/transactions.py --- synapse/storage/databases/main/transactions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 2579571d5829..7728d5f1023c 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -140,7 +140,8 @@ async def set_received_txn_response( "transaction_id": transaction_id, "origin": origin, }, - values={ + values={}, + insertion_values={ "response_code": code, "response_json": db_binary_type(encode_canonical_json(response_dict)), "ts": self._clock.time_msec(),