From a829402d82f834f303b360e1b2fe4af8ca09c044 Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Thu, 21 Mar 2024 15:26:37 +1100 Subject: [PATCH 1/2] feat: reduce contention when updating the contract_data_updated_at field for integrations --- lib/pact_broker/dataset.rb | 12 +++++++++++ lib/pact_broker/integrations/repository.rb | 23 +++++++++++++++------- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/lib/pact_broker/dataset.rb b/lib/pact_broker/dataset.rb index 84eb1e7e0..ddf366dd6 100644 --- a/lib/pact_broker/dataset.rb +++ b/lib/pact_broker/dataset.rb @@ -101,6 +101,18 @@ def order_ignore_case column_name = :name def order_append_ignore_case column_name = :name order_append(Sequel.function(:lower, column_name)) end + + # Executes a SELECT query FOR UPDATE, with SKIP LOCKED if supported (postgres only). + # With FOR UPDATE SKIP LOCKED, the SELECT will run immediately, not waiting for any other transactions, + # and only return rows that are not already locked by another transaction. + # The FOR UPDATE is required to make it work this way - SKIP LOCKED on its own does not work. + def for_update_skip_locked_if_supported + if supports_skip_locked? + for_update.skip_locked + else + self + end + end end end diff --git a/lib/pact_broker/integrations/repository.rb b/lib/pact_broker/integrations/repository.rb index 7568eed66..db6678cba 100644 --- a/lib/pact_broker/integrations/repository.rb +++ b/lib/pact_broker/integrations/repository.rb @@ -50,18 +50,27 @@ def delete(consumer_id, provider_id) # @param [PactBroker::Domain::Pacticipant, nil] consumer the consumer for the integration, or nil if for a provider-only event (eg. Pactflow provider contract published) # @param [PactBroker::Domain::Pacticipant] provider the provider for the integration def set_contract_data_updated_at(consumer, provider) - Integration - .where({ consumer_id: consumer&.id, provider_id: provider.id }.compact ) - .update(contract_data_updated_at: Sequel.datetime_class.now) + set_contract_data_updated_at_for_multiple_integrations([OpenStruct.new(consumer: consumer, provider: provider)]) end - # Sets the contract_data_updated_at for the integrations as specified by an array of objects which each have a consumer and provider - # @param [Array] where each object has a consumer and a provider + # Sets the contract_data_updated_at for the integrations as specified by an array of objects which each have a consumer and provider. + # + # The contract_data_updated_at attribute is only ever used for ordering the list of integrations on the index page of the *Pact Broker* UI, + # so that the most recently updated integrations (the ones you're likely working on) are showed at the top of the first page. + # There is often contention around updating it however, which can cause deadlocks, and slow down API responses. + # Because it's not a critical field (eg. it won't change any can-i-deploy results), the easiest way to reduce this contention + # is to just not update it if the row is locked, because if it is locked, the value of contract_data_updated_at is already + # going to be a date from a few seconds ago, which is perfectly fine for the purposes for which we are using the value. + # @param [Array] where each object MAY have a consumer and does have a provider (for Pactflow provider contract published there is no consumer) def set_contract_data_updated_at_for_multiple_integrations(objects_with_consumer_and_provider) - consumer_and_provider_ids = objects_with_consumer_and_provider.collect{ | object | [object.consumer.id, object.provider.id] }.uniq + consumer_and_provider_ids = objects_with_consumer_and_provider.collect{ | object | { consumer_id: object.consumer&.id, provider_id: object.provider.id }.compact }.uniq + integration_ids_to_update = Integration + .select(:id) + .where(Sequel.|(*consumer_and_provider_ids)) + .for_update_skip_locked_if_supported Integration - .where([:consumer_id, :provider_id] => consumer_and_provider_ids) + .where(id: integration_ids_to_update) .update(contract_data_updated_at: Sequel.datetime_class.now) end end From b7de537c903d2f5adb22aaea2055aa501110e89c Mon Sep 17 00:00:00 2001 From: Beth Skurrie Date: Thu, 21 Mar 2024 15:57:09 +1100 Subject: [PATCH 2/2] chore: make tests pass for MySQL --- lib/pact_broker/dataset.rb | 12 ---------- lib/pact_broker/integrations/repository.rb | 26 +++++++++++++++++----- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/lib/pact_broker/dataset.rb b/lib/pact_broker/dataset.rb index ddf366dd6..84eb1e7e0 100644 --- a/lib/pact_broker/dataset.rb +++ b/lib/pact_broker/dataset.rb @@ -101,18 +101,6 @@ def order_ignore_case column_name = :name def order_append_ignore_case column_name = :name order_append(Sequel.function(:lower, column_name)) end - - # Executes a SELECT query FOR UPDATE, with SKIP LOCKED if supported (postgres only). - # With FOR UPDATE SKIP LOCKED, the SELECT will run immediately, not waiting for any other transactions, - # and only return rows that are not already locked by another transaction. - # The FOR UPDATE is required to make it work this way - SKIP LOCKED on its own does not work. - def for_update_skip_locked_if_supported - if supports_skip_locked? - for_update.skip_locked - else - self - end - end end end diff --git a/lib/pact_broker/integrations/repository.rb b/lib/pact_broker/integrations/repository.rb index db6678cba..9890171a6 100644 --- a/lib/pact_broker/integrations/repository.rb +++ b/lib/pact_broker/integrations/repository.rb @@ -62,15 +62,31 @@ def set_contract_data_updated_at(consumer, provider) # Because it's not a critical field (eg. it won't change any can-i-deploy results), the easiest way to reduce this contention # is to just not update it if the row is locked, because if it is locked, the value of contract_data_updated_at is already # going to be a date from a few seconds ago, which is perfectly fine for the purposes for which we are using the value. + # + # Notes on SKIP LOCKED: + # SKIP LOCKED is only supported by Postgres. + # When executing SELECT ... FOR UPDATE SKIP LOCKED, the SELECT will run immediately, not waiting for any other transactions, + # and only return rows that are not already locked by another transaction. + # The FOR UPDATE is required to make it work this way - SKIP LOCKED on its own does not work. + # # @param [Array] where each object MAY have a consumer and does have a provider (for Pactflow provider contract published there is no consumer) def set_contract_data_updated_at_for_multiple_integrations(objects_with_consumer_and_provider) consumer_and_provider_ids = objects_with_consumer_and_provider.collect{ | object | { consumer_id: object.consumer&.id, provider_id: object.provider.id }.compact }.uniq - integration_ids_to_update = Integration - .select(:id) - .where(Sequel.|(*consumer_and_provider_ids)) - .for_update_skip_locked_if_supported + + # MySQL doesn't support an UPDATE with a subquery. FFS. Really need to do a major version release and delete the support code. + criteria = if Integration.dataset.supports_skip_locked? + integration_ids_to_update = Integration + .select(:id) + .where(Sequel.|(*consumer_and_provider_ids)) + .for_update + .skip_locked + { id: integration_ids_to_update } + else + Sequel.|(*consumer_and_provider_ids) + end + Integration - .where(id: integration_ids_to_update) + .where(criteria) .update(contract_data_updated_at: Sequel.datetime_class.now) end end