Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(DailyUsage): Add daily_usages:fill_history task #2751

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/models/clickhouse/events_raw.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def organization
# Table name: events_raw
#
# code :string not null
# ingested_at :datetime not null
# precise_total_amount_cents :decimal(40, 15)
# properties :string not null
# timestamp :datetime not null
Expand Down
2 changes: 1 addition & 1 deletion app/services/daily_usages/compute_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def call
customer: subscription.customer,
subscription:,
external_subscription_id: subscription.external_id,
usage: ::V1::Customers::UsageSerializer.new(current_usage).serialize,
usage: ::V1::Customers::UsageSerializer.new(current_usage, includes: %i[charges_usage]).serialize,
from_datetime: current_usage.from_datetime,
to_datetime: current_usage.to_datetime,
refreshed_at: timestamp
Expand Down
33 changes: 20 additions & 13 deletions app/services/invoices/customer_usage_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,37 @@

module Invoices
class CustomerUsageService < BaseService
def initialize(customer:, subscription:, apply_taxes: true)
def initialize(customer:, subscription:, apply_taxes: true, with_cache: true, max_to_datetime: nil)
super

@apply_taxes = apply_taxes
@customer = customer
@subscription = subscription
@with_cache = with_cache

# NOTE: used to force charges_to_datetime boundary
@max_to_datetime = max_to_datetime
end

def self.with_external_ids(customer_external_id:, external_subscription_id:, organization_id:, apply_taxes: true)
customer = Customer.find_by!(external_id: customer_external_id, organization_id:)
subscription = customer&.active_subscriptions&.find_by(external_id: external_subscription_id)
new(customer:, subscription:, apply_taxes:)
rescue ActiveRecord::RecordNotFound
result.not_found_failure!(resource: 'customer')
result.not_found_failure!(resource: "customer")
end

def self.with_ids(organization_id:, customer_id:, subscription_id:, apply_taxes: true)
customer = Customer.find_by(id: customer_id, organization_id:)
subscription = customer&.active_subscriptions&.find_by(id: subscription_id)
new(customer:, subscription:, apply_taxes:)
rescue ActiveRecord::RecordNotFound
result.not_found_failure!(resource: 'customer')
result.not_found_failure!(resource: "customer")
end

def call
return result.not_found_failure!(resource: 'customer') unless @customer
return result.not_allowed_failure!(code: 'no_active_subscription') if subscription.blank?
return result.not_found_failure!(resource: "customer") unless @customer
return result.not_allowed_failure!(code: "no_active_subscription") if subscription.blank?

result.usage = compute_usage
result.invoice = invoice
Expand All @@ -37,8 +41,7 @@ def call

private

attr_reader :invoice, :subscription, :apply_taxes

attr_reader :invoice, :subscription, :apply_taxes, :with_cache, :max_to_datetime
delegate :plan, to: :subscription
delegate :organization, to: :subscription

Expand Down Expand Up @@ -71,12 +74,12 @@ def compute_usage
def add_charge_fees
query = subscription.plan.charges.joins(:billable_metric)
.includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter})
.order(Arel.sql('lower(unaccent(billable_metrics.name)) ASC'))
.order(Arel.sql("lower(unaccent(billable_metrics.name)) ASC"))

# we're capturing the context here so we can re-use inside the threads. This will correctly propagate spans to this current span
context = OpenTelemetry::Context.current

invoice.fees = Parallel.flat_map(query.all, in_threads: ENV['LAGO_PARALLEL_THREADS_COUNT']&.to_i || 0) do |charge|
invoice.fees = Parallel.flat_map(query.all, in_threads: ENV["LAGO_PARALLEL_THREADS_COUNT"]&.to_i || 0) do |charge|
OpenTelemetry::Context.with_current(context) do
ActiveRecord::Base.connection_pool.with_connection do
charge_usage(charge)
Expand All @@ -90,11 +93,15 @@ def charge_usage(charge)
subscription:,
charge:,
to_datetime: boundaries[:charges_to_datetime],
cache: !organization.clickhouse_events_store? # NOTE: Will be turned on in the future
# NOTE: Will be turned on for clickhouse in the future
cache: organization.clickhouse_events_store? ? false : with_cache
)

applied_boundaries = boundaries
applied_boundaries = applied_boundaries.merge(charges_to_datetime: max_to_datetime) if max_to_datetime

Fees::ChargeService
.call(invoice:, charge:, subscription:, boundaries:, current_usage: true, cache_middleware:)
.call(invoice:, charge:, subscription:, boundaries: applied_boundaries, current_usage: true, cache_middleware:)
.raise_if_error!
.fees
end
Expand Down Expand Up @@ -171,10 +178,10 @@ def compute_amounts_with_provider_taxes

def provider_taxes_cache_key
[
'provider-taxes',
"provider-taxes",
subscription.id,
plan.updated_at.iso8601
].join('/')
].join("/")
end

def format_usage
Expand Down
58 changes: 58 additions & 0 deletions lib/tasks/daily_usages.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

require 'timecop'

namespace :daily_usages do
desc "Fill past daily usage"
task :fill_history, [:organization_id, :days_ago] => :environment do |_task, args|
abort "Missing organization_id\n\n" unless args[:organization_id]

Rails.logger.level = Logger::INFO

days_ago = (args[:days_ago] || 120).to_i.days.ago
organization = Organization.find(args[:organization_id])

subscriptions = organization.subscriptions
.where(status: [:active, :terminated])
.where.not(started_at: nil)
.where('terminated_at IS NULL OR terminated_at >= ?', days_ago)
.includes(customer: :organization)

subscriptions.find_each do |subscription|
from = subscription.started_at.to_date
if from < days_ago
from = days_ago.to_date
end

to = (subscription.terminated_at || Time.current).to_date

(from..to).each do |date|
datetime = date.in_time_zone(subscription.customer.applicable_timezone).beginning_of_day.utc

next if date == Date.today &&
DailyUsage.refreshed_at_in_timezone(datetime).where(subscription_id: subscription.id).exists?

Timecop.freeze(datetime + 5.minutes) do
usage = Invoices::CustomerUsageService.call(
customer: subscription.customer,
subscription: subscription,
apply_taxes: false,
with_cache: false,
max_to_datetime: datetime
).raise_if_error!.usage

DailyUsage.create!(
organization:,
customer: subscription.customer,
subscription:,
external_subscription_id: subscription.external_id,
usage: ::V1::Customers::UsageSerializer.new(usage, includes: %i[charges_usage]).serialize,
from_datetime: usage.from_datetime,
to_datetime: usage.to_datetime,
refreshed_at: datetime
)
end
end
end
end
end