use yugabyte adapter for getting the lock connection
ankithads committed Jun 25, 2024
1 parent f29a8b8 commit 5b3de03
Showing 9 changed files with 170 additions and 307 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yml
@@ -57,4 +57,6 @@ jobs:
          ruby-version: "${{ matrix.ruby-version }}"
      - name: Run specs
        run: |
          bundle exec rspec
      - name: Run specs with YUGABYTE_QUE_WORKER_ENABLED enabled
        run: YUGABYTE_QUE_WORKER_ENABLED=true bundle exec rspec
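The new step runs the same suite with the flag that gates the Yugabyte-specific code paths. A minimal sketch of how the flag is consumed on the Ruby side, mirroring the worker.rb change further down:

# ENV.fetch returns the string "true" here, which is truthy, so the
# Yugabyte-specific branches are taken whenever the variable is set.
if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)
  Que.adapter.checkout_lock_database_connection
end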
56 changes: 28 additions & 28 deletions lib/que/adapters/yugabyte.rb
@@ -12,34 +12,34 @@ def checkout_activerecord_adapter(&block)
YugabyteRecord.connection_pool.with_connection(&block)
end

# def establish_lock_database_connection
# Thread.current["lock_database_connection_#{Thread.current.__id__}"] = LockDatabaseRecord.connection
# end
def checkout_lock_database_connection
# When multiple threads are running, we need to make sure that
# advisory locks are acquired and released by the same connection.
Thread.current[:db_connection] ||= LockDatabaseRecord.connection_pool.checkout
end

# def lock_database_connection
# # connection = @lock_database_connection[Thread.current.name]
# # return connection unless connection.nil?
# # @lock_database_connection[Thread.current.name] = LockDatabaseRecord.connection
# @lock_database_connection ||= LockDatabaseRecord.connection
# end
def lock_database_connection
Thread.current[:db_connection]
end

def setup_lock_database_connection
::LockDatabaseRecord.connection
def release_lock_database_connection
LockDatabaseRecord.connection_pool.checkin(Thread.current[:db_connection])
end

# def execute(command, params=[])
# if command == :lock_job
# queue, cursor, lock_database_connection = params
# lock_job_with_lock_database(queue, cursor, lock_database_connection)
# elsif command == :unlock_job
# job_id, lock_database_connection = params
# unlock_job(job_id, lock_database_connection)
# else
# super(command, params)
# end
# end
def execute(command, params=[])
if command == :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
elsif command == :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super(command, params)
end
end

def lock_job_with_lock_database(queue, cursor, lock_database_connection)
def lock_job_with_lock_database(queue, cursor)
query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.select("extract(epoch from (now() - run_at)) as latency")
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
@@ -50,24 +50,24 @@ def lock_job_with_lock_database(queue, cursor, lock_database_connection)
result = Que.execute(query)
return result if result.empty?

if locked?(result.first['job_id'], lock_database_connection)
if locked?(result.first['job_id'])
return result
end

# continue the recursion to fetch the next available job
lock_job_with_lock_database(queue, result.first['job_id'], lock_database_connection)
lock_job_with_lock_database(queue, result.first['job_id'])
end

def cleanup!
YugabyteRecord.connection_pool.release_connection
LockDatabaseRecord.connection_pool.release_connection
end

def locked?(job_id, lock_database_connection)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"]
def locked?(job_id)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock')
end

def unlock_job(job_id, lock_database_connection)
def unlock_job(job_id)
lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
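Taken together, the adapter now pins one lock-database connection to each worker thread, so pg_try_advisory_lock and pg_advisory_unlock always run on the same session. A minimal usage sketch, assuming LockDatabaseRecord's connection pool is configured elsewhere; everything except the adapter methods shown above is illustrative:

adapter = Que.adapter

adapter.checkout_lock_database_connection       # stores a connection in Thread.current[:db_connection]
begin
  jobs = adapter.execute(:lock_job, ["default", 0])   # dispatched to lock_job_with_lock_database
  if (job = jobs.first)
    # ... work the job ...
    adapter.execute(:unlock_job, [job["job_id"]])     # pg_advisory_unlock on the same session
  end
ensure
  adapter.release_lock_database_connection      # checks the connection back into the pool
end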
29 changes: 13 additions & 16 deletions lib/que/locker.rb
@@ -60,12 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue
@queue_expires_at = {}
@secondary_queues = secondary_queues
@consolidated_queues = Array.wrap(queue).concat(secondary_queues)
@using_lock_database = ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)

if @using_lock_database
@lock_database_connection = LockDatabaseRecord.connection
end

# Create a bucket that has 100% capacity, so even when we don't apply a limit we
# have a valid bucket that we can use everywhere
@leaky_bucket = LeakyBucket.new(window: window || 1.0, budget: budget || 1.0)
@@ -126,11 +120,12 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
if @using_lock_database
@lock_database_connection.execute("SELECT pg_advisory_unlock(#{job["job_id"]})")
else
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
end
# if @using_lock_database
# @lock_database_connection.execute("SELECT pg_advisory_unlock(#{job["job_id"]})")
# else
# Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
# end
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down Expand Up @@ -158,11 +153,11 @@ def exists?(job)
end

def lock_job_query(queue, cursor)
if @using_lock_database
lock_job_with_lock_database(queue, cursor)
else
# if @using_lock_database
# lock_job_with_lock_database(queue, cursor)
# else
Que.execute(:lock_job, [queue, cursor]).first
end
# end
end

def lock_job_with_lock_database(queue, cursor)
@@ -183,7 +178,9 @@ def lock_job_with_lock_database(queue, cursor)
end

def locked?(job_id)
@lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"]
result = @lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})")
return false if result.nil?
result.first["pg_try_advisory_lock"]
end

def handle_expired_cursors!
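The net effect in the locker is that it no longer branches on a lock-database flag itself: acquiring and releasing the advisory lock both go through named statements, and the configured adapter decides where they run. A condensed sketch of that flow, with the signature, cursor handling, and error handling simplified (not the full method bodies):

def with_locked_job(queue, cursor)   # signature simplified for illustration
  job = Que.execute(:lock_job, [queue, cursor]).first
  yield job
ensure
  if job
    observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
      Que.execute(:unlock_job, [job[:job_id]])
    end
  end
end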
4 changes: 4 additions & 0 deletions lib/que/sql.rb
@@ -162,6 +162,10 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
}
}
# rubocop:enable Style/MutableConstant
end
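For reference, a sketch of how this new named statement is reached; job_id is an illustrative local variable, and the adapter class name is inferred from the file layout:

# Default adapter: the statement above runs on the primary database,
# with $1 bound to the job id.
Que.execute(:unlock_job, [job_id])   # => SELECT pg_advisory_unlock($1)

# With YUGABYTE_QUE_WORKER_ENABLED set, the Yugabyte adapter's #execute
# intercepts :unlock_job before the SQL lookup and issues pg_advisory_unlock
# on the thread's dedicated lock-database connection instead.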
207 changes: 105 additions & 102 deletions lib/que/worker.rb
@@ -142,125 +142,128 @@ def initialize(

def work_loop
return if @stop

Que.adapter.checkout_lock_database_connection if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)
@tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do
loop do
case event = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
loop do
case event = work
when :postgres_error
Que.logger&.info(event: "que.postgres_error", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_not_found
Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)
end
when :job_worked
nil # immediately find a new job to work
end
when :job_not_found
Que.logger&.debug(event: "que.job_not_found", wake_interval: @wake_interval)
@tracer.trace(SleepingSecondsTotal, queue: @queue, primary_queue: @queue) do
sleep(@wake_interval)

if @stop
Que.adapter.release_lock_database_connection if ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)
break
end
when :job_worked
nil # immediately find a new job to work
end

break if @stop
end
end
ensure
@stopped = true
end

def work
Que.adapter.checkout do
@locker.with_locked_job do |job|
return :job_not_found if job.nil?

log_keys = {
priority: job["priority"],
# TODO alerting / monitoring pre addition of secondary queues
# assume that a worker would not work a job from another queue.
# With the addition of secondary queues this is no longer true.
# To support moving to the new `primary_queue` field without
# disrupting existing tooling we "lie" and say the queue worked
# is the primary queue. Longer term alerting / monitoring should move
# to look at `primary_queue` and `worked_queue` instead.
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
handler: job["job_class"],
job_class: actual_job_class_name(job["job_class"], job["args"]),
job_error_count: job["error_count"],
que_job_id: job["job_id"],
}

labels = {
job_class: actual_job_class_name(job["job_class"], job["args"]),
priority: job["priority"],
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
}

begin
klass = class_for(job[:job_class])

log_keys.merge!(
(klass.log_context_proc&.call(job) || {}),
)

Que.logger&.info(
log_keys.merge(
event: "que_job.job_begin",
msg: "Job acquired, beginning work",
latency: job["latency"],
),
)

# Note the time spent waiting in the queue before being processed, and update
# the jobs worked count here so that latency_seconds_total / worked_total
# doesn't suffer from skew.
JobLatencySecondsTotal.increment(by: job[:latency], labels: labels)
JobWorkedTotal.increment(labels: labels)

duration = Benchmark.measure do
# TODO: _run -> run_and_destroy(*job[:args])
@tracer.trace(JobWorkedSecondsTotal, labels) do
klass.new(job).tap do |job_instance|
@current_running_job = job_instance
begin
job_instance._run
ensure
@current_running_job = nil
return :job_not_found if job.nil?

log_keys = {
priority: job["priority"],
# TODO alerting / monitoring pre addition of secondary queues
# assume that a worker would not work a job from another queue.
# With the addition of secondary queues this is no longer true.
# To support moving to the new `primary_queue` field without
# disrupting existing tooling we "lie" and say the queue worked
# is the primary queue. Longer term alerting / monitoring should move
# to look at `primary_queue` and `worked_queue` instead.
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
handler: job["job_class"],
job_class: actual_job_class_name(job["job_class"], job["args"]),
job_error_count: job["error_count"],
que_job_id: job["job_id"],
}

labels = {
job_class: actual_job_class_name(job["job_class"], job["args"]),
priority: job["priority"],
queue: @queue,
primary_queue: @queue,
worked_queue: job["queue"],
}

begin
klass = class_for(job[:job_class])

log_keys.merge!(
(klass.log_context_proc&.call(job) || {}),
)

Que.logger&.info(
log_keys.merge(
event: "que_job.job_begin",
msg: "Job acquired, beginning work",
latency: job["latency"],
),
)

# Note the time spent waiting in the queue before being processed, and update
# the jobs worked count here so that latency_seconds_total / worked_total
# doesn't suffer from skew.
JobLatencySecondsTotal.increment(by: job[:latency], labels: labels)
JobWorkedTotal.increment(labels: labels)

duration = Benchmark.measure do
# TODO: _run -> run_and_destroy(*job[:args])
@tracer.trace(JobWorkedSecondsTotal, labels) do
klass.new(job).tap do |job_instance|
@current_running_job = job_instance
begin
job_instance._run
ensure
@current_running_job = nil
end
end
end
end.real

Que.logger&.info(
log_keys.merge(
event: "que_job.job_worked",
msg: "Successfully worked job",
duration: duration,
),
)
rescue StandardError, NotImplementedError, JobTimeoutError => e
JobErrorTotal.increment(labels: labels)
Que.logger&.error(
log_keys.merge(
event: "que_job.job_error",
msg: "Job failed with error",
error: e.inspect,
),
)

# For compatibility with que-failure, we need to allow failure handlers to be
# defined on the job class.
if klass&.respond_to?(:handle_job_failure)
klass.handle_job_failure(e, job)
else
handle_job_failure(e, job)
end
end.real

Que.logger&.info(
log_keys.merge(
event: "que_job.job_worked",
msg: "Successfully worked job",
duration: duration,
),
)
rescue StandardError, NotImplementedError, JobTimeoutError => e
JobErrorTotal.increment(labels: labels)
Que.logger&.error(
log_keys.merge(
event: "que_job.job_error",
msg: "Job failed with error",
error: e.inspect,
),
)

# For compatibility with que-failure, we need to allow failure handlers to be
# defined on the job class.
if klass&.respond_to?(:handle_job_failure)
klass.handle_job_failure(e, job)
else
handle_job_failure(e, job)
end
end
:job_worked
:job_worked
end
end
end
rescue PG::Error, Adapters::UnavailableConnection => _e
# In the event that our Postgres connection is bad, we don't want that error to halt
# the work loop. Instead, we should let the work loop sleep and retry.
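Stripped of tracing, logging, and the error-path sleeps, the lock-connection lifecycle added to work_loop comes down to the following condensed sketch (not the full method; the real code reads the environment variable at each call site):

def work_loop
  return if @stop

  yugabyte = ENV.fetch("YUGABYTE_QUE_WORKER_ENABLED", false)
  Que.adapter.checkout_lock_database_connection if yugabyte

  loop do
    work                  # returns :job_worked, :job_not_found or :postgres_error
    if @stop
      Que.adapter.release_lock_database_connection if yugabyte
      break
    end
  end
ensure
  @stopped = true
end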