Skip to content

Commit

Permalink
feat(DailyUsage): Add daily_usages:fill_daily_usage task
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-pochet committed Nov 5, 2024
1 parent d65061c commit c420967
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 13 deletions.
30 changes: 17 additions & 13 deletions app/services/invoices/customer_usage_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@

module Invoices
class CustomerUsageService < BaseService
def initialize(customer:, subscription:, apply_taxes: true)
def initialize(customer:, subscription:, apply_taxes: true, with_cache: true, max_to_datetime: nil)
super

@apply_taxes = apply_taxes
@customer = customer
@subscription = subscription
@with_cache = with_cache

# NOTE: used to force charges_to_datetime boundary
@max_to_datetime = max_to_datetime
end

def self.with_external_ids(customer_external_id:, external_subscription_id:, organization_id:, apply_taxes: true)
customer = Customer.find_by!(external_id: customer_external_id, organization_id:)
subscription = customer&.active_subscriptions&.find_by(external_id: external_subscription_id)
new(customer:, subscription:, apply_taxes:)
rescue ActiveRecord::RecordNotFound
result.not_found_failure!(resource: 'customer')
result.not_found_failure!(resource: "customer")
end

def self.with_ids(organization_id:, customer_id:, subscription_id:, apply_taxes: true)
customer = Customer.find_by(id: customer_id, organization_id:)
subscription = customer&.active_subscriptions&.find_by(id: subscription_id)
new(customer:, subscription:, apply_taxes:)
rescue ActiveRecord::RecordNotFound
result.not_found_failure!(resource: 'customer')
result.not_found_failure!(resource: "customer")
end

def call
return result.not_found_failure!(resource: 'customer') unless @customer
return result.not_allowed_failure!(code: 'no_active_subscription') if subscription.blank?
return result.not_found_failure!(resource: "customer") unless @customer
return result.not_allowed_failure!(code: "no_active_subscription") if subscription.blank?

result.usage = compute_usage
result.invoice = invoice
Expand All @@ -37,8 +41,7 @@ def call

private

attr_reader :invoice, :subscription, :apply_taxes

attr_reader :invoice, :subscription, :apply_taxes, :with_cache, :max_to_datetime
delegate :plan, to: :subscription
delegate :organization, to: :subscription

Expand Down Expand Up @@ -71,12 +74,12 @@ def compute_usage
def add_charge_fees
query = subscription.plan.charges.joins(:billable_metric)
.includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter})
.order(Arel.sql('lower(unaccent(billable_metrics.name)) ASC'))
.order(Arel.sql("lower(unaccent(billable_metrics.name)) ASC"))

# we're capturing the context here so we can re-use inside the threads. This will correctly propagate spans to this current span
context = OpenTelemetry::Context.current

invoice.fees = Parallel.flat_map(query.all, in_threads: ENV['LAGO_PARALLEL_THREADS_COUNT']&.to_i || 0) do |charge|
invoice.fees = Parallel.flat_map(query.all, in_threads: ENV["LAGO_PARALLEL_THREADS_COUNT"]&.to_i || 0) do |charge|
OpenTelemetry::Context.with_current(context) do
ActiveRecord::Base.connection_pool.with_connection do
charge_usage(charge)
Expand All @@ -90,7 +93,8 @@ def charge_usage(charge)
subscription:,
charge:,
to_datetime: boundaries[:charges_to_datetime],
cache: !organization.clickhouse_events_store? # NOTE: Will be turned on in the future
# NOTE: Will be turned on for clickhouse in the future
cache: organization.clickhouse_events_store? ? false : with_cache
)

Fees::ChargeService
Expand All @@ -112,7 +116,7 @@ def boundaries
from_datetime: date_service.from_datetime,
to_datetime: date_service.to_datetime,
charges_from_datetime: date_service.charges_from_datetime,
charges_to_datetime: date_service.charges_to_datetime,
charges_to_datetime: max_to_datetime || date_service.charges_to_datetime,
issuing_date: date_service.next_end_of_period,
charges_duration: date_service.charges_duration_in_days
}
Expand Down Expand Up @@ -171,10 +175,10 @@ def compute_amounts_with_provider_taxes

def provider_taxes_cache_key
[
'provider-taxes',
"provider-taxes",
subscription.id,
plan.updated_at.iso8601
].join('/')
].join("/")
end

def format_usage
Expand Down
43 changes: 43 additions & 0 deletions lib/tasks/daily_usages.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

namespace :daily_usages do
desc "Fill past daily usage"
task :fill_history, [:organization_id] => :environment do |_task, args|
abort "Missing organization_id\n\n" unless args[:organization_id]

organization = Organization.find(args[:organization_id])

subscriptions = organization.subscriptions
.where(status: [:active, :terminated])
.where("started_at >= ?", 4.month.ago)

subscriptions.find_each do |subscription|
from = subscription.started_at.to_date
to = (subscription.terminated_at || Time.current).to_date

from..to.each do |date|
datetime = date + 5.minutes

Timecop.freeze(datetime) do
usage = Invoices::CustomerUsageService.call(
customer: subscription.customer,
subscription: subscription,
apply_taxes: false,
with_cache: false
).raise_if_error!.usage

DailyUsage.create!(
organization:,
customer: subscription.customer,
subscription:,
external_subscription_id: subscription.external_id,
usage: usage,
from_datetime: usage.from_datetime,
to_datetime: usage.to_datetime,
refreshed_at: date
)
end
end
end
end
end

0 comments on commit c420967

Please sign in to comment.