Skip to content

Commit

Permalink
Add shutdown-timeout option to configure the wait for jobs to gracefu…
Browse files Browse the repository at this point in the history
…lly finish before stopping them
  • Loading branch information
bensheldon committed Jan 31, 2021
1 parent b1cd222 commit 6ab265b
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 179 deletions.
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,13 @@ Usage:
good_job start
Options:
[--max-threads=COUNT] # Maximum number of threads to use for working jobs. (env var: GOOD_JOB_MAX_THREADS, default: 5)
[--queues=QUEUE_LIST] # Queues to work from. (env var: GOOD_JOB_QUEUES, default: *)
[--poll-interval=SECONDS] # Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1)
[--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)
[--daemonize] # Run as a background daemon (default: false)
[--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid)
[--max-threads=COUNT] # Maximum number of threads to use for working jobs. (env var: GOOD_JOB_MAX_THREADS, default: 5)
[--queues=QUEUE_LIST] # Queues to work from. (env var: GOOD_JOB_QUEUES, default: *)
[--poll-interval=SECONDS] # Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1)
[--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)
[--shutdown-timeout=SECONDS] # Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever))
[--daemonize] # Run as a background daemon (default: false)
[--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid)
Executes queued jobs.
Expand Down Expand Up @@ -208,12 +209,15 @@ config.active_job.queue_adapter = :good_job
config.good_job.execution_mode = :async
config.good_job.max_threads = 5
config.good_job.poll_interval = 30 # seconds
config.good_job.shutdown_timeout = 25 # seconds
# ...or all at once.
config.good_job = {
execution_mode: :async,
max_threads: 5,
poll_interval: 30,
shutdown_timeout: 25,
}
```

Expand All @@ -227,6 +231,7 @@ Available configuration options are:
- `queues` (string) determines which queues to execute jobs from when `execution_mode` is set to `:async`. See the description of `good_job start` for more details on the format of this string. You can also set this with the environment variable `GOOD_JOB_QUEUES`.
- `poll_interval` (integer) sets the number of seconds between polls for jobs when `execution_mode` is set to `:async`. You can also set this with the environment variable `GOOD_JOB_POLL_INTERVAL`.
- `max_cache` (integer) sets the maximum number of scheduled jobs that will be stored in memory to reduce execution latency when also polling for scheduled jobs. Caching 10,000 scheduled jobs uses approximately 20MB of memory. You can also set this with the environment variable `GOOD_JOB_MAX_CACHE`.
- `shutdown_timeout` (float) number of seconds to wait for jobs to finish when shutting down before stopping the thread. Defaults to forever: `-1`. You can also set this with the environment variable `GOOD_JOB_SHUTDOWN_TIMEOUT`.

By default, GoodJob configures the following execution modes per environment:

Expand Down
41 changes: 34 additions & 7 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,26 @@ def self.reperform_jobs_on_standard_error=(value)
# See the {file:README.md#executing-jobs-async--in-process} for more explanation and examples.
# @param wait [Boolean] whether to wait for shutdown
# @return [void]
def self.shutdown(wait: true)
Notifier.instances.each { |notifier| notifier.shutdown(wait: wait) }
Scheduler.instances.each { |scheduler| scheduler.shutdown(wait: wait) }
def self.shutdown(timeout: -1, wait: nil)
timeout = if wait.present?
ActiveSupport::Deprecation.warn(
"Using `GoodJob.shutdown` with `wait:` kwarg is deprecated; use `timeout:` kwarg instead e.g. GoodJob.shutdown(timeout: #{wait ? '-1' : 'nil'})"
)
wait ? -1 : nil
else
timeout
end

executables = Array(Notifier.instances) + Array(Poller.instances) + Array(Scheduler.instances)
_shutdown_all(executables, timeout: timeout)
end

# Tests whether jobs have stopped executing.
# @return [Boolean] whether background threads are shut down
def self.shutdown?
Notifier.instances.all?(&:shutdown?) && Scheduler.instances.all?(&:shutdown?)
Notifier.instances.all?(&:shutdown?) &&
Poller.instances.all?(&:shutdown?) &&
Scheduler.instances.all?(&:shutdown?)
end

# Stops and restarts executing jobs.
Expand All @@ -95,9 +106,25 @@ def self.shutdown?
# For example, you should use +shutdown+ and +restart+ when using async execution mode with Puma.
# See the {file:README.md#executing-jobs-async--in-process} for more explanation and examples.
# @return [void]
def self.restart
Notifier.instances.each(&:restart)
Scheduler.instances.each(&:restart)
def self.restart(timeout: -1)
executables = Array(Notifier.instances) + Array(Poller.instances) + Array(Scheduler.instances)
_shutdown_all(executables, :restart, timeout: timeout)
end

# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler})
# @param executables [Array<(Notifier, Poller, Scheduler)>] Objects to shut down.
# @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+.
# @param timeout [nil,Numeric]
# @return [void]
def self._shutdown_all(executables, method_name = :shutdown, timeout: -1)
if timeout.positive?
executables.each { |executable| executable.send(method_name, timeout: nil) }

stop_at = Time.current + timeout
executables.each { |executable| executable.send(method_name, timeout: [stop_at - Time.current, 0].max) }
else
executables.each { |executable| executable.send(method_name, timeout: timeout) }
end
end

ActiveSupport.run_load_hooks(:good_job, self)
Expand Down
48 changes: 33 additions & 15 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval
DEPRECATION
end

configuration = GoodJob::Configuration.new(
@configuration = GoodJob::Configuration.new(
{
execution_mode: execution_mode,
queues: queues,
Expand All @@ -47,13 +47,12 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval
}
)

@execution_mode = configuration.execution_mode
raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@execution_mode)
raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@configuration.execution_mode)

if execute_async? # rubocop:disable Style/GuardClause
@notifier = GoodJob::Notifier.new
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: Rails.application.initialized?)
@poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: Rails.application.initialized?)
@notifier.recipients << [@scheduler, :create_thread]
@poller.recipients << [@scheduler, :create_thread]
end
Expand Down Expand Up @@ -96,29 +95,48 @@ def enqueue_at(active_job, timestamp)
good_job
end

# Gracefully stop processing jobs.
# Waits for termination by default.
# @param wait [Boolean] Whether to wait for shut down.
# Shut down the thread pool executors.
# @param timeout [nil, Numeric] Seconds to wait for active threads.
#
# * +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
# * +-1+, the scheduler will wait until the shutdown is complete.
# * +0+, the scheduler will immediately shutdown and stop any threads.
# * A positive number will wait that many seconds before stopping any remaining active threads.
# @param wait [Boolean] Deprecated. Use +timeout:+ instead.
# @return [void]
def shutdown(wait: true)
@notifier&.shutdown(wait: wait)
@poller&.shutdown(wait: wait)
@scheduler&.shutdown(wait: wait)
def shutdown(timeout: :default, wait: nil)
timeout = if wait.present?
ActiveSupport::Deprecation.warn(
"Using `GoodJob::Adapter.shutdown` with `wait:` kwarg is deprecated; use `timeout:` kwarg instead e.g. GoodJob::Adapter.shutdown(timeout: #{wait ? '-1' : 'nil'})"
)
wait ? -1 : nil
else
timeout
end

timeout = if timeout == :default
@configuration.shutdown_timeout
else
timeout
end

executables = [@notifier, @poller, @scheduler].compact
GoodJob._shutdown_all(executables, timeout: timeout)
end

# Whether in +:async+ execution mode.
def execute_async?
@execution_mode == :async
@configuration.execution_mode == :async
end

# Whether in +:external+ execution mode.
def execute_externally?
@execution_mode == :external
@configuration.execution_mode == :external
end

# Whether in +:inline+ execution mode.
def execute_inline?
@execution_mode == :inline
@configuration.execution_mode == :inline
end
end
end
9 changes: 6 additions & 3 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def self.exit_on_failure?
type: :numeric,
banner: 'COUNT',
desc: "Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)"
method_option :shutdown_timeout,
type: :numeric,
banner: 'SECONDS',
desc: "Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever))"
method_option :daemonize,
type: :boolean,
desc: "Run as a background daemon (default: false)"
Expand Down Expand Up @@ -81,9 +85,8 @@ def start
break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown?
end

notifier.shutdown
poller.shutdown
scheduler.shutdown
executors = [notifier, poller, scheduler]
GoodJob._shutdown_all(executors, timeout: configuration.shutdown_timeout)
end

default_task :start
Expand Down
34 changes: 24 additions & 10 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class Configuration
DEFAULT_MAX_CACHE = 10000
# Default number of seconds to preserve jobs for {CLI#cleanup_preserved_jobs}
DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO = 24 * 60 * 60
# Default to always wait for jobs to finish for {#shutdown}
DEFAULT_SHUTDOWN_TIMEOUT = -1

# The options that were explicitly set when initializing +Configuration+.
# @return [Hash]
Expand Down Expand Up @@ -77,10 +79,10 @@ def rails_execution_mode
def max_threads
(
options[:max_threads] ||
rails_config[:max_threads] ||
env['GOOD_JOB_MAX_THREADS'] ||
env['RAILS_MAX_THREADS'] ||
DEFAULT_MAX_THREADS
rails_config[:max_threads] ||
env['GOOD_JOB_MAX_THREADS'] ||
env['RAILS_MAX_THREADS'] ||
DEFAULT_MAX_THREADS
).to_i
end

Expand All @@ -103,9 +105,9 @@ def queue_string
def poll_interval
(
options[:poll_interval] ||
rails_config[:poll_interval] ||
env['GOOD_JOB_POLL_INTERVAL'] ||
DEFAULT_POLL_INTERVAL
rails_config[:poll_interval] ||
env['GOOD_JOB_POLL_INTERVAL'] ||
DEFAULT_POLL_INTERVAL
).to_i
end

Expand All @@ -122,15 +124,27 @@ def max_cache
).to_i
end

# The number of seconds to wait for jobs to finish when shutting down
# before stopping the thread. +-1+ is forever.
# @return [Numeric]
def shutdown_timeout
(
options[:shutdown_timeout] ||
rails_config[:shutdown_timeout] ||
env['GOOD_JOB_SHUTDOWN_TIMEOUT'] ||
DEFAULT_SHUTDOWN_TIMEOUT
).to_f
end

# Number of seconds to preserve jobs when using the +good_job cleanup_preserved_jobs+ CLI command.
# This configuration is only used when {GoodJob.preserve_job_records} is +true+.
# @return [Integer]
def cleanup_preserved_jobs_before_seconds_ago
(
options[:before_seconds_ago] ||
rails_config[:cleanup_preserved_jobs_before_seconds_ago] ||
env['GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO'] ||
DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO
rails_config[:cleanup_preserved_jobs_before_seconds_ago] ||
env['GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO'] ||
DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO
).to_i
end

Expand Down
5 changes: 3 additions & 2 deletions lib/good_job/job_performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ def next?(state = {})
end

# The Returns timestamps of when next tasks may be available.
# @param count [Integer] number of timestamps to return
# @param count [DateTime, Time, nil] jobs scheduled after this time
# @param after [DateTime, Time, nil] future jobs scheduled after this time
# @param limit [Integer] number of future timestamps to return
# @param now_limit [Integer] number of past timestamps to return
# @return [Array<(Time, Timestamp)>, nil]
def next_at(after: nil, limit: nil, now_limit: nil)
job_query.next_scheduled_at(after: after, limit: limit, now_limit: now_limit)
Expand Down
17 changes: 11 additions & 6 deletions lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
module GoodJob
# Delegates the interface of a single {Scheduler} to multiple Schedulers.
class MultiScheduler
# @return [array<Scheduler>] List of the scheduler delegates
# @return [Array<Scheduler>] List of the scheduler delegates
attr_reader :schedulers

def initialize(schedulers)
@schedulers = schedulers
end

# Delegates to {Scheduler#shutdown}.
def shutdown(wait: true)
schedulers.each { |s| s.shutdown(wait: wait) }
# Delegates to {Scheduler#running?}.
def running?
schedulers.all?(&:running?)
end

# Delegates to {Scheduler#shutdown?}.
def shutdown?
schedulers.all?(&:shutdown?)
end

# Delegates to {Scheduler#shutdown}.
def shutdown(timeout: -1)
GoodJob._shutdown_all(schedulers, timeout: timeout)
end

# Delegates to {Scheduler#restart}.
def restart(wait: true)
schedulers.each { |s| s.restart(wait: wait) }
def restart(timeout: -1)
GoodJob._shutdown_all(schedulers, :restart, timeout: timeout)
end

# Delegates to {Scheduler#create_thread}.
Expand Down
Loading

0 comments on commit 6ab265b

Please sign in to comment.