diff --git a/lib/good_job/execution_result.rb b/lib/good_job/execution_result.rb new file mode 100644 index 000000000..74ca78ac4 --- /dev/null +++ b/lib/good_job/execution_result.rb @@ -0,0 +1,20 @@ +module GoodJob + # Stores the results of job execution + class ExecutionResult + # @return [Object, nil] + attr_reader :value + # @return [Exception, nil] + attr_reader :handled_error + # @return [Exception, nil] + attr_reader :unhandled_error + + # @param value [Object, nil] + # @param handled_error [Exception, nil] + # @param unhandled_error [Exception, nil] + def initialize(value:, handled_error: nil, unhandled_error: nil) + @value = value + @handled_error = handled_error + @unhandled_error = unhandled_error + end + end +end diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb index 636c16862..80f4942be 100644 --- a/lib/good_job/job.rb +++ b/lib/good_job/job.rb @@ -19,6 +19,7 @@ class Job < Object.const_get(GoodJob.active_record_parent_class) # Parse a string representing a group of queues into a more readable data # structure. + # @param string [String] Queue string # @return [Hash] # How to match a given queue. It can have the following keys and values: # - +{ all: true }+ indicates that all queues match. @@ -134,29 +135,26 @@ def self.queue_parser(string) # Finds the next eligible Job, acquire an advisory lock related to it, and # executes the job. - # @return [Array<(GoodJob::Job, Object, Exception)>, nil] + # @return [ExecutionResult, nil] # If a job was executed, returns an array with the {Job} record, the # return value for the job's +#perform+ method, and the exception the job # raised, if any (if the job raised, then the second array entry will be # +nil+). If there were no jobs to execute, returns +nil+. def self.perform_with_advisory_lock - good_job = nil - result = nil - error = nil - unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs| good_job = good_jobs.first # TODO: Determine why some records are fetched without an advisory lock at all break unless good_job&.executable? - result, error = good_job.perform + good_job.perform end - - [good_job, result, error] if good_job end # Fetches the scheduled execution time of the next eligible Job(s). - # @return [Array<(DateTime)>] + # @param after [DateTime] + # @param limit [Integer] + # @param now_limit [Integer, nil] + # @return [Array] def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) query = advisory_unlocked.unfinished.schedule_ordered @@ -182,7 +180,6 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) # @return [Job] # The new {Job} instance representing the queued ActiveJob job. def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) - good_job = nil ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| good_job = GoodJob::Job.new( queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, @@ -196,49 +193,37 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false good_job.save! active_job.provider_job_id = good_job.id - end - good_job + good_job + end end # Execute the ActiveJob job this {Job} represents. - # @return [Array<(Object, Exception)>] + # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. def perform raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - GoodJob::CurrentExecution.reset - self.performed_at = Time.current save! if GoodJob.preserve_job_records - result, unhandled_error = execute - - result_error = nil - if result.is_a?(Exception) - result_error = result - result = nil - end - - job_error = unhandled_error || - result_error || - GoodJob::CurrentExecution.error_on_retry || - GoodJob::CurrentExecution.error_on_discard + result = execute + job_error = result.handled_error || result.unhandled_error self.error = "#{job_error.class}: #{job_error.message}" if job_error - if unhandled_error && GoodJob.retry_on_unhandled_error + if result.unhandled_error && GoodJob.retry_on_unhandled_error save! - elsif GoodJob.preserve_job_records == true || (unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) + elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) self.finished_at = Time.current save! else destroy! end - [result, job_error] + result end # Tests whether this job is safe to be executed by this thread. @@ -249,16 +234,26 @@ def executable? private + # @return [GoodJob::ExecutionResult] def execute params = serialized_params.merge( "provider_job_id" => id ) + GoodJob::CurrentExecution.reset ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do - [ActiveJob::Base.execute(params), nil] + value = ActiveJob::Base.execute(params) + + if value.is_a?(Exception) + handled_error = value + value = nil + end + handled_error ||= GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard + + ExecutionResult.new(value: value, handled_error: handled_error) + rescue StandardError => e + ExecutionResult.new(value: nil, unhandled_error: e) end - rescue StandardError => e - [nil, e] end end end diff --git a/spec/lib/good_job/job_spec.rb b/spec/lib/good_job/job_spec.rb index 0fd03cea0..f01a0e163 100644 --- a/spec/lib/good_job/job_spec.rb +++ b/spec/lib/good_job/job_spec.rb @@ -68,19 +68,19 @@ def perform(result_value = nil, raise_error: false) expect { good_job_2.reload }.not_to raise_error end - it 'returns the good_job, result, and error object if there is a result; nil if not' do - worked_good_job, worked_result, worked_error = described_class.all.perform_with_advisory_lock + it 'returns the result or nil if not' do + result = described_class.all.perform_with_advisory_lock - expect(worked_good_job).to eq good_job - expect(worked_result).to eq 'a string' - expect(worked_error).to eq nil + expect(result).to be_a GoodJob::ExecutionResult + expect(result.value).to eq 'a string' + expect(result.unhandled_error).to eq nil - e_good_job = described_class.enqueue(ExampleJob.new(true, raise_error: true)) - errored_good_job, errored_result, errored_error = described_class.all.perform_with_advisory_lock + described_class.enqueue(ExampleJob.new(true, raise_error: true)) + errored_result = described_class.all.perform_with_advisory_lock - expect(errored_good_job).to eq e_good_job - expect(errored_result).to eq nil - expect(errored_error).to be_an ExpectedError + expect(result).to be_a GoodJob::ExecutionResult + expect(errored_result.value).to eq nil + expect(errored_result.unhandled_error).to be_an ExpectedError end end @@ -188,20 +188,20 @@ def perform(result_value = nil, raise_error: false) describe 'return value' do it 'returns the results of the job' do - result, error = good_job.perform + result = good_job.perform - expect(result).to eq "a string" - expect(error).to be_nil + expect(result.value).to eq "a string" + expect(result.unhandled_error).to be_nil end context 'when there is an error' do let(:active_job) { ExampleJob.new("whoops", raise_error: true) } it 'returns the error' do - result, error = good_job.perform + result = good_job.perform - expect(result).to eq nil - expect(error).to be_an_instance_of ExpectedError + expect(result.value).to eq nil + expect(result.unhandled_error).to be_an_instance_of ExpectedError end context 'when there is an retry handler with exhausted attempts' do @@ -240,10 +240,10 @@ def perform(result_value = nil, raise_error: false) end it 'returns the error' do - result, error = good_job.perform + result = good_job.perform - expect(result).to eq nil - expect(error).to be_an_instance_of ExpectedError + expect(result.value).to eq nil + expect(result.unhandled_error).to be_an_instance_of ExpectedError end if Gem::Version.new(Rails.version) > Gem::Version.new("6") @@ -254,10 +254,10 @@ def perform(result_value = nil, raise_error: false) end it 'returns the error' do - result, error = good_job.perform + result = good_job.perform - expect(result).to eq nil - expect(error).to be_an_instance_of ExpectedError + expect(result.value).to eq nil + expect(result.handled_error).to be_an_instance_of ExpectedError end end @@ -267,10 +267,10 @@ def perform(result_value = nil, raise_error: false) end it 'returns the error' do - result, error = good_job.perform + result = good_job.perform - expect(result).to eq nil - expect(error).to be_an_instance_of ExpectedError + expect(result.value).to eq nil + expect(result.handled_error).to be_an_instance_of ExpectedError end end end @@ -312,10 +312,10 @@ def perform(result_value = nil, raise_error: false) end it 'returns the results of the job' do - result, error = good_job.perform + result = good_job.perform - expect(result).to be_nil - expect(error).to be_a(ExpectedError) + expect(result.value).to be_nil + expect(result.handled_error).to be_a(ExpectedError) end it 'destroys the job' do @@ -341,10 +341,10 @@ def perform(result_value = nil, raise_error: false) let(:active_job) { ExampleJob.new("a string", raise_error: true) } it 'returns the results of the job' do - result, error = good_job.perform + result = good_job.perform - expect(result).to be_nil - expect(error).to be_a(ExpectedError) + expect(result.value).to eq nil + expect(result.unhandled_error).to be_a(ExpectedError) end describe 'GoodJob.reperform_jobs_on_standard_error behavior' do