diff --git a/README.md b/README.md index d3eb6e5ec..3c206357c 100644 --- a/README.md +++ b/README.md @@ -149,12 +149,13 @@ Usage: good_job start Options: - [--max-threads=COUNT] # Maximum number of threads to use for working jobs. (env var: GOOD_JOB_MAX_THREADS, default: 5) - [--queues=QUEUE_LIST] # Queues to work from. (env var: GOOD_JOB_QUEUES, default: *) - [--poll-interval=SECONDS] # Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1) - [--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000) - [--daemonize] # Run as a background daemon (default: false) - [--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid) + [--max-threads=COUNT] # Maximum number of threads to use for working jobs. (env var: GOOD_JOB_MAX_THREADS, default: 5) + [--queues=QUEUE_LIST] # Queues to work from. (env var: GOOD_JOB_QUEUES, default: *) + [--poll-interval=SECONDS] # Interval between polls for available jobs in seconds (env var: GOOD_JOB_POLL_INTERVAL, default: 1) + [--max-cache=COUNT] # Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000) + [--shutdown-timeout=SECONDS] # Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever)) + [--daemonize] # Run as a background daemon (default: false) + [--pidfile=PIDFILE] # Path to write daemonized Process ID (env var: GOOD_JOB_PIDFILE, default: tmp/pids/good_job.pid) Executes queued jobs. @@ -208,12 +209,15 @@ config.active_job.queue_adapter = :good_job config.good_job.execution_mode = :async config.good_job.max_threads = 5 config.good_job.poll_interval = 30 # seconds +config.good_job.shutdown_timeout = 25 # seconds + # ...or all at once. config.good_job = { execution_mode: :async, max_threads: 5, poll_interval: 30, + shutdown_timeout: 25, } ``` @@ -227,6 +231,7 @@ Available configuration options are: - `queues` (string) determines which queues to execute jobs from when `execution_mode` is set to `:async`. See the description of `good_job start` for more details on the format of this string. You can also set this with the environment variable `GOOD_JOB_QUEUES`. - `poll_interval` (integer) sets the number of seconds between polls for jobs when `execution_mode` is set to `:async`. You can also set this with the environment variable `GOOD_JOB_POLL_INTERVAL`. - `max_cache` (integer) sets the maximum number of scheduled jobs that will be stored in memory to reduce execution latency when also polling for scheduled jobs. Caching 10,000 scheduled jobs uses approximately 20MB of memory. You can also set this with the environment variable `GOOD_JOB_MAX_CACHE`. +- `shutdown_timeout` (float) number of seconds to wait for jobs to finish when shutting down before stopping the thread. Defaults to forever: `-1`. You can also set this with the environment variable `GOOD_JOB_SHUTDOWN_TIMEOUT`. By default, GoodJob configures the following execution modes per environment: diff --git a/lib/good_job.rb b/lib/good_job.rb index 907df369a..e4ad69e01 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -78,15 +78,26 @@ def self.reperform_jobs_on_standard_error=(value) # See the {file:README.md#executing-jobs-async--in-process} for more explanation and examples. # @param wait [Boolean] whether to wait for shutdown # @return [void] - def self.shutdown(wait: true) - Notifier.instances.each { |notifier| notifier.shutdown(wait: wait) } - Scheduler.instances.each { |scheduler| scheduler.shutdown(wait: wait) } + def self.shutdown(timeout: -1, wait: nil) + timeout = if wait.present? + ActiveSupport::Deprecation.warn( + "Using `GoodJob.shutdown` with `wait:` kwarg is deprecated; use `timeout:` kwarg instead e.g. GoodJob.shutdown(timeout: #{wait ? '-1' : 'nil'})" + ) + wait ? -1 : nil + else + timeout + end + + executables = Array(Notifier.instances) + Array(Poller.instances) + Array(Scheduler.instances) + _shutdown_all(executables, timeout: timeout) end # Tests whether jobs have stopped executing. # @return [Boolean] whether background threads are shut down def self.shutdown? - Notifier.instances.all?(&:shutdown?) && Scheduler.instances.all?(&:shutdown?) + Notifier.instances.all?(&:shutdown?) && + Poller.instances.all?(&:shutdown?) && + Scheduler.instances.all?(&:shutdown?) end # Stops and restarts executing jobs. @@ -95,9 +106,25 @@ def self.shutdown? # For example, you should use +shutdown+ and +restart+ when using async execution mode with Puma. # See the {file:README.md#executing-jobs-async--in-process} for more explanation and examples. # @return [void] - def self.restart - Notifier.instances.each(&:restart) - Scheduler.instances.each(&:restart) + def self.restart(timeout: -1) + executables = Array(Notifier.instances) + Array(Poller.instances) + Array(Scheduler.instances) + _shutdown_all(executables, :restart, timeout: timeout) + end + + # Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}) + # @param executables [Array<(Notifier, Poller, Scheduler)>] Objects to shut down. + # @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+. + # @param timeout [nil,Numeric] + # @return [void] + def self._shutdown_all(executables, method_name = :shutdown, timeout: -1) + if timeout.positive? + executables.each { |executable| executable.send(method_name, timeout: nil) } + + stop_at = Time.current + timeout + executables.each { |executable| executable.send(method_name, timeout: [stop_at - Time.current, 0].max) } + else + executables.each { |executable| executable.send(method_name, timeout: timeout) } + end end ActiveSupport.run_load_hooks(:good_job, self) diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 28167187e..6b405e338 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -38,7 +38,7 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval DEPRECATION end - configuration = GoodJob::Configuration.new( + @configuration = GoodJob::Configuration.new( { execution_mode: execution_mode, queues: queues, @@ -47,13 +47,12 @@ def initialize(execution_mode: nil, queues: nil, max_threads: nil, poll_interval } ) - @execution_mode = configuration.execution_mode - raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@execution_mode) + raise ArgumentError, "execution_mode: must be one of #{EXECUTION_MODES.join(', ')}." unless EXECUTION_MODES.include?(@configuration.execution_mode) if execute_async? # rubocop:disable Style/GuardClause @notifier = GoodJob::Notifier.new - @poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval) - @scheduler = GoodJob::Scheduler.from_configuration(configuration, warm_cache_on_initialize: Rails.application.initialized?) + @poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval) + @scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: Rails.application.initialized?) @notifier.recipients << [@scheduler, :create_thread] @poller.recipients << [@scheduler, :create_thread] end @@ -96,29 +95,48 @@ def enqueue_at(active_job, timestamp) good_job end - # Gracefully stop processing jobs. - # Waits for termination by default. - # @param wait [Boolean] Whether to wait for shut down. + # Shut down the thread pool executors. + # @param timeout [nil, Numeric] Seconds to wait for active threads. + # + # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. + # * +-1+, the scheduler will wait until the shutdown is complete. + # * +0+, the scheduler will immediately shutdown and stop any threads. + # * A positive number will wait that many seconds before stopping any remaining active threads. + # @param wait [Boolean] Deprecated. Use +timeout:+ instead. # @return [void] - def shutdown(wait: true) - @notifier&.shutdown(wait: wait) - @poller&.shutdown(wait: wait) - @scheduler&.shutdown(wait: wait) + def shutdown(timeout: :default, wait: nil) + timeout = if wait.present? + ActiveSupport::Deprecation.warn( + "Using `GoodJob::Adapter.shutdown` with `wait:` kwarg is deprecated; use `timeout:` kwarg instead e.g. GoodJob::Adapter.shutdown(timeout: #{wait ? '-1' : 'nil'})" + ) + wait ? -1 : nil + else + timeout + end + + timeout = if timeout == :default + @configuration.shutdown_timeout + else + timeout + end + + executables = [@notifier, @poller, @scheduler].compact + GoodJob._shutdown_all(executables, timeout: timeout) end # Whether in +:async+ execution mode. def execute_async? - @execution_mode == :async + @configuration.execution_mode == :async end # Whether in +:external+ execution mode. def execute_externally? - @execution_mode == :external + @configuration.execution_mode == :external end # Whether in +:inline+ execution mode. def execute_inline? - @execution_mode == :inline + @configuration.execution_mode == :inline end end end diff --git a/lib/good_job/cli.rb b/lib/good_job/cli.rb index 58d77e372..1d2a5353a 100644 --- a/lib/good_job/cli.rb +++ b/lib/good_job/cli.rb @@ -53,6 +53,10 @@ def self.exit_on_failure? type: :numeric, banner: 'COUNT', desc: "Maximum number of scheduled jobs to cache in memory (env var: GOOD_JOB_MAX_CACHE, default: 10000)" + method_option :shutdown_timeout, + type: :numeric, + banner: 'SECONDS', + desc: "Number of seconds to wait for jobs to finish when shutting down before stopping the thread. (env var: GOOD_JOB_SHUTDOWN_TIMEOUT, default: -1 (forever))" method_option :daemonize, type: :boolean, desc: "Run as a background daemon (default: false)" @@ -81,9 +85,8 @@ def start break if @stop_good_job_executable || scheduler.shutdown? || notifier.shutdown? end - notifier.shutdown - poller.shutdown - scheduler.shutdown + executors = [notifier, poller, scheduler] + GoodJob._shutdown_all(executors, timeout: configuration.shutdown_timeout) end default_task :start diff --git a/lib/good_job/configuration.rb b/lib/good_job/configuration.rb index 077dfa194..00bf2ac5a 100644 --- a/lib/good_job/configuration.rb +++ b/lib/good_job/configuration.rb @@ -13,6 +13,8 @@ class Configuration DEFAULT_MAX_CACHE = 10000 # Default number of seconds to preserve jobs for {CLI#cleanup_preserved_jobs} DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO = 24 * 60 * 60 + # Default to always wait for jobs to finish for {#shutdown} + DEFAULT_SHUTDOWN_TIMEOUT = -1 # The options that were explicitly set when initializing +Configuration+. # @return [Hash] @@ -77,10 +79,10 @@ def rails_execution_mode def max_threads ( options[:max_threads] || - rails_config[:max_threads] || - env['GOOD_JOB_MAX_THREADS'] || - env['RAILS_MAX_THREADS'] || - DEFAULT_MAX_THREADS + rails_config[:max_threads] || + env['GOOD_JOB_MAX_THREADS'] || + env['RAILS_MAX_THREADS'] || + DEFAULT_MAX_THREADS ).to_i end @@ -103,9 +105,9 @@ def queue_string def poll_interval ( options[:poll_interval] || - rails_config[:poll_interval] || - env['GOOD_JOB_POLL_INTERVAL'] || - DEFAULT_POLL_INTERVAL + rails_config[:poll_interval] || + env['GOOD_JOB_POLL_INTERVAL'] || + DEFAULT_POLL_INTERVAL ).to_i end @@ -122,15 +124,27 @@ def max_cache ).to_i end + # The number of seconds to wait for jobs to finish when shutting down + # before stopping the thread. +-1+ is forever. + # @return [Numeric] + def shutdown_timeout + ( + options[:shutdown_timeout] || + rails_config[:shutdown_timeout] || + env['GOOD_JOB_SHUTDOWN_TIMEOUT'] || + DEFAULT_SHUTDOWN_TIMEOUT + ).to_f + end + # Number of seconds to preserve jobs when using the +good_job cleanup_preserved_jobs+ CLI command. # This configuration is only used when {GoodJob.preserve_job_records} is +true+. # @return [Integer] def cleanup_preserved_jobs_before_seconds_ago ( options[:before_seconds_ago] || - rails_config[:cleanup_preserved_jobs_before_seconds_ago] || - env['GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO'] || - DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO + rails_config[:cleanup_preserved_jobs_before_seconds_ago] || + env['GOOD_JOB_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO'] || + DEFAULT_CLEANUP_PRESERVED_JOBS_BEFORE_SECONDS_AGO ).to_i end diff --git a/lib/good_job/job_performer.rb b/lib/good_job/job_performer.rb index f1c70e998..f91469ac8 100644 --- a/lib/good_job/job_performer.rb +++ b/lib/good_job/job_performer.rb @@ -51,8 +51,9 @@ def next?(state = {}) end # The Returns timestamps of when next tasks may be available. - # @param count [Integer] number of timestamps to return - # @param count [DateTime, Time, nil] jobs scheduled after this time + # @param after [DateTime, Time, nil] future jobs scheduled after this time + # @param limit [Integer] number of future timestamps to return + # @param now_limit [Integer] number of past timestamps to return # @return [Array<(Time, Timestamp)>, nil] def next_at(after: nil, limit: nil, now_limit: nil) job_query.next_scheduled_at(after: after, limit: limit, now_limit: now_limit) diff --git a/lib/good_job/multi_scheduler.rb b/lib/good_job/multi_scheduler.rb index 2d47e2e56..bb0601dd5 100644 --- a/lib/good_job/multi_scheduler.rb +++ b/lib/good_job/multi_scheduler.rb @@ -1,16 +1,16 @@ module GoodJob # Delegates the interface of a single {Scheduler} to multiple Schedulers. class MultiScheduler - # @return [array] List of the scheduler delegates + # @return [Array] List of the scheduler delegates attr_reader :schedulers def initialize(schedulers) @schedulers = schedulers end - # Delegates to {Scheduler#shutdown}. - def shutdown(wait: true) - schedulers.each { |s| s.shutdown(wait: wait) } + # Delegates to {Scheduler#running?}. + def running? + schedulers.all?(&:running?) end # Delegates to {Scheduler#shutdown?}. @@ -18,9 +18,14 @@ def shutdown? schedulers.all?(&:shutdown?) end + # Delegates to {Scheduler#shutdown}. + def shutdown(timeout: -1) + GoodJob._shutdown_all(schedulers, timeout: timeout) + end + # Delegates to {Scheduler#restart}. - def restart(wait: true) - schedulers.each { |s| s.restart(wait: wait) } + def restart(timeout: -1) + GoodJob._shutdown_all(schedulers, :restart, timeout: timeout) end # Delegates to {Scheduler#create_thread}. diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index ac65f61e5..8f8e671f0 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -15,7 +15,7 @@ class Notifier # Default Postgres channel for LISTEN/NOTIFY CHANNEL = 'good_job'.freeze # Defaults for instance of Concurrent::ThreadPoolExecutor - POOL_OPTIONS = { + EXECUTOR_OPTIONS = { name: name, min_threads: 0, max_threads: 1, @@ -30,7 +30,7 @@ class Notifier # @!attribute [r] instances # @!scope class # List of all instantiated Notifiers in the current process. - # @return [array] + # @return [Array] cattr_reader :instances, default: [], instance_reader: false # Send a message via Postgres NOTIFY @@ -53,7 +53,7 @@ def initialize(*recipients) self.class.instances << self - create_pool + create_executor listen end @@ -63,34 +63,43 @@ def listening? @listening.true? end - # Restart the notifier. - # When shutdown, start; or shutdown and start. - # @param wait [Boolean] Wait for background thread to finish - # @return [void] - def restart(wait: true) - shutdown(wait: wait) - create_pool - listen - end + # Tests whether the notifier is running. + # @return [true, false, nil] + delegate :running?, to: :executor, allow_nil: true + + # Tests whether the scheduler is shutdown. + # @return [true, false, nil] + delegate :shutdown?, to: :executor, allow_nil: true # Shut down the notifier. # This stops the background LISTENing thread. - # If +wait+ is +true+, the notifier will wait for background thread to shutdown. - # If +wait+ is +false+, this method will return immediately even though threads may still be running. # Use {#shutdown?} to determine whether threads have stopped. - # @param wait [Boolean] Wait for actively executing threads to finish + # @param timeout [nil, Numeric] Seconds to wait for active threads. + # + # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. + # * +-1+, the scheduler will wait until the shutdown is complete. + # * +0+, the scheduler will immediately shutdown and stop any threads. + # * A positive number will wait that many seconds before stopping any remaining active threads. # @return [void] - def shutdown(wait: true) - return unless @pool.running? + def shutdown(timeout: -1) + return if executor.nil? || executor.shutdown? - @pool.shutdown - @pool.wait_for_termination if wait + executor.shutdown if executor.running? + + if executor.shuttingdown? && timeout # rubocop:disable Style/GuardClause + executor_wait = timeout.negative? ? nil : timeout + executor.kill unless executor.wait_for_termination(executor_wait) + end end - # Tests whether the notifier is shutdown. - # @return [true, false, nil] - def shutdown? - !@pool.running? + # Restart the notifier. + # When shutdown, start; or shutdown and start. + # @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}. + # @return [void] + def restart(timeout: -1) + shutdown(timeout: timeout) if running? + create_executor + listen end # Invoked on completion of ThreadPoolExecutor task @@ -109,36 +118,36 @@ def listen_observer(_time, _result, thread_error) private - def create_pool - @pool = Concurrent::ThreadPoolExecutor.new(POOL_OPTIONS) + attr_reader :executor + + def create_executor + @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS) end def listen - future = Concurrent::Future.new(args: [@recipients, @pool, @listening], executor: @pool) do |recipients, pool, listening| + future = Concurrent::Future.new(args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| with_listen_connection do |conn| ActiveSupport::Notifications.instrument("notifier_listen.good_job") do conn.async_exec("LISTEN #{CHANNEL}").clear end ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - while pool.running? - listening.make_true + thr_listening.make_true + while thr_executor.running? conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| - listening.make_false next unless channel == CHANNEL ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) parsed_payload = JSON.parse(payload, symbolize_names: true) - recipients.each do |recipient| + thr_recipients.each do |recipient| target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] target.send(method_name, parsed_payload) end end - listening.make_false end end ensure - listening.make_false + thr_listening.make_false ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do conn.async_exec("UNLISTEN *").clear end diff --git a/lib/good_job/poller.rb b/lib/good_job/poller.rb index fe1b97150..0ce2a2005 100644 --- a/lib/good_job/poller.rb +++ b/lib/good_job/poller.rb @@ -16,7 +16,7 @@ class Poller # @!attribute [r] instances # @!scope class # List of all instantiated Pollers in the current process. - # @return [array] + # @return [Array] cattr_reader :instances, default: [], instance_reader: false # Creates GoodJob::Poller from a GoodJob::Configuration instance. @@ -40,35 +40,44 @@ def initialize(*recipients, poll_interval: nil) self.class.instances << self - create_pool + create_timer end - # Shut down the poller. - # If +wait+ is +true+, the poller will wait for background thread to shutdown. - # If +wait+ is +false+, this method will return immediately even though threads may still be running. + # Tests whether the timer is running. + # @return [true, false, nil] + delegate :running?, to: :timer, allow_nil: true + + # Tests whether the timer is shutdown. + # @return [true, false, nil] + delegate :shutdown?, to: :timer, allow_nil: true + + # Shut down the notifier. # Use {#shutdown?} to determine whether threads have stopped. - # @param wait [Boolean] Wait for actively executing threads to finish + # @param timeout [nil, Numeric] Seconds to wait for active threads. + # + # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. + # * +-1+, the scheduler will wait until the shutdown is complete. + # * +0+, the scheduler will immediately shutdown and stop any threads. + # * A positive number will wait that many seconds before stopping any remaining active threads. # @return [void] - def shutdown(wait: true) - return unless @timer&.running? + def shutdown(timeout: -1) + return if timer.nil? || timer.shutdown? - @timer.shutdown - @timer.wait_for_termination if wait - end + timer.shutdown if timer.running? - # Tests whether the poller is shutdown. - # @return [true, false, nil] - def shutdown? - !@timer&.running? + if timer.shuttingdown? && timeout # rubocop:disable Style/GuardClause + timer_wait = timeout.negative? ? nil : timeout + timer.kill unless timer.wait_for_termination(timer_wait) + end end # Restart the poller. # When shutdown, start; or shutdown and start. - # @param wait [Boolean] Wait for background thread to finish + # @param timeout [nil, Numeric] Seconds to wait; shares same values as {#shutdown}. # @return [void] - def restart(wait: true) - shutdown(wait: wait) - create_pool + def restart(timeout: -1) + shutdown(timeout: timeout) if running? + create_timer end # Invoked on completion of TimerTask task. @@ -81,7 +90,9 @@ def timer_observer(time, executed_task, thread_error) private - def create_pool + attr_reader :timer + + def create_timer return if @timer_options[:execution_interval] <= 0 @timer = Concurrent::TimerTask.new(@timer_options) do diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index c6ba300cc..514420faa 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -16,8 +16,8 @@ module GoodJob # :nodoc: # class Scheduler # Defaults for instance of Concurrent::ThreadPoolExecutor - # The thread pool is where work is performed. - DEFAULT_POOL_OPTIONS = { + # The thread pool executor is where work is performed. + DEFAULT_EXECUTOR_OPTIONS = { name: name, min_threads: 0, max_threads: Configuration::DEFAULT_MAX_THREADS, @@ -30,7 +30,7 @@ class Scheduler # @!attribute [r] instances # @!scope class # List of all instantiated Schedulers in the current process. - # @return [array] + # @return [Array] cattr_reader :instances, default: [], instance_reader: false # Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance. @@ -70,66 +70,76 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia @performer = performer @max_cache = max_cache || 0 - @pool_options = DEFAULT_POOL_OPTIONS.dup + @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup if max_threads.present? - @pool_options[:max_threads] = max_threads - @pool_options[:max_queue] = max_threads + @executor_options[:max_threads] = max_threads + @executor_options[:max_queue] = max_threads end - @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})" + @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" - create_pool + create_executor warm_cache if warm_cache_on_initialize end + # Tests whether the scheduler is running. + # @return [true, false, nil] + delegate :running?, to: :executor, allow_nil: true + + # Tests whether the scheduler is shutdown. + # @return [true, false, nil] + delegate :shutdown?, to: :executor, allow_nil: true + # Shut down the scheduler. - # This stops all threads in the pool. - # If +wait+ is +true+, the scheduler will wait for any active tasks to finish. - # If +wait+ is +false+, this method will return immediately even though threads may still be running. + # This stops all threads in the thread pool. # Use {#shutdown?} to determine whether threads have stopped. - # @param wait [Boolean] Wait for actively executing jobs to finish + # @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish + # + # * +nil+, the scheduler will trigger a shutdown but not wait for it to complete. + # * +-1+, the scheduler will wait until the shutdown is complete. + # * +0+, the scheduler will immediately shutdown and stop any active tasks. + # * A positive number will wait that many seconds before stopping any remaining active tasks. # @return [void] - def shutdown(wait: true) - return unless @pool&.running? - - instrument("scheduler_shutdown_start", { wait: wait }) - instrument("scheduler_shutdown", { wait: wait }) do - @timer_set.shutdown + def shutdown(timeout: -1) + return if executor.nil? || executor.shutdown? + + instrument("scheduler_shutdown_start", { timeout: timeout }) + instrument("scheduler_shutdown", { timeout: timeout }) do + if executor.running? + @timer_set.shutdown + executor.shutdown + end - @pool.shutdown - @pool.wait_for_termination if wait - # TODO: Should be killed if wait is not true + if executor.shuttingdown? && timeout + executor_wait = timeout.negative? ? nil : timeout + executor.kill unless executor.wait_for_termination(executor_wait) + end end end - # Tests whether the scheduler is shutdown. - # @return [true, false, nil] - def shutdown? - !@pool&.running? - end - # Restart the Scheduler. # When shutdown, start; or shutdown and start. - # @param wait [Boolean] Wait for actively executing jobs to finish + # @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish; shares same values as {#shutdown}. # @return [void] - def restart(wait: true) + def restart(timeout: -1) instrument("scheduler_restart_pools") do - shutdown(wait: wait) unless shutdown? - create_pool + shutdown(timeout: timeout) if running? + create_executor warm_cache end end # Wakes a thread to allow the performer to execute a task. - # @param state [nil, Object] Contextual information for the performer. See {Performer#next?}. + # @param state [nil, Object] Contextual information for the performer. See {JobPerformer#next?}. # @return [nil, Boolean] Whether work was started. - # Returns +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. - # Returns +true+ if the performer started executing work. - # Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it. + # + # * +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. + # * +true+ if the performer started executing work. + # * +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it. def create_thread(state = nil) - return nil unless @pool.running? + return nil unless executor.running? if state - return false unless @performer.next?(state) + return false unless performer.next?(state) if state[:scheduled_at] scheduled_at = if state[:scheduled_at].is_a? String @@ -144,18 +154,12 @@ def create_thread(state = nil) delay ||= 0 run_now = delay <= 0.01 if run_now - return nil unless @pool.ready_worker_count.positive? + return nil unless executor.ready_worker_count.positive? elsif @max_cache.positive? return nil unless remaining_cache_count.positive? end - future = Concurrent::ScheduledTask.new(delay, args: [@performer], executor: @pool, timer_set: timer_set) do |performer| - output = nil - Rails.application.executor.wrap { output = performer.next } - output - end - future.add_observer(self, :task_observer) - future.execute + create_task(delay) run_now ? true : nil end @@ -169,23 +173,14 @@ def task_observer(time, output, thread_error) create_task if output end - def warm_cache - return if @max_cache.zero? - - @performer.next_at( - limit: @max_cache, - now_limit: @pool_options[:max_threads] - ).each do |scheduled_at| - create_thread({ scheduled_at: scheduled_at }) - end - end - + # Information about the Scheduler + # @return [Hash] def stats { - name: @performer.name, - max_threads: @pool_options[:max_threads], - active_threads: @pool_options[:max_threads] - @pool.ready_worker_count, - available_threads: @pool.ready_worker_count, + name: performer.name, + max_threads: @executor_options[:max_threads], + active_threads: @executor_options[:max_threads] - executor.ready_worker_count, + available_threads: executor.ready_worker_count, max_cache: @max_cache, active_cache: cache_count, available_cache: remaining_cache_count, @@ -194,19 +189,30 @@ def stats private - attr_reader :timer_set + attr_reader :performer, :executor, :timer_set - def create_pool - instrument("scheduler_create_pool", { performer_name: @performer.name, max_threads: @pool_options[:max_threads] }) do + def create_executor + instrument("scheduler_create_pool", { performer_name: performer.name, max_threads: @executor_options[:max_threads] }) do @timer_set = Concurrent::TimerSet.new - @pool = ThreadPoolExecutor.new(@pool_options) + @executor = ThreadPoolExecutor.new(@executor_options) + end + end + + def warm_cache + return if @max_cache.zero? + + performer.next_at( + limit: @max_cache, + now_limit: @executor_options[:max_threads] + ).each do |scheduled_at| + create_thread({ scheduled_at: scheduled_at }) end end def create_task(delay = 0) - future = Concurrent::ScheduledTask.new(delay, args: [@performer], executor: @pool, timer_set: timer_set) do |performer| + future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer| output = nil - Rails.application.executor.wrap { output = performer.next } + Rails.application.executor.wrap { output = thr_performer.next } output end future.add_observer(self, :task_observer) diff --git a/spec/lib/good_job/scheduler_spec.rb b/spec/lib/good_job/scheduler_spec.rb index 8b5387e13..7e49c0e4f 100644 --- a/spec/lib/good_job/scheduler_spec.rb +++ b/spec/lib/good_job/scheduler_spec.rb @@ -54,9 +54,8 @@ it 'shuts down the theadpools' do scheduler = described_class.new(performer) - scheduler.shutdown - - expect(scheduler.instance_variable_get(:@pool).running?).to be false + expect { scheduler.shutdown } + .to change(scheduler, :running?).from(true).to(false) end end @@ -65,9 +64,8 @@ scheduler = described_class.new(performer) scheduler.shutdown - scheduler.restart - - expect(scheduler.instance_variable_get(:@pool).running?).to be true + expect { scheduler.restart } + .to change(scheduler, :running?).from(false).to(true) end end @@ -121,14 +119,20 @@ all_scheduler, rodents_scheduler, elephants_scheduler = multi_scheduler.schedulers - expect(all_scheduler.instance_variable_get(:@performer).name).to eq '*' - expect(all_scheduler.instance_variable_get(:@pool_options)[:max_threads]).to eq 1 + expect(all_scheduler.stats).to include( + name: '*', + max_threads: 1 + ) - expect(rodents_scheduler.instance_variable_get(:@performer).name).to eq 'mice,ferrets' - expect(rodents_scheduler.instance_variable_get(:@pool_options)[:max_threads]).to eq 2 + expect(rodents_scheduler.stats).to include( + name: 'mice,ferrets', + max_threads: 2 + ) - expect(elephants_scheduler.instance_variable_get(:@performer).name).to eq 'elephant' - expect(elephants_scheduler.instance_variable_get(:@pool_options)[:max_threads]).to eq 4 + expect(elephants_scheduler.stats).to include( + name: 'elephant', + max_threads: 4 + ) end end end diff --git a/spec/support/reset_good_job.rb b/spec/support/reset_good_job.rb index aa65ef446..70b9b8bb7 100644 --- a/spec/support/reset_good_job.rb +++ b/spec/support/reset_good_job.rb @@ -6,6 +6,7 @@ config.after do GoodJob.shutdown GoodJob::Notifier.instances.clear + GoodJob::Poller.instances.clear GoodJob::Scheduler.instances.clear end end