Skip to content

Commit

Permalink
Merge pull request #81 from bensheldon/filter_create_thread
Browse files Browse the repository at this point in the history
Allow Schedulers to filter #create_thread to avoid flood of queries when running async with multiple schedulers
  • Loading branch information
bensheldon authored Aug 15, 2020
2 parents 9eb09ea + 4958949 commit 0c858be
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 25 deletions.
2 changes: 1 addition & 1 deletion lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def enqueue_at(active_job, timestamp)
end
end

@scheduler.create_thread if execute_async?
@scheduler.create_thread(queue_name: good_job.queue_name) if execute_async?

good_job
end
Expand Down
41 changes: 27 additions & 14 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@ class Job < ActiveRecord::Base

self.table_name = 'good_jobs'.freeze

def self.queue_parser(string)
string = string.presence || '*'

if string.first == '-'
exclude_queues = true
string = string[1..-1]
end

queues = string.split(',').map(&:strip)

if queues.include?('*')
{ all: true }
elsif exclude_queues
{ exclude: queues }
else
{ include: queues }
end
end

scope :unfinished, (lambda do
if column_names.include?('finished_at')
where(finished_at: nil)
Expand All @@ -21,20 +40,14 @@ class Job < ActiveRecord::Base
scope :priority_ordered, -> { order('priority DESC NULLS LAST') }
scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) }
scope :queue_string, (lambda do |string|
string = string.presence || '*'

if string.first == '-'
exclude_queues = true
string = string[1..-1]
end

queue_names_without_all = string.split(',').map(&:strip).reject { |q| q == '*' }
return if queue_names_without_all.size.zero?

if exclude_queues
where.not(queue_name: queue_names_without_all).or where(queue_name: nil)
else
where(queue_name: queue_names_without_all)
parsed = queue_parser(string)

if parsed[:all]
all
elsif parsed[:exclude]
where.not(queue_name: parsed[:exclude]).or where(queue_name: nil)
elsif parsed[:include]
where(queue_name: parsed[:include])
end
end)

Expand Down
11 changes: 9 additions & 2 deletions lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ def restart(wait: true)
schedulers.each { |s| s.restart(wait: wait) }
end

def create_thread
schedulers.all?(&:create_thread)
def create_thread(state = nil)
if state
schedulers.any? { |scheduler| scheduler.create_thread(state) }
else
results = schedulers.map { |scheduler| scheduler.create_thread(state) }
return nil if results.all(&:nil?)

results.any?
end
end
end
end
9 changes: 8 additions & 1 deletion lib/good_job/performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ module GoodJob
class Performer
attr_reader :name

def initialize(target, method_name, name: nil)
def initialize(target, method_name, name: nil, filter: nil)
@target = target
@method_name = method_name
@name = name
@filter = filter
end

def next
@target.public_send(@method_name)
end

def next?(state = {})
return true unless @filter.respond_to?(:call)

@filter.call(state)
end
end
end
20 changes: 17 additions & 3 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,17 @@ def self.from_configuration(configuration)
max_threads = (max_threads || configuration.max_threads).to_i

job_query = GoodJob::Job.queue_string(queue_string)
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string)
parsed = GoodJob::Job.queue_parser(queue_string)
job_filter = proc do |state|
if parsed[:exclude]
!parsed[:exclude].include? state[:queue_name]
elsif parsed[:include]
parsed[:include].include? state[:queue_name]
else
true
end
end
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

timer_options = {}
timer_options[:execution_interval] = configuration.poll_interval if configuration.poll_interval.positive?
Expand Down Expand Up @@ -116,8 +126,12 @@ def restart(wait: true)

# Triggers the execution the Performer, if an execution thread is available.
# @return [Boolean]
def create_thread
return false unless @pool.ready_worker_count.positive?
def create_thread(state = nil)
return nil unless @pool.ready_worker_count.positive?

if state
return false unless @performer.next?(state)
end

future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
output = nil
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
RSpec.describe GoodJob::Adapter do
let(:adapter) { described_class.new }
let(:active_job) { instance_double(ApplicationJob) }
let(:good_job) { instance_double(GoodJob::Job) }
let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default') }

describe '#initialize' do
it 'guards against improper execution modes' do
Expand All @@ -29,7 +29,7 @@

context 'when async' do
it 'trigger an execution thread' do
allow(GoodJob::Job).to receive(:enqueue).and_return(:good_job)
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)

scheduler = instance_double(GoodJob::Scheduler, shutdown: nil, create_thread: nil)
adapter = described_class.new(execution_mode: :async, scheduler: scheduler)
Expand Down
19 changes: 19 additions & 0 deletions spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ def perform(result_value = nil, raise_error: false)
end
end

describe '.queue_parser' do
it 'creates an intermediary hash' do
result = described_class.queue_parser('first,second')
expect(result).to eq({
include: %w[first second],
})

result = described_class.queue_parser('-first,second')
expect(result).to eq({
exclude: %w[first second],
})

result = described_class.queue_parser('')
expect(result).to eq({
all: true,
})
end
end

describe '.queue_string' do
it 'separates commas' do
query = described_class.queue_string('first,second')
Expand Down
16 changes: 14 additions & 2 deletions spec/lib/good_job/performer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
require 'rails_helper'

RSpec.describe GoodJob::Performer do
subject(:performer) { described_class.new(target, :the_method) }

let(:target) { double('The Target', the_method: nil) } # rubocop:disable RSpec/VerifiedDoubles

describe '#next' do
it 'delegates to target#method_name' do
performer = described_class.new(target, :the_method)
performer.next

expect(target).to have_received(:the_method)
end
end

describe '#next?' do
it 'defaults to true' do
expect(performer.next?).to eq true
end

it 'returns the result of the filter and state' do
filter = ->(state) { "more #{state}" }
performer = described_class.new(target, :the_method, filter: filter)
expect(performer.next?("state")).to eq "more state"
end
end

describe '#name' do
it 'is assignable' do
performer = described_class.new(target, :the_method, name: 'test-performer')
Expand Down
20 changes: 20 additions & 0 deletions spec/lib/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
RSpec.describe GoodJob::Scheduler do
let(:performer) { instance_double(GoodJob::Performer, next: nil, name: '') }

after do
described_class.instances.each(&:shutdown)
end

context 'when thread error' do
let(:error_proc) { double("Error Collector", call: nil) } # rubocop:disable RSpec/VerifiedDoubles

Expand Down Expand Up @@ -79,6 +83,22 @@
end
end

describe '#create_thread' do
it 'returns true if the state matches the performer' do
configuration = GoodJob::Configuration.new({ queues: 'mice:2' })
scheduler = described_class.from_configuration(configuration)

expect(scheduler.create_thread(queue_name: 'mice')).to eq true
end

it 'returns false if the state does not match the performer' do
configuration = GoodJob::Configuration.new({ queues: 'mice:2' })
scheduler = described_class.from_configuration(configuration)

expect(scheduler.create_thread(queue_name: 'elephant')).to eq false
end
end

describe '.from_configuration' do
describe 'multi-scheduling' do
it 'instantiates multiple schedulers' do
Expand Down

0 comments on commit 0c858be

Please sign in to comment.