From c62f769318225b8c986912d88bbe9cb164c91335 Mon Sep 17 00:00:00 2001 From: Vincent Pochet Date: Tue, 28 May 2024 10:10:10 +0200 Subject: [PATCH] feat: Parralelize charge usage computation --- Gemfile | 1 + Gemfile.lock | 1 + app/models/invoice.rb | 2 +- .../charge_filters/matching_and_ignored_service.rb | 9 ++++----- .../charges/pay_in_advance_aggregation_service.rb | 2 +- app/services/fees/charge_service.rb | 2 +- app/services/invoices/calculate_fees_service.rb | 2 +- app/services/invoices/customer_usage_service.rb | 5 ++++- spec/scenarios/billable_metrics/sum_spec.rb | 12 ++++++++++-- .../matching_and_ignored_service_spec.rb | 2 +- 10 files changed, 25 insertions(+), 13 deletions(-) diff --git a/Gemfile b/Gemfile index c37fa34bdc3..c8fabdd306b 100644 --- a/Gemfile +++ b/Gemfile @@ -10,6 +10,7 @@ gem 'aasm' gem 'activejob-uniqueness', require: 'active_job/uniqueness/sidekiq_patch' gem 'bootsnap', require: false gem 'clockwork', require: false +gem 'parallel' gem 'puma', '~> 6.4' gem 'rails', '~> 7.0.8' gem 'sidekiq' diff --git a/Gemfile.lock b/Gemfile.lock index 9da9a74463b..653b2df77e3 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -892,6 +892,7 @@ DEPENDENCIES opentelemetry-instrumentation-all opentelemetry-sdk paper_trail + parallel pg puma (~> 6.4) rack-cors diff --git a/app/models/invoice.rb b/app/models/invoice.rb index 970261a18ce..2cad4a28fb9 100644 --- a/app/models/invoice.rb +++ b/app/models/invoice.rb @@ -170,7 +170,7 @@ def recurring_breakdown(fee) filters = {} if fee.charge_filter - result = ChargeFilters::MatchingAndIgnoredService.call(filter: fee.charge_filter) + result = ChargeFilters::MatchingAndIgnoredService.call(charge: fee.charge, filter: fee.charge_filter) filters[:charge_filter] = fee.charge_filter if fee.charge_filter filters[:matching_filters] = result.matching_filters filters[:ignored_filters] = result.ignored_filters diff --git a/app/services/charge_filters/matching_and_ignored_service.rb b/app/services/charge_filters/matching_and_ignored_service.rb index 843608359f4..798346902d5 100644 --- a/app/services/charge_filters/matching_and_ignored_service.rb +++ b/app/services/charge_filters/matching_and_ignored_service.rb @@ -2,7 +2,8 @@ module ChargeFilters class MatchingAndIgnoredService < BaseService - def initialize(filter:) + def initialize(charge:, filter:) + @charge = charge @filter = filter super end @@ -56,12 +57,10 @@ def call private - attr_reader :filter - - delegate :charge, to: :filter + attr_reader :charge, :filter def other_filters - @other_filters ||= charge.filters.where.not(id: filter.id).includes(values: :billable_metric_filter) + @other_filters ||= charge.filters.select { _1.id != filter.id } end end end diff --git a/app/services/charges/pay_in_advance_aggregation_service.rb b/app/services/charges/pay_in_advance_aggregation_service.rb index 47770bfa427..acafec63796 100644 --- a/app/services/charges/pay_in_advance_aggregation_service.rb +++ b/app/services/charges/pay_in_advance_aggregation_service.rb @@ -52,7 +52,7 @@ def aggregation_filters end if charge_filter.present? - result = ChargeFilters::MatchingAndIgnoredService.call(filter: charge_filter) + result = ChargeFilters::MatchingAndIgnoredService.call(charge:, filter: charge_filter) filters[:charge_filter] = charge_filter if charge_filter.persisted? filters[:matching_filters] = result.matching_filters filters[:ignored_filters] = result.ignored_filters diff --git a/app/services/fees/charge_service.rb b/app/services/fees/charge_service.rb index 4b817a61610..7f04db2b858 100644 --- a/app/services/fees/charge_service.rb +++ b/app/services/fees/charge_service.rb @@ -242,7 +242,7 @@ def aggregation_filters(charge_filter: nil) filters[:grouped_by] = properties['grouped_by'] if charge.standard? && properties['grouped_by'].present? if charge_filter.present? - result = ChargeFilters::MatchingAndIgnoredService.call(filter: charge_filter) + result = ChargeFilters::MatchingAndIgnoredService.call(charge:, filter: charge_filter) filters[:charge_filter] = charge_filter filters[:matching_filters] = result.matching_filters filters[:ignored_filters] = result.ignored_filters diff --git a/app/services/invoices/calculate_fees_service.rb b/app/services/invoices/calculate_fees_service.rb index 1239896d39a..ffdbbc22536 100644 --- a/app/services/invoices/calculate_fees_service.rb +++ b/app/services/invoices/calculate_fees_service.rb @@ -101,7 +101,7 @@ def create_charges_fees(subscription, boundaries) subscription .plan .charges - .includes(:billable_metric, :taxes, filters: {values: :billable_metric_filter}) + .includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter}) .joins(:billable_metric) .where(invoiceable: true) .where diff --git a/app/services/invoices/customer_usage_service.rb b/app/services/invoices/customer_usage_service.rb index 0a4d19531d1..c8bae62e765 100644 --- a/app/services/invoices/customer_usage_service.rb +++ b/app/services/invoices/customer_usage_service.rb @@ -63,9 +63,12 @@ def customer(customer_id: nil) def add_charge_fees query = subscription.plan.charges.joins(:billable_metric) + .includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter}) .order(Arel.sql('lower(unaccent(billable_metrics.name)) ASC')) - query.each { |charge| invoice.fees << charge_usage(charge) } + invoice.fees = Parallel.flat_map(query.all, in_threads: ENV['LAGO_PARALLEL_THREADS_COUNT']&.to_i || 1) do |charge| + charge_usage(charge) + end end def charge_usage(charge) diff --git a/spec/scenarios/billable_metrics/sum_spec.rb b/spec/scenarios/billable_metrics/sum_spec.rb index 3a178593cad..8a03c3ab429 100644 --- a/spec/scenarios/billable_metrics/sum_spec.rb +++ b/spec/scenarios/billable_metrics/sum_spec.rb @@ -150,7 +150,7 @@ subscription2 = customer.subscriptions.order(:created_at).last - travel_to(DateTime.new(2024, 2, 6)) do + travel_to(DateTime.new(2024, 2, 6, 0, 1)) do create_event( { code: billable_metric.code, @@ -171,7 +171,9 @@ fetch_current_usage(customer:, subscription: subscription1) expect(json[:customer_usage][:total_amount_cents]).to eq(24_000) expect(json[:customer_usage][:charges_usage].first[:units]).to eq('10.0') + end + travel_to(DateTime.new(2024, 2, 6, 0, 2)) do create_event( { code: billable_metric.code, @@ -192,7 +194,9 @@ fetch_current_usage(customer:, subscription: subscription1) expect(json[:customer_usage][:total_amount_cents]).to eq(24_000) expect(json[:customer_usage][:charges_usage].first[:units]).to eq('5.0') + end + travel_to(DateTime.new(2024, 2, 6, 0, 3)) do create_event( { code: billable_metric.code, @@ -215,7 +219,7 @@ expect(json[:customer_usage][:charges_usage].first[:units]).to eq('7.0') end - travel_to(DateTime.new(2024, 2, 6, 1)) do + travel_to(DateTime.new(2024, 2, 6, 1, 0, 1)) do create_event( { code: billable_metric.code, @@ -234,7 +238,9 @@ fetch_current_usage(customer:, subscription: subscription2) expect(json[:customer_usage][:total_amount_cents]).to eq(24_000) expect(json[:customer_usage][:charges_usage].first[:units]).to eq('10.0') + end + travel_to(DateTime.new(2024, 2, 6, 1, 0, 2)) do create_event( { code: billable_metric.code, @@ -253,7 +259,9 @@ fetch_current_usage(customer:, subscription: subscription2) expect(json[:customer_usage][:total_amount_cents]).to eq(24_000) expect(json[:customer_usage][:charges_usage].first[:units]).to eq('5.0') + end + travel_to(DateTime.new(2024, 2, 6, 1, 0, 3)) do create_event( { code: billable_metric.code, diff --git a/spec/services/charge_filters/matching_and_ignored_service_spec.rb b/spec/services/charge_filters/matching_and_ignored_service_spec.rb index aee3f8303e6..85783e3df4a 100644 --- a/spec/services/charge_filters/matching_and_ignored_service_spec.rb +++ b/spec/services/charge_filters/matching_and_ignored_service_spec.rb @@ -3,7 +3,7 @@ require 'rails_helper' RSpec.describe ChargeFilters::MatchingAndIgnoredService do - subject(:service_result) { described_class.call(filter: current_filter) } + subject(:service_result) { described_class.call(charge:, filter: current_filter) } let(:billable_metric) { create(:billable_metric) } let(:charge) { create(:standard_charge, billable_metric:) }