Skip to content

Commit

Permalink
Generate a new future for every executed job (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Mar 22, 2020
1 parent 39600a5 commit 1603a09
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 30 deletions.
10 changes: 3 additions & 7 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@ module Lockable
end)

scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) }
scope :with_advisory_lock, (lambda do
where(<<~SQL)
pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint)
SQL
end)

def self.first_advisory_locked_row(query)
find_by_sql(<<~SQL)
find_by_sql(<<~SQL).first
WITH rows AS (#{query.to_sql})
SELECT rows.id
SELECT rows.*
FROM rows
WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint)
LIMIT 1
SQL
end
# private_class_method :first_advisory_locked_row
Expand Down
41 changes: 18 additions & 23 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
require "concurrent/scheduled_task"
require "concurrent/executor/thread_pool_executor"
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"

module GoodJob
Expand Down Expand Up @@ -28,7 +28,7 @@ def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
idle_threads = @pool.max_length - @pool.length
create_thread if idle_threads.positive?
end
@timer.add_observer(TimerObserver.new)
@timer.add_observer(self, :timer_observer)
@timer.execute
end

Expand Down Expand Up @@ -62,36 +62,31 @@ def shutdown?

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
loop do
executed_job = false
executed_job = false

Rails.application.executor.wrap do
good_job = query.with_advisory_lock.first
break unless good_job
Rails.application.executor.wrap do
good_job = GoodJob::Job.first_advisory_locked_row(query)
break unless good_job

executed_job = true
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })
JobWrapper.new(good_job).perform
good_job.advisory_unlock
end

break unless executed_job
executed_job = true
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })
JobWrapper.new(good_job).perform
good_job.advisory_unlock
end

executed_job
end
future.add_observer(TaskObserver.new)
future.add_observer(self, :task_observer)
future.execute
end

class TimerObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time })
end
def timer_observer(time, result, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end

class TaskObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end
def task_observer(time, executed_job, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: executed_job, error: error, time: time })
create_thread if executed_job
end
end
end

0 comments on commit 1603a09

Please sign in to comment.