Skip to content

Commit

Permalink
Add account poller (#836)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadaismail-stripe authored Oct 6, 2022
1 parent 0443c23 commit 5a3a03d
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 22 deletions.
1 change: 1 addition & 0 deletions lib/stripe-force/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class FeatureFlags < T::Enum
SF_CACHING = new('sf_caching')
CATCH_ALL_ERRORS = new('catch_all_errors')
UPDATE_CUSTOMER_ON_ORDER_TRANSLATION = new('update_customer_on_order_creation')
ACCOUNT_POLLING = new('account_polling')
end
end

Expand Down
62 changes: 62 additions & 0 deletions lib/stripe-force/jobs/account_poller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# frozen_string_literal: true
# typed: true

require_relative 'poller_base'

class StripeForce::AccountPoller < StripeForce::PollerBase
def perform
locker.lock_on_poll_job(self.class)

current_execution_time = Time.now.utc
poll_record = poll_timestamp

return if !should_poll?(current_execution_time, poll_record)

poll_record = T.must(poll_record)
last_polled_at = poll_record.last_polled_at

log.info 'initiating poll',
from: last_polled_at,
to: current_execution_time

updated_sf_accounts = backoff { @user.sf_client.query(generate_soql(last_polled_at, current_execution_time)) }

sf_account_ids_to_sync = []
updated_sf_accounts.each do |sf_account|
sf_account_ids_to_sync << sf_account.Id
end

log.info 'account query complete', size: sf_account_ids_to_sync.size

fail_if_dying_worker!

sf_account_ids_to_sync.each do |sf_account_id|
log.info 'queuing account', sf_account_id: sf_account_id
SalesforceTranslateRecordJob.work(@user, sf_account_id)
end

log.info 'poll complete', poll_size: sf_account_ids_to_sync.count

poll_record.update(last_polled_at: current_execution_time)

sf_account_ids_to_sync
end

# soql query to fetch all the sf account ids that:
# have our custom stripe field, indicating the sf account has been synced to stripe
# have a last modified time between the start and end time
private def generate_soql(time_start, time_end)
<<~EOL
SELECT Id
FROM #{poll_type}
WHERE
#{prefixed_stripe_field(GENERIC_STRIPE_ID)} != null AND
LastModifiedDate >= #{time_start.iso8601} AND
LastModifiedDate < #{time_end.iso8601}
EOL
end

private def poll_type
SF_ACCOUNT
end
end
16 changes: 10 additions & 6 deletions lib/stripe-force/jobs/initiate_poll_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@ def self.queue_polls_for_user(user)

# TODO should check if there is a valid NS + Stripe connection

log.info 'queuing poll', poll_job: StripeForce::OrderPoller
queue_poll_job_for_user(user, StripeForce::OrderPoller)

if @user.feature_enabled?(StripeForce::Constants::FeatureFlags::ACCOUNT_POLLING)
queue_poll_job_for_user(user, StripeForce::AccountPoller)
end
end

def self.queue_poll_job_for_user(user, poller_job)
log.info 'queuing poll', poll_job: poller_job

# TODO spit out to a separate job
locker = Integrations::Locker.new(user)
locker.lock_on_user do
StripeForce::OrderPoller.perform(
user: user,
locker: locker
)
poller_job.perform(user: user, locker: locker)
end
end
end
58 changes: 58 additions & 0 deletions test/integration/test_account_poller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true
# typed: ignore

require_relative '../test_helper'

class Critic::AccountPollerTest < Critic::FunctionalTest
before do
@user = make_user(save: true)
inline_job_processing!
end

it 'does not poll if no initial poll timestamp is set' do
@user.enable_feature(FeatureFlags::ACCOUNT_POLLING)

locker = Integrations::Locker.new(@user)

StripeForce::AccountPoller.perform(user: @user, locker: locker)
assert_equal(0, StripeForce::PollTimestamp.count)
end

it 'polls only if feature flag ACCOUNT_POLLING is enabled' do
# enable account polling
@user.enable_feature(FeatureFlags::ACCOUNT_POLLING)

# set up the intiial poll timestamp to enable account polling
initial_poll_timestamp = set_initial_poll_timestamp(SF_ACCOUNT).last_polled_at

# create an sf account and translate
sf_account_id = create_salesforce_account
StripeForce::Translate.perform_inline(@user, sf_account_id)

locker = Integrations::Locker.new(@user)

# assert that account translation will occur
SalesforceTranslateRecordJob.expects(:perform).at_least_once

# kick off account poll job for this user
StripeForce::AccountPoller.perform(user: @user, locker: locker)

# get the poll timestamp for this user's account
poll_timestamp = StripeForce::PollTimestamp.by_user_and_record(
@user,
SF_ACCOUNT
)
# confirm we have polled again since the initial poll timestamp
assert(poll_timestamp.last_polled_at - initial_poll_timestamp > initial_poll_delta)
assert_equal(SF_ACCOUNT, poll_timestamp.integration_record_type)

# disable feature ACCOUNT_POLLING
@user.disable_feature(FeatureFlags::ACCOUNT_POLLING)

# assert that account translation will not occur since feature flag is disabled
SalesforceTranslateRecordJob.expects(:perform).never

# kick off account poll job for this user
StripeForce::AccountPoller.perform(user: @user, locker: locker)
end
end
16 changes: 0 additions & 16 deletions test/integration/test_order_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,6 @@ class Critic::OrderPollerTest < Critic::FunctionalTest
inline_job_processing!
end

def initial_poll_delta; 60 * 60 * 24 end

def set_initial_poll_timestamp(sf_class)
initial_poll = DateTime.now - initial_poll_delta

poll_timestamp = StripeForce::PollTimestamp.build_with_user_and_record(
@user,
sf_class
)

poll_timestamp.last_polled_at = initial_poll
poll_timestamp.save

poll_timestamp
end

it 'polls orders and does not allow two polls to run at once' do
# must persist user record in order for initial poll job to pick it up
@user.save
Expand Down
16 changes: 16 additions & 0 deletions test/support/common_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,20 @@ def common_teardown
def redis
Resque.redis
end

def initial_poll_delta; 60 * 60 * 24 end

def set_initial_poll_timestamp(sf_class)
initial_poll = DateTime.now - initial_poll_delta

poll_timestamp = StripeForce::PollTimestamp.build_with_user_and_record(
@user,
sf_class
)

poll_timestamp.last_polled_at = initial_poll
poll_timestamp.save

poll_timestamp
end
end

0 comments on commit 5a3a03d

Please sign in to comment.