Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler should always push a new task on completion of previous task, regardless of available thread calculation #209

Merged
merged 1 commit into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,4 @@ DEPENDENCIES
yard-activesupport-concern

BUNDLED WITH
2.2.6
2.2.7
27 changes: 20 additions & 7 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Scheduler
max_threads: Configuration::DEFAULT_MAX_THREADS,
auto_terminate: true,
idletime: 60,
max_queue: 1, # ideally zero, but 0 == infinite
max_queue: Configuration::DEFAULT_MAX_THREADS,
fallback_policy: :discard,
}.freeze

Expand Down Expand Up @@ -71,7 +71,10 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia

@max_cache = max_cache || 0
@pool_options = DEFAULT_POOL_OPTIONS.dup
@pool_options[:max_threads] = max_threads if max_threads.present?
if max_threads.present?
@pool_options[:max_threads] = max_threads
@pool_options[:max_queue] = max_threads
end
@pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"

create_pool
Expand Down Expand Up @@ -163,7 +166,7 @@ def create_thread(state = nil)
def task_observer(time, output, thread_error)
GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
instrument("finished_job_task", { result: output, error: thread_error, time: time })
create_thread if output
create_task if output
end

def warm_cache
Expand All @@ -181,11 +184,11 @@ def stats
{
name: @performer.name,
max_threads: @pool_options[:max_threads],
active_threads: @pool.ready_worker_count - @pool_options[:max_threads],
inactive_threads: @pool.ready_worker_count,
active_threads: @pool_options[:max_threads] - @pool.ready_worker_count,
available_threads: @pool.ready_worker_count,
max_cache: @max_cache,
cache_count: cache_count,
cache_remaining: remaining_cache_count,
active_cache: cache_count,
available_cache: remaining_cache_count,
}
end

Expand All @@ -200,6 +203,16 @@ def create_pool
end
end

def create_task(delay = 0)
future = Concurrent::ScheduledTask.new(delay, args: [@performer], executor: @pool, timer_set: timer_set) do |performer|
output = nil
Rails.application.executor.wrap { output = performer.next }
output
end
future.add_observer(self, :task_observer)
future.execute
end

def instrument(name, payload = {}, &block)
payload = payload.reverse_merge({
scheduler: self,
Expand Down
24 changes: 23 additions & 1 deletion spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def perform(*args, **kwargs)
let(:adapter) { GoodJob::Adapter.new }

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 1000 }
let(:number_of_jobs) { 500 }
let(:max_threads) { 5 }

let!(:good_jobs) do
Expand Down Expand Up @@ -72,6 +72,28 @@ def perform(*args, **kwargs)
end
end

context 'when a single thread' do
let(:max_threads) { 1 }
let(:number_of_jobs) { 50 }

let!(:good_jobs) do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
end
end

it 'executes all jobs' do
performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads)
scheduler.create_thread

sleep_until(max: 10, increments_of: 0.5) do
GoodJob::Job.count == 0
end
scheduler.shutdown
end
end

context 'when job has errors' do
let!(:jobs) { ErrorJob.perform_later }

Expand Down
6 changes: 3 additions & 3 deletions spec/lib/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@
name: performer.name,
max_threads: max_threads,
active_threads: 0,
inactive_threads: max_threads,
available_threads: max_threads,
max_cache: max_cache,
cache_count: 0,
cache_remaining: max_cache,
active_cache: 0,
available_cache: max_cache,
})
end
end
Expand Down