Skip to content

Commit

Permalink
Add a new que adapter to support the que functionality on databse tha…
Browse files Browse the repository at this point in the history
…t does not support advisory locking

What?
This is an attempt to use que with the database that does not support advisory locking.
It will use 2 databases. The primary one will be processing the jobs and uses a 2nd database to acquire the
advisory lock on the job.
  • Loading branch information
ankithads committed Aug 14, 2024
1 parent 8519451 commit 99061e9
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 10 deletions.
61 changes: 56 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: tests
on:
push:

jobs:
jobs:
rubocop:
runs-on: ubuntu-latest
env:
Expand All @@ -24,7 +24,6 @@ jobs:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]

runs-on: ubuntu-latest
services:
postgres:
Expand All @@ -40,7 +39,6 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
Expand All @@ -63,7 +61,6 @@ jobs:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]

runs-on: ubuntu-latest
services:
postgres:
Expand All @@ -79,7 +76,6 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
Expand All @@ -96,3 +92,58 @@ jobs:
- name: Run specs
run: |
bundle exec rspec
active_record_with_lock_adapter_rspec:
strategy:
fail-fast: false
matrix:
ruby_version: ["3.0", "3.1", "3.2", "3.3"]
runs-on: ubuntu-latest
services:
postgres:
image: postgres:14.2
env:
POSTGRES_DB: que-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10
lock_database:
image: postgres:14.2
env:
POSTGRES_DB: lock-test
POSTGRES_USER: ubuntu
POSTGRES_PASSWORD: password
ports:
- 5434:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 10
env:
PGDATABASE: que-test
PGUSER: ubuntu
PGPASSWORD: password
PGHOST: localhost
BUNDLE_RUBYGEMS__PKG__GITHUB__COM: gocardless-robot-readonly:${{ secrets.GITHUB_TOKEN }}
LOCK_PGDATABASE: lock-test
LOCK_PGUSER: ubuntu
LOCK_PGPASSWORD: password
LOCK_PGHOST: localhost
ADAPTER: ActiveRecordWithLock
steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
bundler-cache: true
ruby-version: "${{ matrix.ruby-version }}"
- name: Run Specs With ActiveRecordWithLock Adapter
run: bundle exec rspec

1 change: 1 addition & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def connection=(connection)
Adapters::ActiveRecord.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then connection
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
68 changes: 68 additions & 0 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
super
end

def checkout_activerecord_adapter(&block)
checkout_lock_database_connection do
@job_connection_pool.with_connection(&block)
end
end

def checkout_lock_database_connection(&block)
@lock_connection_pool.with_connection(&block)
end

def execute(command, params = [])
case command
when :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super
end
end

def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])

break if result.empty?

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
end
result
end

def pg_try_advisory_lock?(job_id)
checkout_lock_database_connection do |conn|
conn.execute(
"SELECT pg_try_advisory_lock(#{job_id})",
).try(:first)&.fetch("pg_try_advisory_lock")
end
end

def unlock_job(job_id)
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
checkout_lock_database_connection do |conn|
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock"

class UnavailableConnection < StandardError; end

Expand Down
2 changes: 1 addition & 1 deletion lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def with_locked_job
ensure
if job
observe(UnlockTotal, UnlockSecondsTotal, worked_queue: job[:queue]) do
Que.execute("SELECT pg_advisory_unlock($1)", [job[:job_id]])
Que.execute(:unlock_job, [job[:job_id]])
end
end
end
Expand Down
24 changes: 24 additions & 0 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,30 @@ module Que
WHERE locktype = 'advisory'
) pg USING (job_id)
},

unlock_job: %{
SELECT pg_advisory_unlock($1)
},

find_job_to_lock: %{
SELECT
queue,
priority,
run_at,
job_id,
job_class,
retryable,
args,
error_count,
extract(epoch from (now() - run_at)) as latency
FROM que_jobs
WHERE queue = $1::text
AND run_at <= now()
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
LIMIT 1
},
}
# rubocop:enable Style/MutableConstant
end
30 changes: 30 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
pool: 5,
)
end

class JobRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end
2 changes: 1 addition & 1 deletion spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def expect_to_work(job)
with_locked_job do |actual_job|
expect(actual_job[:job_id]).to eql(job[:job_id])
expect(Que).to receive(:execute).
with("SELECT pg_advisory_unlock($1)", [job[:job_id]])
with(:unlock_job, [job[:job_id]])

# Destroy the job to simulate the behaviour of the queue, and allow our lock query
# to discover new jobs.
Expand Down
12 changes: 9 additions & 3 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require_relative "helpers/sleep_job"
require_relative "helpers/interruptible_sleep_job"
require_relative "helpers/user"
require_relative "active_record_with_lock_spec_helper"

def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand All @@ -22,16 +23,21 @@ def establish_database_connection
ActiveRecord::Base.establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "postgres"),
password: ENV.fetch("PGPASSWORD", ""),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

establish_database_connection

# Make sure our test database is prepared to run Que
Que.connection = ActiveRecord
Que.connection =
case ENV["ADAPTER"]
when "ActiveRecordWithLock" then active_record_with_lock_adapter_connection
else ActiveRecord
end

Que.migrate!

# Ensure we have a logger, so that we can test the code paths that log
Expand Down

0 comments on commit 99061e9

Please sign in to comment.