Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Postgres LISTEN/NOTIFY support #82

Merged
merged 1 commit into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ GEM
multipart-post (>= 1.2, < 3)
faraday-http-cache (2.2.0)
faraday (>= 0.8)
ffi (1.13.1)
foreman (0.87.1)
gem-release (2.1.1)
github_changelog_generator (1.15.2)
Expand All @@ -110,6 +111,7 @@ GEM
mini_mime (1.0.2)
mini_portile2 (2.4.0)
minitest (5.14.1)
msgpack (1.3.3)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.2)
Expand All @@ -118,13 +120,16 @@ GEM
octokit (4.18.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
optimist (3.0.1)
parallel (1.19.2)
parser (2.7.1.4)
ast (~> 2.4.1)
pg (1.2.3)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-rails (0.3.9)
pry (>= 0.10.4)
public_suffix (4.0.5)
puma (4.3.5)
nio4r (~> 2.0)
Expand Down Expand Up @@ -159,6 +164,10 @@ GEM
thor (>= 0.20.3, < 2.0)
rainbow (3.0.0)
rake (13.0.1)
rbtrace (0.4.14)
ffi (>= 1.0.6)
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
regexp_parser (1.7.1)
retriable (3.1.2)
rexml (3.2.4)
Expand Down Expand Up @@ -202,6 +211,7 @@ GEM
sawyer (0.8.2)
addressable (>= 2.3.5)
faraday (> 0.8, < 2.0)
sigdump (0.2.4)
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
Expand Down Expand Up @@ -231,13 +241,15 @@ DEPENDENCIES
gem-release
github_changelog_generator
good_job!
pry
pry-rails
puma
rbtrace
rspec-rails
rubocop
rubocop-performance
rubocop-rails
rubocop-rspec
sigdump
yard

BUNDLED WITH
Expand Down
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails.

- **Designed for ActiveJob.** Complete support for [async, queues, delays, priorities, timeouts, and retries](https://edgeguides.rubyonrails.org/active_job_basics.html) with near-zero configuration.
- **Built for Rails.** Fully adopts Ruby on Rails [threading and code execution guidelines](https://guides.rubyonrails.org/threading_and_code_execution.html) with [Concurrent::Ruby](https://github.com/ruby-concurrency/concurrent-ruby).
- **Backed by Postgres.** Relies upon Postgres integrity and session-level Advisory Locks to provide run-once safety and stay within the limits of `schema.rb`.
- **Backed by Postgres.** Relies upon Postgres integrity, session-level Advisory Locks to provide run-once safety and stay within the limits of `schema.rb`, and LISTEN/NOTIFY to reduce queuing latency.
- **For most workloads.** Targets full-stack teams, economy-minded solo developers, and applications that enqueue less than 1-million jobs/day.

For more of the story of GoodJob, read the [introductory blog post](https://island94.org/2020/07/introducing-goodjob-1-0).
Expand Down Expand Up @@ -284,15 +284,20 @@ Depending on your application configuration, you may need to take additional ste
# config/puma.rb

before_fork do
GoodJob::Scheduler.instances.each { |s| s.shutdown }
GoodJob.shutdown
end

on_worker_boot do
GoodJob::Scheduler.instances.each { |s| s.restart }
GoodJob.restart
end

on_worker_shutdown do
GoodJob::Scheduler.instances.each { |s| s.shutdown }
GoodJob.shutdown
end

MAIN_PID = Process.pid
at_exit do
GoodJob.shutdown if Process.pid == MAIN_PID
end
```

Expand Down
14 changes: 13 additions & 1 deletion gemfiles/rails_5.2.gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ GEM
multipart-post (>= 1.2, < 3)
faraday-http-cache (2.2.0)
faraday (>= 0.8)
ffi (1.13.1)
foreman (0.87.1)
gem-release (2.1.1)
github_changelog_generator (1.15.2)
Expand All @@ -97,6 +98,7 @@ GEM
mini_mime (1.0.2)
mini_portile2 (2.4.0)
minitest (5.14.1)
msgpack (1.3.3)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.2)
Expand All @@ -105,13 +107,16 @@ GEM
octokit (4.18.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
optimist (3.0.1)
parallel (1.19.2)
parser (2.7.1.4)
ast (~> 2.4.1)
pg (1.2.3)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-rails (0.3.9)
pry (>= 0.10.4)
public_suffix (4.0.5)
puma (4.3.5)
nio4r (~> 2.0)
Expand Down Expand Up @@ -144,6 +149,10 @@ GEM
thor (>= 0.19.0, < 2.0)
rainbow (3.0.0)
rake (13.0.1)
rbtrace (0.4.14)
ffi (>= 1.0.6)
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
regexp_parser (1.7.1)
retriable (3.1.2)
rexml (3.2.4)
Expand Down Expand Up @@ -187,6 +196,7 @@ GEM
sawyer (0.8.2)
addressable (>= 2.3.5)
faraday (> 0.8, < 2.0)
sigdump (0.2.4)
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
Expand Down Expand Up @@ -215,14 +225,16 @@ DEPENDENCIES
gem-release
github_changelog_generator
good_job!
pry
pry-rails
puma
rails (~> 5.2.0)
rbtrace
rspec-rails
rubocop
rubocop-performance
rubocop-rails
rubocop-rspec
sigdump
yard

BUNDLED WITH
Expand Down
14 changes: 13 additions & 1 deletion gemfiles/rails_6.0.gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ GEM
multipart-post (>= 1.2, < 3)
faraday-http-cache (2.2.0)
faraday (>= 0.8)
ffi (1.13.1)
foreman (0.87.1)
gem-release (2.1.1)
github_changelog_generator (1.15.2)
Expand All @@ -110,6 +111,7 @@ GEM
mini_mime (1.0.2)
mini_portile2 (2.4.0)
minitest (5.14.1)
msgpack (1.3.3)
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.2)
Expand All @@ -118,13 +120,16 @@ GEM
octokit (4.18.0)
faraday (>= 0.9)
sawyer (~> 0.8.0, >= 0.5.3)
optimist (3.0.1)
parallel (1.19.2)
parser (2.7.1.4)
ast (~> 2.4.1)
pg (1.2.3)
pry (0.13.1)
coderay (~> 1.1)
method_source (~> 1.0)
pry-rails (0.3.9)
pry (>= 0.10.4)
public_suffix (4.0.5)
puma (4.3.5)
nio4r (~> 2.0)
Expand Down Expand Up @@ -159,6 +164,10 @@ GEM
thor (>= 0.20.3, < 2.0)
rainbow (3.0.0)
rake (13.0.1)
rbtrace (0.4.14)
ffi (>= 1.0.6)
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
regexp_parser (1.7.1)
retriable (3.1.2)
rexml (3.2.4)
Expand Down Expand Up @@ -202,6 +211,7 @@ GEM
sawyer (0.8.2)
addressable (>= 2.3.5)
faraday (> 0.8, < 2.0)
sigdump (0.2.4)
sprockets (4.0.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
Expand Down Expand Up @@ -231,14 +241,16 @@ DEPENDENCIES
gem-release
github_changelog_generator
good_job!
pry
pry-rails
puma
rails (~> 6.0.0)
rbtrace
rspec-rails
rubocop
rubocop-performance
rubocop-rails
rubocop-rspec
sigdump
yard

BUNDLED WITH
Expand Down
4 changes: 3 additions & 1 deletion good_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "foreman"
spec.add_development_dependency "gem-release"
spec.add_development_dependency "github_changelog_generator"
spec.add_development_dependency "pry"
spec.add_development_dependency "pry-rails"
spec.add_development_dependency "puma"
spec.add_development_dependency "rbtrace"
spec.add_development_dependency "rspec-rails"
spec.add_development_dependency "rubocop"
spec.add_development_dependency "rubocop-performance"
spec.add_development_dependency "rubocop-rails"
spec.add_development_dependency "rubocop-rspec"
spec.add_development_dependency "sigdump"
spec.add_development_dependency "yard"
end
22 changes: 22 additions & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
require 'good_job/pg_locks'
require 'good_job/performer'
require 'good_job/current_execution'
require 'good_job/notifier'

require 'active_job/queue_adapters/good_job_adapter'

Expand All @@ -21,4 +22,25 @@ module GoodJob
mattr_accessor :on_thread_error, default: nil

ActiveSupport.run_load_hooks(:good_job, self)

# Shuts down all execution pools
# @param wait [Boolean] whether to wait for shutdown
# @return [void]
def self.shutdown(wait: true)
Notifier.instances.each { |adapter| adapter.shutdown(wait: wait) }
Scheduler.instances.each { |scheduler| scheduler.shutdown(wait: wait) }
end

# Tests if execution pools are shut down
# @return [Boolean] whether execution pools are shut down
def self.shutdown?
Notifier.instances.all?(&:shutdown?) && Scheduler.instances.all?(&:shutdown?)
end

# Restarts all execution pools
# @return [void]
def self.restart
Notifier.instances.each(&:restart)
Scheduler.instances.each(&:restart)
end
end
28 changes: 16 additions & 12 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@ module GoodJob
class Adapter
EXECUTION_MODES = [:async, :external, :inline].freeze

def initialize(execution_mode: nil, max_threads: nil, poll_interval: nil, scheduler: nil, inline: false)
def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval: nil, scheduler: nil, notifier: nil, inline: false)
if inline && execution_mode.nil?
ActiveSupport::Deprecation.warn('GoodJob::Adapter#new(inline: true) is deprecated; use GoodJob::Adapter.new(execution_mode: :inline) instead')
execution_mode = :inline
end

configuration = GoodJob::Configuration.new({
execution_mode: execution_mode,
max_threads: max_threads,
poll_interval: poll_interval,
},
env: ENV)

raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(configuration.execution_mode)
configuration = GoodJob::Configuration.new(
execution_mode: execution_mode,
queues: queues,
max_threads: max_threads,
poll_interval: poll_interval
)

@execution_mode = configuration.execution_mode
raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@execution_mode)

@scheduler = scheduler
@scheduler = GoodJob::Scheduler.from_configuration(configuration) if @execution_mode == :async && @scheduler.blank?
if @execution_mode == :async # rubocop:disable Style/GuardClause
@notifier = notifier || GoodJob::Notifier.new
@scheduler = scheduler || GoodJob::Scheduler.from_configuration(configuration)
@notifier.recipients << [@scheduler, :create_thread]
end
end

def enqueue(active_job)
Expand All @@ -42,12 +44,14 @@ def enqueue_at(active_job, timestamp)
end
end

@scheduler.create_thread(queue_name: good_job.queue_name) if execute_async?
executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name)
Notifier.notify(queue_name: good_job.queue_name) unless executed_locally

good_job
end

def shutdown(wait: true)
@notifier&.shutdown(wait: wait)
@scheduler&.shutdown(wait: wait)
end

Expand Down
10 changes: 7 additions & 3 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ class CLI < Thor
def start
set_up_application!

configuration = Configuration.new(options, env: ENV)
scheduler = Scheduler.from_configuration(configuration)
notifier = GoodJob::Notifier.new
configuration = GoodJob::Configuration.new(options)
scheduler = GoodJob::Scheduler.from_configuration(configuration)
notifier.recipients << [scheduler, :create_thread]

@stop_good_job_executable = false
%w[INT TERM].each do |signal|
Expand All @@ -28,9 +30,10 @@ def start

Kernel.loop do
sleep 0.1
break if @stop_good_job_executable || scheduler.shutdown?
break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
end

notifier.shutdown
scheduler.shutdown
end

Expand All @@ -41,6 +44,7 @@ def start
type: :numeric,
default: 24 * 60 * 60,
desc: "Delete records finished more than this many seconds ago"

def cleanup_preserved_jobs
set_up_application!

Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
priority: active_job.priority || DEFAULT_PRIORITY,
serialized_params: active_job.serialize,
scheduled_at: scheduled_at || Time.current,
scheduled_at: scheduled_at,
create_with_advisory_lock: create_with_advisory_lock
)

Expand Down
Loading