From 1603a0907ac8177c8e02f2ff4548bf95124cc98f Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Sun, 22 Mar 2020 08:49:20 -0700 Subject: [PATCH] Generate a new future for every executed job (#20) --- lib/good_job/lockable.rb | 10 +++------- lib/good_job/scheduler.rb | 41 +++++++++++++++++---------------------- 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index 5773b4d69..7c3784b68 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -15,18 +15,14 @@ module Lockable end) scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } - scope :with_advisory_lock, (lambda do - where(<<~SQL) - pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) - SQL - end) def self.first_advisory_locked_row(query) - find_by_sql(<<~SQL) + find_by_sql(<<~SQL).first WITH rows AS (#{query.to_sql}) - SELECT rows.id + SELECT rows.* FROM rows WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) + LIMIT 1 SQL end # private_class_method :first_advisory_locked_row diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index edc9c3973..534eced53 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -1,5 +1,5 @@ -require "concurrent/scheduled_task" require "concurrent/executor/thread_pool_executor" +require "concurrent/timer_task" require "concurrent/utility/processor_counter" module GoodJob @@ -28,7 +28,7 @@ def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {}) idle_threads = @pool.max_length - @pool.length create_thread if idle_threads.positive? end - @timer.add_observer(TimerObserver.new) + @timer.add_observer(self, :timer_observer) @timer.execute end @@ -62,36 +62,31 @@ def shutdown? def create_thread future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query| - loop do - executed_job = false + executed_job = false - Rails.application.executor.wrap do - good_job = query.with_advisory_lock.first - break unless good_job + Rails.application.executor.wrap do + good_job = GoodJob::Job.first_advisory_locked_row(query) + break unless good_job - executed_job = true - ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job }) - JobWrapper.new(good_job).perform - good_job.advisory_unlock - end - - break unless executed_job + executed_job = true + ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job }) + JobWrapper.new(good_job).perform + good_job.advisory_unlock end + + executed_job end - future.add_observer(TaskObserver.new) + future.add_observer(self, :task_observer) future.execute end - class TimerObserver - def update(time, result, error) - ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time }) - end + def timer_observer(time, result, error) + ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time }) end - class TaskObserver - def update(time, result, error) - ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time }) - end + def task_observer(time, executed_job, error) + ActiveSupport::Notifications.instrument("job_finished.good_job", { result: executed_job, error: error, time: time }) + create_thread if executed_job end end end