Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GoodJob::Job::ExecutionResult object instead of job execution returning an ordered array #241

Merged
merged 1 commit into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions lib/good_job/execution_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module GoodJob
# Stores the results of job execution
class ExecutionResult
# @return [Object, nil]
attr_reader :value
# @return [Exception, nil]
attr_reader :handled_error
# @return [Exception, nil]
attr_reader :unhandled_error

# @param value [Object, nil]
# @param handled_error [Exception, nil]
# @param unhandled_error [Exception, nil]
def initialize(value:, handled_error: nil, unhandled_error: nil)
@value = value
@handled_error = handled_error
@unhandled_error = unhandled_error
end
end
end
61 changes: 28 additions & 33 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Job < Object.const_get(GoodJob.active_record_parent_class)

# Parse a string representing a group of queues into a more readable data
# structure.
# @param string [String] Queue string
# @return [Hash]
# How to match a given queue. It can have the following keys and values:
# - +{ all: true }+ indicates that all queues match.
Expand Down Expand Up @@ -134,29 +135,26 @@ def self.queue_parser(string)

# Finds the next eligible Job, acquire an advisory lock related to it, and
# executes the job.
# @return [Array<(GoodJob::Job, Object, Exception)>, nil]
# @return [ExecutionResult, nil]
# If a job was executed, returns an array with the {Job} record, the
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
def self.perform_with_advisory_lock
good_job = nil
result = nil
error = nil

unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
# TODO: Determine why some records are fetched without an advisory lock at all
break unless good_job&.executable?

result, error = good_job.perform
good_job.perform
end

[good_job, result, error] if good_job
end

# Fetches the scheduled execution time of the next eligible Job(s).
# @return [Array<(DateTime)>]
# @param after [DateTime]
# @param limit [Integer]
# @param now_limit [Integer, nil]
# @return [Array<DateTime>]
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
query = advisory_unlocked.unfinished.schedule_ordered

Expand All @@ -182,7 +180,6 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
# @return [Job]
# The new {Job} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
good_job = nil
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
good_job = GoodJob::Job.new(
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
Expand All @@ -196,49 +193,37 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false

good_job.save!
active_job.provider_job_id = good_job.id
end

good_job
good_job
end
end

# Execute the ActiveJob job this {Job} represents.
# @return [Array<(Object, Exception)>]
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

GoodJob::CurrentExecution.reset

self.performed_at = Time.current
save! if GoodJob.preserve_job_records

result, unhandled_error = execute

result_error = nil
if result.is_a?(Exception)
result_error = result
result = nil
end

job_error = unhandled_error ||
result_error ||
GoodJob::CurrentExecution.error_on_retry ||
GoodJob::CurrentExecution.error_on_discard
result = execute

job_error = result.handled_error || result.unhandled_error
self.error = "#{job_error.class}: #{job_error.message}" if job_error

if unhandled_error && GoodJob.retry_on_unhandled_error
if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
elsif GoodJob.preserve_job_records == true || (unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
self.finished_at = Time.current
save!
else
destroy!
end

[result, job_error]
result
end

# Tests whether this job is safe to be executed by this thread.
Expand All @@ -249,16 +234,26 @@ def executable?

private

# @return [GoodJob::ExecutionResult]
def execute
params = serialized_params.merge(
"provider_job_id" => id
)

GoodJob::CurrentExecution.reset
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do
[ActiveJob::Base.execute(params), nil]
value = ActiveJob::Base.execute(params)

if value.is_a?(Exception)
handled_error = value
value = nil
end
handled_error ||= GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard

ExecutionResult.new(value: value, handled_error: handled_error)
rescue StandardError => e
ExecutionResult.new(value: nil, unhandled_error: e)
end
rescue StandardError => e
[nil, e]
end
end
end
62 changes: 31 additions & 31 deletions spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ def perform(result_value = nil, raise_error: false)
expect { good_job_2.reload }.not_to raise_error
end

it 'returns the good_job, result, and error object if there is a result; nil if not' do
worked_good_job, worked_result, worked_error = described_class.all.perform_with_advisory_lock
it 'returns the result or nil if not' do
result = described_class.all.perform_with_advisory_lock

expect(worked_good_job).to eq good_job
expect(worked_result).to eq 'a string'
expect(worked_error).to eq nil
expect(result).to be_a GoodJob::ExecutionResult
expect(result.value).to eq 'a string'
expect(result.unhandled_error).to eq nil

e_good_job = described_class.enqueue(ExampleJob.new(true, raise_error: true))
errored_good_job, errored_result, errored_error = described_class.all.perform_with_advisory_lock
described_class.enqueue(ExampleJob.new(true, raise_error: true))
errored_result = described_class.all.perform_with_advisory_lock

expect(errored_good_job).to eq e_good_job
expect(errored_result).to eq nil
expect(errored_error).to be_an ExpectedError
expect(result).to be_a GoodJob::ExecutionResult
expect(errored_result.value).to eq nil
expect(errored_result.unhandled_error).to be_an ExpectedError
end
end

Expand Down Expand Up @@ -188,20 +188,20 @@ def perform(result_value = nil, raise_error: false)

describe 'return value' do
it 'returns the results of the job' do
result, error = good_job.perform
result = good_job.perform

expect(result).to eq "a string"
expect(error).to be_nil
expect(result.value).to eq "a string"
expect(result.unhandled_error).to be_nil
end

context 'when there is an error' do
let(:active_job) { ExampleJob.new("whoops", raise_error: true) }

it 'returns the error' do
result, error = good_job.perform
result = good_job.perform

expect(result).to eq nil
expect(error).to be_an_instance_of ExpectedError
expect(result.value).to eq nil
expect(result.unhandled_error).to be_an_instance_of ExpectedError
end

context 'when there is an retry handler with exhausted attempts' do
Expand Down Expand Up @@ -240,10 +240,10 @@ def perform(result_value = nil, raise_error: false)
end

it 'returns the error' do
result, error = good_job.perform
result = good_job.perform

expect(result).to eq nil
expect(error).to be_an_instance_of ExpectedError
expect(result.value).to eq nil
expect(result.unhandled_error).to be_an_instance_of ExpectedError
end

if Gem::Version.new(Rails.version) > Gem::Version.new("6")
Expand All @@ -254,10 +254,10 @@ def perform(result_value = nil, raise_error: false)
end

it 'returns the error' do
result, error = good_job.perform
result = good_job.perform

expect(result).to eq nil
expect(error).to be_an_instance_of ExpectedError
expect(result.value).to eq nil
expect(result.handled_error).to be_an_instance_of ExpectedError
end
end

Expand All @@ -267,10 +267,10 @@ def perform(result_value = nil, raise_error: false)
end

it 'returns the error' do
result, error = good_job.perform
result = good_job.perform

expect(result).to eq nil
expect(error).to be_an_instance_of ExpectedError
expect(result.value).to eq nil
expect(result.handled_error).to be_an_instance_of ExpectedError
end
end
end
Expand Down Expand Up @@ -312,10 +312,10 @@ def perform(result_value = nil, raise_error: false)
end

it 'returns the results of the job' do
result, error = good_job.perform
result = good_job.perform

expect(result).to be_nil
expect(error).to be_a(ExpectedError)
expect(result.value).to be_nil
expect(result.handled_error).to be_a(ExpectedError)
end

it 'destroys the job' do
Expand All @@ -341,10 +341,10 @@ def perform(result_value = nil, raise_error: false)
let(:active_job) { ExampleJob.new("a string", raise_error: true) }

it 'returns the results of the job' do
result, error = good_job.perform
result = good_job.perform

expect(result).to be_nil
expect(error).to be_a(ExpectedError)
expect(result.value).to eq nil
expect(result.unhandled_error).to be_a(ExpectedError)
end

describe 'GoodJob.reperform_jobs_on_standard_error behavior' do
Expand Down