Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration for maximum number of job execution threads #18

Merged
merged 1 commit into from
Mar 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ $ bundle
```bash
$ bundle exec good_job
```

### Configuring Job Execution Threads

GoodJob executes enqueued jobs using threads. There is a lot than can be said about [multithreaded behavior in Ruby on Rails](https://guides.rubyonrails.org/threading_and_code_execution.html), but briefly:

- Each GoodJob execution thread requires its own database connection, which are automatically checked out from Rails’s connection pool. _Allowing GoodJob to schedule more threads than are available in the database connection pool can lead to timeouts and is not recommended._
- The maximum number of GoodJob threads can be configured, in decreasing precedence:
1. `$ bundle exec good_job --max_threads 4`
2. `$ GOOD_JOB_MAX_THREADS=4 bundle exec good_job`
3. `$ RAILS_MAX_THREADS=4 bundle exec good_job`
4. Implicitly via Rails's database connection pool size (`ActiveRecord::Base.connection_pool.size`)

## Development

Expand Down
11 changes: 8 additions & 3 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@ class CLI < Thor
RAILS_ENVIRONMENT_RB = File.expand_path("config/environment.rb")

desc :start, "Start jobs"
method_option :max_threads, type: :numeric
def start
require RAILS_ENVIRONMENT_RB

scheduler = GoodJob::Scheduler.new
max_threads = options[:max_threads] ||
ENV['GOOD_JOB_MAX_THREADS'] ||
ENV['RAILS_MAX_THREADS'] ||
ActiveRecord::Base.connection_pool.size

$stdout.puts "GoodJob starting with max_threads=#{max_threads}"
scheduler = GoodJob::Scheduler.new(pool_options: { max_threads: max_threads })

%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
end
@stop_good_job_executable = false

$stdout.puts "GoodJob waiting for jobs..."

Kernel.loop do
sleep 0.1
break if @stop_good_job_executable || scheduler.shutdown?
Expand Down
23 changes: 11 additions & 12 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

module GoodJob
class Scheduler
MAX_THREADS = Concurrent.processor_count

DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
Expand All @@ -15,18 +13,18 @@ class Scheduler
DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 0,
max_queue: 0,
fallback_policy: :abort, # shouldn't matter -- 0 max queue
}.freeze

def initialize(query = GoodJob::Job.all, **_options)
def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
@query = query

@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
idle_threads = @pool.max_length - @pool.length
create_thread if idle_threads.positive?
end
Expand Down Expand Up @@ -64,20 +62,21 @@ def shutdown?

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
Rails.application.executor.wrap do
loop do
loop do
executed_job = false

Rails.application.executor.wrap do
good_job = query.with_advisory_lock.first
break unless good_job

executed_job = true
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })

JobWrapper.new(good_job).perform

good_job.advisory_unlock
end
end

true
break unless executed_job
end
end
future.add_observer(TaskObserver.new)
future.execute
Expand Down
16 changes: 16 additions & 0 deletions spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,21 @@
expect(GoodJob::Scheduler).to have_received(:new)
expect(scheduler_mock).to have_received(:shutdown)
end

describe 'max threads' do
it 'defaults to --max_threads, GOOD_JOB_MAX_THREADS, RAILS_MAX_THREADS, database connection pool size' do
allow(Kernel).to receive(:loop)

cli = described_class.new([], { max_threads: 4 }, {})
stub_const 'ENV', ENV.to_hash.merge({ 'RAILS_MAX_THREADS' => 3, 'GOOD_JOB_MAX_THREADS' => 2 })
allow(ActiveRecord::Base.connection_pool).to receive(:size).and_return(1)

expect do
cli.start
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new).with(pool_options: { max_threads: 4 })
end
end
end
end
2 changes: 1 addition & 1 deletion spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def perform(*args, **kwargs)

it 'pops items off of the queue and runs them' do
scheduler = described_class.new
sleep_until { GoodJob::Job.count == 0 }
sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

if RUN_JOBS.size != number_of_jobs
jobs = THREAD_JOBS.values.flatten
Expand Down