Skip to content

Commit

Permalink
Configuration for maximum number of job execution threads
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Mar 8, 2020
1 parent 8220462 commit ff03b2d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 15 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ $ bundle
```bash
$ bundle exec good_job
```

### Configuring Job Execution Threads

GoodJob’s executes enqueued jobs using threads. There is a lot than can be said about [multithreaded behavior in Ruby on Rails](https://guides.rubyonrails.org/threading_and_code_execution.html), but briefly:

- Each GoodJob thread requires its own database connection, which are automatically checked out from Rails’s connection pool. _Allowing GoodJob to schedule more threads than are available in the database connection pool can lead to timeouts and is not recommended._
- The maximum number of GoodJob threads can be configured, in decreasing precedence:
1. `$ bundle exec good_job --max_threads 4`
2. `$ GOOD_JOB_MAX_THREADS=4 bundle exec good_job`
3. `$ RAILS_MAX_THREADS=4 bundle exec good_job`
4. Implicitly via Rails's database connection pool size (`ActiveRecord::Base.connection_pool.size`)
## Development
Expand Down
11 changes: 8 additions & 3 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ class CLI < Thor
RAILS_ENVIRONMENT_RB = File.expand_path("config/environment.rb")

desc :start, "Start jobs"
method_option :max_threads, type: :numeric
def start
require RAILS_ENVIRONMENT_RB

scheduler = GoodJob::Scheduler.new
max_threads = options[:max_threads] ||
ENV['GOOD_JOB_MAX_THREADS'] ||
ENV['RAILS_MAX_THREADS'] ||
ActiveRecord::Base.connection_pool.size

$stdout.puts "GoodJob starting with max_threads=#{max_threads}"
scheduler = GoodJob::Scheduler.new(pool_options: { max_threads: max_threads })

%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
end
@stop_good_job_executable = false

$stdout.puts "GoodJob waiting for jobs..."

Kernel.loop do
sleep 0.1
break if @stop_good_job_executable || scheduler.shutdown?
Expand Down
23 changes: 11 additions & 12 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

module GoodJob
class Scheduler
MAX_THREADS = Concurrent.processor_count

DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
Expand All @@ -15,18 +13,18 @@ class Scheduler
DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 0,
max_queue: 0,
fallback_policy: :abort, # shouldn't matter -- 0 max queue
}.freeze

def initialize(query = GoodJob::Job.all, **_options)
def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
@query = query

@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
idle_threads = @pool.max_length - @pool.length
create_thread if idle_threads.positive?
end
Expand Down Expand Up @@ -64,20 +62,21 @@ def shutdown?

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
Rails.application.executor.wrap do
loop do
loop do
executed_job = false

Rails.application.executor.wrap do
good_job = query.with_advisory_lock.first
break unless good_job

executed_job = true
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })

JobWrapper.new(good_job).perform

good_job.advisory_unlock
end
end

true
break unless executed_job
end
end
future.add_observer(TaskObserver.new)
future.execute
Expand Down
16 changes: 16 additions & 0 deletions spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,21 @@
expect(GoodJob::Scheduler).to have_received(:new)
expect(scheduler_mock).to have_received(:shutdown)
end

describe 'max threads' do
it 'defaults to --max_threads, GOOD_JOB_MAX_THREADS, RAILS_MAX_THREADS, database connection pool size' do
allow(Kernel).to receive(:loop)

cli = described_class.new([], { max_threads: 4 }, {})
stub_const 'ENV', ENV.to_hash.merge({ 'RAILS_MAX_THREADS' => 3, 'GOOD_JOB_MAX_THREADS' => 2 })
allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1)

expect do
cli.start
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new).with(pool_options: { max_threads: 4 })
end
end
end
end

0 comments on commit ff03b2d

Please sign in to comment.