Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Parrallelize charge usage computation #2076

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ gem 'aasm'
gem 'activejob-uniqueness', require: 'active_job/uniqueness/sidekiq_patch'
gem 'bootsnap', require: false
gem 'clockwork', require: false
gem 'parallel'
gem 'puma', '~> 6.4'
gem 'rails', '~> 7.0.8'
gem 'sidekiq'
Expand Down
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ DEPENDENCIES
opentelemetry-instrumentation-all
opentelemetry-sdk
paper_trail
parallel
pg
puma (~> 6.4)
rack-cors
Expand Down
2 changes: 1 addition & 1 deletion app/models/invoice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def recurring_breakdown(fee)

filters = {}
if fee.charge_filter
result = ChargeFilters::MatchingAndIgnoredService.call(filter: fee.charge_filter)
result = ChargeFilters::MatchingAndIgnoredService.call(charge: fee.charge, filter: fee.charge_filter)
filters[:charge_filter] = fee.charge_filter if fee.charge_filter
filters[:matching_filters] = result.matching_filters
filters[:ignored_filters] = result.ignored_filters
Expand Down
9 changes: 4 additions & 5 deletions app/services/charge_filters/matching_and_ignored_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

module ChargeFilters
class MatchingAndIgnoredService < BaseService
def initialize(filter:)
def initialize(charge:, filter:)
@charge = charge
@filter = filter
super
end
Expand Down Expand Up @@ -56,12 +57,10 @@ def call

private

attr_reader :filter

delegate :charge, to: :filter
attr_reader :charge, :filter

def other_filters
@other_filters ||= charge.filters.where.not(id: filter.id).includes(values: :billable_metric_filter)
@other_filters ||= charge.filters.select { _1.id != filter.id }
end
end
end
2 changes: 1 addition & 1 deletion app/services/charges/pay_in_advance_aggregation_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def aggregation_filters
end

if charge_filter.present?
result = ChargeFilters::MatchingAndIgnoredService.call(filter: charge_filter)
result = ChargeFilters::MatchingAndIgnoredService.call(charge:, filter: charge_filter)
filters[:charge_filter] = charge_filter if charge_filter.persisted?
filters[:matching_filters] = result.matching_filters
filters[:ignored_filters] = result.ignored_filters
Expand Down
2 changes: 1 addition & 1 deletion app/services/fees/charge_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def aggregation_filters(charge_filter: nil)
filters[:grouped_by] = properties['grouped_by'] if charge.standard? && properties['grouped_by'].present?

if charge_filter.present?
result = ChargeFilters::MatchingAndIgnoredService.call(filter: charge_filter)
result = ChargeFilters::MatchingAndIgnoredService.call(charge:, filter: charge_filter)
filters[:charge_filter] = charge_filter
filters[:matching_filters] = result.matching_filters
filters[:ignored_filters] = result.ignored_filters
Expand Down
2 changes: 1 addition & 1 deletion app/services/invoices/calculate_fees_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def create_charges_fees(subscription, boundaries)
subscription
.plan
.charges
.includes(:billable_metric, :taxes, filters: {values: :billable_metric_filter})
.includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter})
.joins(:billable_metric)
.where(invoiceable: true)
.where
Expand Down
5 changes: 4 additions & 1 deletion app/services/invoices/customer_usage_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ def customer(customer_id: nil)

def add_charge_fees
query = subscription.plan.charges.joins(:billable_metric)
.includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter})
.order(Arel.sql('lower(unaccent(billable_metrics.name)) ASC'))

query.each { |charge| invoice.fees << charge_usage(charge) }
invoice.fees = Parallel.flat_map(query.all, in_threads: ENV['LAGO_PARALLEL_THREADS_COUNT']&.to_i || 1) do |charge|
charge_usage(charge)
end
end

def charge_usage(charge)
Expand Down
12 changes: 10 additions & 2 deletions spec/scenarios/billable_metrics/sum_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@

subscription2 = customer.subscriptions.order(:created_at).last

travel_to(DateTime.new(2024, 2, 6)) do
travel_to(DateTime.new(2024, 2, 6, 0, 1)) do
create_event(
{
code: billable_metric.code,
Expand All @@ -171,7 +171,9 @@
fetch_current_usage(customer:, subscription: subscription1)
expect(json[:customer_usage][:total_amount_cents]).to eq(24_000)
expect(json[:customer_usage][:charges_usage].first[:units]).to eq('10.0')
end

travel_to(DateTime.new(2024, 2, 6, 0, 2)) do
create_event(
{
code: billable_metric.code,
Expand All @@ -192,7 +194,9 @@
fetch_current_usage(customer:, subscription: subscription1)
expect(json[:customer_usage][:total_amount_cents]).to eq(24_000)
expect(json[:customer_usage][:charges_usage].first[:units]).to eq('5.0')
end

travel_to(DateTime.new(2024, 2, 6, 0, 3)) do
create_event(
{
code: billable_metric.code,
Expand All @@ -215,7 +219,7 @@
expect(json[:customer_usage][:charges_usage].first[:units]).to eq('7.0')
end

travel_to(DateTime.new(2024, 2, 6, 1)) do
travel_to(DateTime.new(2024, 2, 6, 1, 0, 1)) do
create_event(
{
code: billable_metric.code,
Expand All @@ -234,7 +238,9 @@
fetch_current_usage(customer:, subscription: subscription2)
expect(json[:customer_usage][:total_amount_cents]).to eq(24_000)
expect(json[:customer_usage][:charges_usage].first[:units]).to eq('10.0')
end

travel_to(DateTime.new(2024, 2, 6, 1, 0, 2)) do
create_event(
{
code: billable_metric.code,
Expand All @@ -253,7 +259,9 @@
fetch_current_usage(customer:, subscription: subscription2)
expect(json[:customer_usage][:total_amount_cents]).to eq(24_000)
expect(json[:customer_usage][:charges_usage].first[:units]).to eq('5.0')
end

travel_to(DateTime.new(2024, 2, 6, 1, 0, 3)) do
create_event(
{
code: billable_metric.code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'rails_helper'

RSpec.describe ChargeFilters::MatchingAndIgnoredService do
subject(:service_result) { described_class.call(filter: current_filter) }
subject(:service_result) { described_class.call(charge:, filter: current_filter) }

let(:billable_metric) { create(:billable_metric) }
let(:charge) { create(:standard_charge, billable_metric:) }
Expand Down
Loading