Skip to content

Commit

Permalink
Support bulk enqueue with differing class and options
Browse files Browse the repository at this point in the history
Allow bulk enqueue of multiple different job classes with differing
job options in a single `Que.bulk_enqueue` block.
  • Loading branch information
dtcristo committed Mar 4, 2024
1 parent 6aac827 commit 5674609
Showing 1 changed file with 78 additions and 73 deletions.
151 changes: 78 additions & 73 deletions lib/que/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,18 @@ class Job

SQL[:bulk_insert_jobs] =
%{
WITH args_and_kwargs as (
SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb)
)
INSERT INTO public.que_jobs
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
SELECT
coalesce($1, 'default')::text,
coalesce($2, 100)::smallint,
coalesce($3, now())::timestamptz,
$4::text,
args_and_kwargs.args,
args_and_kwargs.kwargs,
coalesce($6, '{}')::jsonb,
coalesce(queue, 'default')::text,
coalesce(priority, 100)::smallint,
coalesce(run_at, now())::timestamptz,
job_class,
coalesce(args, '[]')::jsonb,
coalesce(kwargs, '{}')::jsonb,
coalesce(data, '{}')::jsonb,
#{Que.job_schema_version}
FROM args_and_kwargs
FROM json_populate_recordset(null::que_jobs, $1)
RETURNING *
}

Expand Down Expand Up @@ -82,6 +79,9 @@ def enqueue(*args)

job_options = kwargs.delete(:job_options) || {}

job_class = job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job")

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
Expand All @@ -94,28 +94,42 @@ def enqueue(*args)
end
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args: args,
kwargs: kwargs,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if Thread.current[:que_jobs_to_bulk_insert]
# Don't resolve class settings during `.enqueue`, only resolve them
# during `._bulk_enqueue_insert` so they can be overwritten by specifying
# them in `.bulk_enqueue`.
attrs = {
queue: job_options[:queue],
priority: job_options[:priority],
run_at: job_options[:run_at],
job_class: job_class == 'Que::Job' ? nil : job_class,
args: args,
kwargs: kwargs,
data: job_options[:tags] && { tags: job_options[:tags] },
klass: self,
}

if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
end

raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}

Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
new({})
elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
return new({})
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
job_class: job_class,
args: args,
kwargs: kwargs,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
attrs.merge!(
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
Expand Down Expand Up @@ -144,16 +158,13 @@ def bulk_enqueue(job_options: {}, notify: false)
jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
return [] if jobs_attrs.empty?
raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
_bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify)
ensure
Thread.current[:que_jobs_to_bulk_insert] = nil
end

def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }
def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false)
raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) }

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
Expand All @@ -167,49 +178,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
end
end

args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
args_and_kwargs.merge(
args: args_and_kwargs.fetch(:args, []),
kwargs: args_and_kwargs.fetch(:kwargs, {}),
)
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args_and_kwargs_array: args_and_kwargs_array,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
args_and_kwargs_array.map do |args_and_kwargs|
_run_attrs(
attrs.merge(
args: args_and_kwargs.fetch(:args),
kwargs: args_and_kwargs.fetch(:kwargs),
),
jobs_attrs = jobs_attrs.map do |attrs|
klass = attrs[:klass] || self

attrs = {
queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue,
priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority),
run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at),
job_class: attrs[:job_class] || job_options[:job_class] || klass.name,
args: attrs[:args] || [],
kwargs: attrs[:kwargs] || {},
data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}),
klass: klass
}

if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously)
klass._run_attrs(
attrs.except(:klass).merge(
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
)
)
nil
else
attrs
end
else
attrs.merge!(
args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
data: Que.serialize_json(attrs[:data]),
)
values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
)
end
values_array.map(&method(:new))
end
end.compact

values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
[Que.serialize_json(jobs_attrs.map { |attrs| attrs.except(:klass) })]
)
end
values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) }
end

def run(*args)
Expand Down Expand Up @@ -237,7 +242,7 @@ def resolve_que_setting(setting, *args)
end
end

private
protected

def _run_attrs(attrs)
attrs[:error_count] = 0
Expand Down

0 comments on commit 5674609

Please sign in to comment.