Skip to content

Commit

Permalink
Add Postgres LISTEN/NOTIFY support
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Aug 19, 2020
1 parent 0c858be commit 6ebeee0
Show file tree
Hide file tree
Showing 26 changed files with 480 additions and 64 deletions.
14 changes: 13 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ GEM
multipart-post (>= 1.2, < 3)
faraday-http-cache (2.2.0)
faraday (>= 0.8)
ffi (1.13.1)
foreman (0.87.1)
gem-release (2.1.1)
github_changelog_generator (1.15.2)
Expand All @@ -110,6 +111,7 @@ GEM
mini_mime (1.0.2)
mini_portile2 (2.4.0)
minitest (5.14.1)
msgpack (1.3.3)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.2)
Expand All @@ -118,13 +120,16 @@ GEM
octokit (4.18.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
optimist (3.0.1)
parallel (1.19.2)
parser (2.7.1.4)
ast (~> 2.4.1)
pg (1.2.3)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-rails (0.3.9)
pry (>= 0.10.4)
public_suffix (4.0.5)
puma (4.3.5)
nio4r (~> 2.0)
Expand Down Expand Up @@ -159,6 +164,10 @@ GEM
thor (>= 0.20.3, < 2.0)
rainbow (3.0.0)
rake (13.0.1)
rbtrace (0.4.14)
ffi (>= 1.0.6)
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
regexp_parser (1.7.1)
retriable (3.1.2)
rexml (3.2.4)
Expand Down Expand Up @@ -202,6 +211,7 @@ GEM
sawyer (0.8.2)
addressable (>= 2.3.5)
faraday (> 0.8, < 2.0)
sigdump (0.2.4)
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
Expand Down Expand Up @@ -231,13 +241,15 @@ DEPENDENCIES
gem-release
github_changelog_generator
good_job!
pry
pry-rails
puma
rbtrace
rspec-rails
rubocop
rubocop-performance
rubocop-rails
rubocop-rspec
sigdump
yard

BUNDLED WITH
Expand Down
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails.

- **Designed for ActiveJob.** Complete support for [async, queues, delays, priorities, timeouts, and retries](https://edgeguides.rubyonrails.org/active_job_basics.html) with near-zero configuration.
- **Built for Rails.** Fully adopts Ruby on Rails [threading and code execution guidelines](https://guides.rubyonrails.org/threading_and_code_execution.html) with [Concurrent::Ruby](https://github.com/ruby-concurrency/concurrent-ruby).
- **Backed by Postgres.** Relies upon Postgres integrity and session-level Advisory Locks to provide run-once safety and stay within the limits of `schema.rb`.
- **Backed by Postgres.** Relies upon Postgres integrity, session-level Advisory Locks to provide run-once safety and stay within the limits of `schema.rb`, and LISTEN/NOTIFY to reduce queuing latency.
- **For most workloads.** Targets full-stack teams, economy-minded solo developers, and applications that enqueue less than 1-million jobs/day.

For more of the story of GoodJob, read the [introductory blog post](https://island94.org/2020/07/introducing-goodjob-1-0).
Expand Down Expand Up @@ -284,15 +284,20 @@ Depending on your application configuration, you may need to take additional ste
# config/puma.rb
before_fork do
GoodJob::Scheduler.instances.each { |s| s.shutdown }
GoodJob.shutdown
end
on_worker_boot do
GoodJob::Scheduler.instances.each { |s| s.restart }
GoodJob.restart
end
on_worker_shutdown do
GoodJob::Scheduler.instances.each { |s| s.shutdown }
GoodJob.shutdown
end
MAIN_PID = Process.pid
at_exit do
GoodJob.shutdown if Process.pid == MAIN_PID
end
```
Expand Down
14 changes: 13 additions & 1 deletion gemfiles/rails_5.2.gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ GEM
multipart-post (>= 1.2, < 3)
faraday-http-cache (2.2.0)
faraday (>= 0.8)
ffi (1.13.1)
foreman (0.87.1)
gem-release (2.1.1)
github_changelog_generator (1.15.2)
Expand All @@ -97,6 +98,7 @@ GEM
mini_mime (1.0.2)
mini_portile2 (2.4.0)
minitest (5.14.1)
msgpack (1.3.3)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.2)
Expand All @@ -105,13 +107,16 @@ GEM
octokit (4.18.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
optimist (3.0.1)
parallel (1.19.2)
parser (2.7.1.4)
ast (~> 2.4.1)
pg (1.2.3)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-rails (0.3.9)
pry (>= 0.10.4)
public_suffix (4.0.5)
puma (4.3.5)
nio4r (~> 2.0)
Expand Down Expand Up @@ -144,6 +149,10 @@ GEM
thor (>= 0.19.0, < 2.0)
rainbow (3.0.0)
rake (13.0.1)
rbtrace (0.4.14)
ffi (>= 1.0.6)
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
regexp_parser (1.7.1)
retriable (3.1.2)
rexml (3.2.4)
Expand Down Expand Up @@ -187,6 +196,7 @@ GEM
sawyer (0.8.2)
addressable (>= 2.3.5)
faraday (> 0.8, < 2.0)
sigdump (0.2.4)
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
Expand Down Expand Up @@ -215,14 +225,16 @@ DEPENDENCIES
gem-release
github_changelog_generator
good_job!
pry
pry-rails
puma
rails (~> 5.2.0)
rbtrace
rspec-rails
rubocop
rubocop-performance
rubocop-rails
rubocop-rspec
sigdump
yard

BUNDLED WITH
Expand Down
14 changes: 13 additions & 1 deletion gemfiles/rails_6.0.gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ GEM
multipart-post (>= 1.2, < 3)
faraday-http-cache (2.2.0)
faraday (>= 0.8)
ffi (1.13.1)
foreman (0.87.1)
gem-release (2.1.1)
github_changelog_generator (1.15.2)
Expand All @@ -110,6 +111,7 @@ GEM
mini_mime (1.0.2)
mini_portile2 (2.4.0)
minitest (5.14.1)
msgpack (1.3.3)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.2)
Expand All @@ -118,13 +120,16 @@ GEM
octokit (4.18.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
optimist (3.0.1)
parallel (1.19.2)
parser (2.7.1.4)
ast (~> 2.4.1)
pg (1.2.3)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-rails (0.3.9)
pry (>= 0.10.4)
public_suffix (4.0.5)
puma (4.3.5)
nio4r (~> 2.0)
Expand Down Expand Up @@ -159,6 +164,10 @@ GEM
thor (>= 0.20.3, < 2.0)
rainbow (3.0.0)
rake (13.0.1)
rbtrace (0.4.14)
ffi (>= 1.0.6)
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
regexp_parser (1.7.1)
retriable (3.1.2)
rexml (3.2.4)
Expand Down Expand Up @@ -202,6 +211,7 @@ GEM
sawyer (0.8.2)
addressable (>= 2.3.5)
faraday (> 0.8, < 2.0)
sigdump (0.2.4)
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
Expand Down Expand Up @@ -231,14 +241,16 @@ DEPENDENCIES
gem-release
github_changelog_generator
good_job!
pry
pry-rails
puma
rails (~> 6.0.0)
rbtrace
rspec-rails
rubocop
rubocop-performance
rubocop-rails
rubocop-rspec
sigdump
yard

BUNDLED WITH
Expand Down
4 changes: 3 additions & 1 deletion good_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "foreman"
spec.add_development_dependency "gem-release"
spec.add_development_dependency "github_changelog_generator"
spec.add_development_dependency "pry"
spec.add_development_dependency "pry-rails"
spec.add_development_dependency "puma"
spec.add_development_dependency "rbtrace"
spec.add_development_dependency "rspec-rails"
spec.add_development_dependency "rubocop"
spec.add_development_dependency "rubocop-performance"
spec.add_development_dependency "rubocop-rails"
spec.add_development_dependency "rubocop-rspec"
spec.add_development_dependency "sigdump"
spec.add_development_dependency "yard"
end
22 changes: 22 additions & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
require 'good_job/pg_locks'
require 'good_job/performer'
require 'good_job/current_execution'
require 'good_job/notifier'

require 'active_job/queue_adapters/good_job_adapter'

Expand All @@ -21,4 +22,25 @@ module GoodJob
mattr_accessor :on_thread_error, default: nil

ActiveSupport.run_load_hooks(:good_job, self)

# Shuts down all execution pools
# @param wait [Boolean] whether to wait for shutdown
# @return [void]
def self.shutdown(wait: true)
Notifier.instances.each { |adapter| adapter.shutdown(wait: wait) }
Scheduler.instances.each { |scheduler| scheduler.shutdown(wait: wait) }
end

# Tests if execution pools are shut down
# @return [Boolean] whether execution pools are shut down
def self.shutdown?
Notifier.instances.all?(&:shutdown?) && Scheduler.instances.all?(&:shutdown?)
end

# Restarts all execution pools
# @return [void]
def self.restart
Notifier.instances.each(&:restart)
Scheduler.instances.each(&:restart)
end
end
28 changes: 16 additions & 12 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@ module GoodJob
class Adapter
EXECUTION_MODES = [:async, :external, :inline].freeze

def initialize(execution_mode: nil, max_threads: nil, poll_interval: nil, scheduler: nil, inline: false)
def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval: nil, scheduler: nil, notifier: nil, inline: false)
if inline && execution_mode.nil?
ActiveSupport::Deprecation.warn('GoodJob::Adapter#new(inline: true) is deprecated; use GoodJob::Adapter.new(execution_mode: :inline) instead')
execution_mode = :inline
end

configuration = GoodJob::Configuration.new({
execution_mode: execution_mode,
max_threads: max_threads,
poll_interval: poll_interval,
},
env: ENV)

raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(configuration.execution_mode)
configuration = GoodJob::Configuration.new(
execution_mode: execution_mode,
queues: queues,
max_threads: max_threads,
poll_interval: poll_interval
)

@execution_mode = configuration.execution_mode
raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@execution_mode)

@scheduler = scheduler
@scheduler = GoodJob::Scheduler.from_configuration(configuration) if @execution_mode == :async && @scheduler.blank?
if @execution_mode == :async # rubocop:disable Style/GuardClause
@notifier = notifier || GoodJob::Notifier.new
@scheduler = scheduler || GoodJob::Scheduler.from_configuration(configuration)
@notifier.recipients << [@scheduler, :create_thread]
end
end

def enqueue(active_job)
Expand All @@ -42,12 +44,14 @@ def enqueue_at(active_job, timestamp)
end
end

@scheduler.create_thread(queue_name: good_job.queue_name) if execute_async?
executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name)
Notifier.notify(queue_name: good_job.queue_name) unless executed_locally

good_job
end

def shutdown(wait: true)
@notifier&.shutdown(wait: wait)
@scheduler&.shutdown(wait: wait)
end

Expand Down
10 changes: 7 additions & 3 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ class CLI < Thor
def start
set_up_application!

configuration = Configuration.new(options, env: ENV)
scheduler = Scheduler.from_configuration(configuration)
notifier = GoodJob::Notifier.new
configuration = GoodJob::Configuration.new(options)
scheduler = GoodJob::Scheduler.from_configuration(configuration)
notifier.recipients << [scheduler, :create_thread]

@stop_good_job_executable = false
%w[INT TERM].each do |signal|
Expand All @@ -28,9 +30,10 @@ def start

Kernel.loop do
sleep 0.1
break if @stop_good_job_executable || scheduler.shutdown?
break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
end

notifier.shutdown
scheduler.shutdown
end

Expand All @@ -41,6 +44,7 @@ def start
type: :numeric,
default: 24 * 60 * 60,
desc: "Delete records finished more than this many seconds ago"

def cleanup_preserved_jobs
set_up_application!

Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
priority: active_job.priority || DEFAULT_PRIORITY,
serialized_params: active_job.serialize,
scheduled_at: scheduled_at || Time.current,
scheduled_at: scheduled_at,
create_with_advisory_lock: create_with_advisory_lock
)

Expand Down
Loading

0 comments on commit 6ebeee0

Please sign in to comment.