diff --git a/lib/que/job.rb b/lib/que/job.rb index 6093b162..87d6b799 100644 --- a/lib/que/job.rb +++ b/lib/que/job.rb @@ -29,21 +29,18 @@ class Job SQL[:bulk_insert_jobs] = %{ - WITH args_and_kwargs as ( - SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb) - ) INSERT INTO public.que_jobs (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) SELECT - coalesce($1, 'default')::text, - coalesce($2, 100)::smallint, - coalesce($3, now())::timestamptz, - $4::text, - args_and_kwargs.args, - args_and_kwargs.kwargs, - coalesce($6, '{}')::jsonb, + coalesce(queue, 'default')::text, + coalesce(priority, 100)::smallint, + coalesce(run_at, now())::timestamptz, + job_class, + coalesce(args, '[]')::jsonb, + coalesce(kwargs, '{}')::jsonb, + coalesce(data, '{}')::jsonb, #{Que.job_schema_version} - FROM args_and_kwargs + FROM json_populate_recordset(null::que_jobs, $1) RETURNING * } @@ -82,6 +79,9 @@ def enqueue(*args) job_options = kwargs.delete(:job_options) || {} + job_class = job_options[:job_class] || name || + raise(Error, "Can't enqueue an anonymous subclass of Que::Job") + if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})" @@ -94,19 +94,21 @@ def enqueue(*args) end end - attrs = { - queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, - priority: job_options[:priority] || resolve_que_setting(:priority), - run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args: args, - kwargs: kwargs, - data: job_options[:tags] ? { tags: job_options[:tags] } : {}, - job_class: \ - job_options[:job_class] || name || - raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), - } - if Thread.current[:que_jobs_to_bulk_insert] + # Don't resolve class settings during `.enqueue`, only resolve them + # during `._bulk_enqueue_insert` so they can be overwritten by specifying + # them in `.bulk_enqueue`. + attrs = { + queue: job_options[:queue], + priority: job_options[:priority], + run_at: job_options[:run_at], + job_class: job_class == 'Que::Job' ? nil : job_class, + args: args, + kwargs: kwargs, + data: job_options[:tags] && { tags: job_options[:tags] }, + klass: self, + } + if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' raise Que::Error, "Que.bulk_enqueue does not support ActiveJob." end @@ -114,8 +116,20 @@ def enqueue(*args) raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {} Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs - new({}) - elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) + return new({}) + end + + attrs = { + queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, + priority: job_options[:priority] || resolve_que_setting(:priority), + run_at: job_options[:run_at] || resolve_que_setting(:run_at), + job_class: job_class, + args: args, + kwargs: kwargs, + data: job_options[:tags] ? { tags: job_options[:tags] } : {}, + } + + if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) attrs.merge!( args: Que.deserialize_json(Que.serialize_json(attrs[:args])), kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), @@ -144,16 +158,13 @@ def bulk_enqueue(job_options: {}, notify: false) jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] return [] if jobs_attrs.empty? - raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one? - args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) } - klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class]) - klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify) + _bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify) ensure Thread.current[:que_jobs_to_bulk_insert] = nil end - def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) - raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) } + def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false) + raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) } if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT @@ -167,49 +178,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) end end - args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs| - args_and_kwargs.merge( - args: args_and_kwargs.fetch(:args, []), - kwargs: args_and_kwargs.fetch(:kwargs, {}), - ) - end - - attrs = { - queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, - priority: job_options[:priority] || resolve_que_setting(:priority), - run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args_and_kwargs_array: args_and_kwargs_array, - data: job_options[:tags] ? { tags: job_options[:tags] } : {}, - job_class: \ - job_options[:job_class] || name || - raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), - } - - if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) - args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array))) - args_and_kwargs_array.map do |args_and_kwargs| - _run_attrs( - attrs.merge( - args: args_and_kwargs.fetch(:args), - kwargs: args_and_kwargs.fetch(:kwargs), - ), + jobs_attrs = jobs_attrs.map do |attrs| + klass = attrs[:klass] || self + + attrs = { + queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue, + priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority), + run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at), + job_class: attrs[:job_class] || job_options[:job_class] || klass.name, + args: attrs[:args] || [], + kwargs: attrs[:kwargs] || {}, + data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}), + klass: klass + } + + if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously) + klass._run_attrs( + attrs.except(:klass).merge( + args: Que.deserialize_json(Que.serialize_json(attrs[:args])), + kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), + data: Que.deserialize_json(Que.serialize_json(attrs[:data])), + ) ) + nil + else + attrs end - else - attrs.merge!( - args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]), - data: Que.serialize_json(attrs[:data]), - ) - values_array = - Que.transaction do - Que.execute('SET LOCAL que.skip_notify TO true') unless notify - Que.execute( - :bulk_insert_jobs, - attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data), - ) - end - values_array.map(&method(:new)) - end + end.compact + + values_array = + Que.transaction do + Que.execute('SET LOCAL que.skip_notify TO true') unless notify + Que.execute( + :bulk_insert_jobs, + [Que.serialize_json(jobs_attrs.map { |attrs| attrs.except(:klass) })] + ) + end + values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) } end def run(*args) @@ -237,7 +242,7 @@ def resolve_que_setting(setting, *args) end end - private + protected def _run_attrs(attrs) attrs[:error_count] = 0