Skip to content

Commit

Permalink
Merge pull request #2 from bensheldon/thread_grow
Browse files Browse the repository at this point in the history
Fetch new jobs within the worker thread itself; incrementally grow worker threads
  • Loading branch information
bensheldon authored Mar 2, 2020
2 parents 6f4b7b9 + 856b28d commit 609b903
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 63 deletions.
15 changes: 5 additions & 10 deletions lib/good_job/job_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ def initialize(good_job)
end

def perform
# Rails.logger.info "Perform job_id #{@good_job.id}: on thread #{Thread.current.name}"
@good_job.with_advisory_lock do
@good_job.reload
serialized_params = @good_job.serialized_params.merge(
"provider_job_id" => @good_job.id
)
ActiveJob::Base.execute(serialized_params)

serialized_params = @good_job.serialized_params.merge(
"provider_job_id" => @good_job.id
)
ActiveJob::Base.execute(serialized_params)

@good_job.destroy!
end
@good_job.destroy!
end
end
end
10 changes: 5 additions & 5 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ module Lockable
end)

scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) }
# scope :with_advisory_lock, (lambda do
# joins(<<~SQL)
# RIGHT JOIN pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) ON 1 = 1
# SQL
# end)
scope :with_advisory_lock, (lambda do
where(<<~SQL)
pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint)
SQL
end)

def self.first_advisory_locked_row(query)
find_by_sql(<<~SQL)
Expand Down
47 changes: 15 additions & 32 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,16 @@ class Scheduler
def initialize(query = GoodJob::Job.all, **options)
@query = query

@active_jobs = Concurrent::Array.new
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
schedule_jobs
idle_threads = @pool.max_length - @pool.length
puts "There are idle_threads: #{idle_threads}"
create_thread if idle_threads.positive?
true
end
# @timer.add_observer(TaskObserver.new)
@timer.execute
end

def active_jobs
@active_jobs
end

def execute
end

Expand All @@ -56,50 +52,37 @@ def shutdown(wait: true)
end
end

def schedule_jobs(count = 100)
idle_threads = @pool.max_length - @active_jobs.count
to_enqueue = [count, idle_threads].min
return if to_enqueue.zero?

jobs = @query.advisory_unlocked.limit(to_enqueue).load
puts "Scheduling #{jobs.size} job(s)"
jobs.each { |job| schedule_job(job) }
end

def schedule_job(job)
future = Concurrent::Future.new(args: [job], executor: @pool) do |j|
def create_thread
future = Concurrent::Future.new(args: [@query], executor: @pool) do |query|
Rails.application.executor.wrap do
thread_name = Thread.current.name || Thread.current.object_id
puts "Executing job #{job.id} in thread #{thread_name}"
while job = query.with_advisory_lock.first
puts "Executing job #{job.id} in thread #{thread_name}"

JobWrapper.new(j).perform
JobWrapper.new(job).perform

job.advisory_unlock
end
true
end
end
future.add_observer(TaskObserver.new(job, @active_jobs, self))
future.add_observer(TaskObserver.new(self))
future.execute
@active_jobs << job
end

class TaskObserver
def initialize(job, active_jobs, scheduler)
@job = job
@active_jobs = active_jobs
def initialize(scheduler)
@scheduler = scheduler
end

def update(time, result, ex)
if result
puts "(#{time}) Execution of #{@job.id} successfully returned #{result}\n"
puts "(#{time}) Execution successfully returned #{result}\n"
elsif ex.is_a?(Concurrent::TimeoutError)
puts "(#{time}) Execution of #{@job.id} timed out\n"
puts "(#{time}) Execution timed out\n"
else
puts "(#{time}) Execution of #{@job.id} failed with error #{result} #{ex}\n"
puts "(#{time}) Execution failed with error #{result} #{ex}\n"
end

@active_jobs.delete(@job)
@scheduler.schedule_jobs(1)
end
end
end
Expand Down
14 changes: 0 additions & 14 deletions spec/good_job/job_wrapper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,4 @@ def perform(*args, **kwargs)
end
end)
end

it 'locks the job and prevents from being run at same time twice' do
ExampleJob.perform_later

good_job = GoodJob::Job.last
expect(good_job).to be_present

thread1 = Concurrent::Promises.future(good_job) { |j| GoodJob::JobWrapper.new(j).perform }
thread2 = Concurrent::Promises.future(good_job) { |j| GoodJob::JobWrapper.new(j).perform }

expect do
Concurrent::Promises.zip(thread1, thread2).value!
end.to raise_error GoodJob::Lockable::RecordAlreadyAdvisoryLockedError
end
end
4 changes: 2 additions & 2 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def perform(*args, **kwargs)

let(:adapter) { GoodJob::Adapter.new }

let(:number_of_jobs) { 50 }
let(:number_of_jobs) { 250 }

let!(:good_jobs) do
number_of_jobs.times do |i|
Expand All @@ -38,7 +38,7 @@ def perform(*args, **kwargs)
it 'pops items off of the queue and runs them' do
scheduler = GoodJob::Scheduler.new

Timeout::timeout(5) do
Timeout.timeout(5) do
sleep(0.5) until GoodJob::Job.count == 0
end

Expand Down

0 comments on commit 609b903

Please sign in to comment.