From ff03b2da4f04a5095af915107c11621ab93fe6ee Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Sat, 7 Mar 2020 21:41:51 -0800 Subject: [PATCH] Configuration for maximum number of job execution threads --- README.md | 11 +++++++++++ lib/good_job/cli.rb | 11 ++++++++--- lib/good_job/scheduler.rb | 23 +++++++++++------------ spec/good_job/cli_spec.rb | 16 ++++++++++++++++ 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index f4bb236b4..3abdaff3c 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,17 @@ $ bundle ```bash $ bundle exec good_job ``` + +### Configuring Job Execution Threads + +GoodJob’s executes enqueued jobs using threads. There is a lot than can be said about [multithreaded behavior in Ruby on Rails](https://guides.rubyonrails.org/threading_and_code_execution.html), but briefly: + +- Each GoodJob thread requires its own database connection, which are automatically checked out from Rails’s connection pool. _Allowing GoodJob to schedule more threads than are available in the database connection pool can lead to timeouts and is not recommended._ +- The maximum number of GoodJob threads can be configured, in decreasing precedence: + 1. `$ bundle exec good_job --max_threads 4` + 2. `$ GOOD_JOB_MAX_THREADS=4 bundle exec good_job` + 3. `$ RAILS_MAX_THREADS=4 bundle exec good_job` + 4. Implicitly via Rails's database connection pool size (`ActiveRecord::Base.connection_pool.size`) ## Development diff --git a/lib/good_job/cli.rb b/lib/good_job/cli.rb index a7ff7b30c..43cf756d0 100644 --- a/lib/good_job/cli.rb +++ b/lib/good_job/cli.rb @@ -5,18 +5,23 @@ class CLI < Thor RAILS_ENVIRONMENT_RB = File.expand_path("config/environment.rb") desc :start, "Start jobs" + method_option :max_threads, type: :numeric def start require RAILS_ENVIRONMENT_RB - scheduler = GoodJob::Scheduler.new + max_threads = options[:max_threads] || + ENV['GOOD_JOB_MAX_THREADS'] || + ENV['RAILS_MAX_THREADS'] || + ActiveRecord::Base.connection_pool.size + + $stdout.puts "GoodJob starting with max_threads=#{max_threads}" + scheduler = GoodJob::Scheduler.new(pool_options: { max_threads: max_threads }) %w[INT TERM].each do |signal| trap(signal) { @stop_good_job_executable = true } end @stop_good_job_executable = false - $stdout.puts "GoodJob waiting for jobs..." - Kernel.loop do sleep 0.1 break if @stop_good_job_executable || scheduler.shutdown? diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index b5f715b9e..edc9c3973 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -4,8 +4,6 @@ module GoodJob class Scheduler - MAX_THREADS = Concurrent.processor_count - DEFAULT_TIMER_OPTIONS = { execution_interval: 1, timeout_interval: 1, @@ -15,18 +13,18 @@ class Scheduler DEFAULT_POOL_OPTIONS = { name: 'good_job', min_threads: 0, - max_threads: MAX_THREADS, + max_threads: Concurrent.processor_count, auto_terminate: true, idletime: 0, max_queue: 0, fallback_policy: :abort, # shouldn't matter -- 0 max queue }.freeze - def initialize(query = GoodJob::Job.all, **_options) + def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {}) @query = query - @pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS) - @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do + @pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options)) + @timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do idle_threads = @pool.max_length - @pool.length create_thread if idle_threads.positive? end @@ -64,20 +62,21 @@ def shutdown? def create_thread future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query| - Rails.application.executor.wrap do - loop do + loop do + executed_job = false + + Rails.application.executor.wrap do good_job = query.with_advisory_lock.first break unless good_job + executed_job = true ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job }) - JobWrapper.new(good_job).perform - good_job.advisory_unlock end - end - true + break unless executed_job + end end future.add_observer(TaskObserver.new) future.execute diff --git a/spec/good_job/cli_spec.rb b/spec/good_job/cli_spec.rb index 8998c2a35..6ca0d9b5e 100644 --- a/spec/good_job/cli_spec.rb +++ b/spec/good_job/cli_spec.rb @@ -43,5 +43,21 @@ expect(GoodJob::Scheduler).to have_received(:new) expect(scheduler_mock).to have_received(:shutdown) end + + describe 'max threads' do + it 'defaults to --max_threads, GOOD_JOB_MAX_THREADS, RAILS_MAX_THREADS, database connection pool size' do + allow(Kernel).to receive(:loop) + + cli = described_class.new([], { max_threads: 4 }, {}) + stub_const 'ENV', ENV.to_hash.merge({ 'RAILS_MAX_THREADS' => 3, 'GOOD_JOB_MAX_THREADS' => 2 }) + allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1) + + expect do + cli.start + end.to output.to_stdout + + expect(GoodJob::Scheduler).to have_received(:new).with(pool_options: { max_threads: 4 }) + end + end end end