Skip to content

Commit

Permalink
Merge pull request #410 from rails/fix-queue-selection
Browse files Browse the repository at this point in the history
Fix queue order when combining multiple prefixes or prefixes and names
  • Loading branch information
rosa authored Nov 16, 2024
2 parents 42ce2ac + 6276b4e commit 24bad6f
Show file tree
Hide file tree
Showing 15 changed files with 361 additions and 68 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,3 @@ jobs:
bin/rails db:setup
- name: Run tests
run: bin/rails test
- name: Run tests with separate connection
run: SEPARATE_CONNECTION=1 bin/rails test
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/tmp/
/test/dummy/db/*.sqlite3
/test/dummy/db/*.sqlite3-*
/test/dummy/log/*.log
/test/dummy/log/*.log*
/test/dummy/tmp/

# Folder for JetBrains IDEs
Expand Down
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ AllCops:
TargetRubyVersion: 3.0
Exclude:
- "test/dummy/db/schema.rb"
- "test/dummy/db/queue_schema.rb"
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ Here's an overview of the different options:
This will create a worker fetching jobs from all queues starting with `staging`. The wildcard `*` is only allowed on its own or at the end of a queue name; you can't specify queue names such as `*_some_queue`. These will be ignored.

Finally, you can combine prefixes with exact names, like `[ staging*, background ]`, and the behaviour with respect to order will be the same as with only exact names.

Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance).

- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch this number of jobs from their queue(s), at most and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
Expand All @@ -164,6 +167,67 @@ This is useful when you run jobs with different importance or urgency in the sam

We recommend not mixing queue order with priorities but either choosing one or the other, as that will make job execution order more straightforward for you.

### Queues specification and performance

To keep polling performant and ensure a covering index is always used, Solid Queue only does two types of polling queries:
```sql
-- No filtering by queue
SELECT job_id
FROM solid_queue_ready_executions
ORDER BY priority ASC, job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
-- Filtering by a single queue
SELECT job_id
FROM solid_queue_ready_executions
WHERE queue_name = ?
ORDER BY priority ASC, job_id ASC
LIMIT ?
FOR UPDATE SKIP LOCKED;
```

The first one (no filtering by queue) is used when you specify
```yml
queues: *
```
and there aren't any queues paused, as we want to target all queues.

In other cases, we need to have a list of queues to filter by, in order, because we can only filter by a single queue at a time to ensure we use an index to sort. This means that if you specify your queues as:
```yml
queues: beta*
```

we'll need to get a list of all existing queues matching that prefix first, with a query that would look like this:
```sql
SELECT DISTINCT(queue_name)
FROM solid_queue_ready_executions
WHERE queue_name LIKE 'beta%';
```

This type of `DISTINCT` query on a column that's the leftmost column in an index can be performed very fast in MySQL thanks to a technique called [Loose Index Scan](https://dev.mysql.com/doc/refman/8.0/en/group-by-optimization.html#loose-index-scan). PostgreSQL and SQLite, however, don't implement this technique, which means that if your `solid_queue_ready_executions` table is very big because your queues get very deep, this query will get slow. Normally your `solid_queue_ready_executions` table will be small, but it can happen.

Similarly to using prefixes, the same will happen if you have paused queues, because we need to get a list of all queues with a query like
```sql
SELECT DISTINCT(queue_name)
FROM solid_queue_ready_executions
```

and then remove the paused ones. Pausing in general should be something rare, used in special circumstances, and for a short period of time. If you don't want to process jobs from a queue anymore, the best way to do that is to remove it from your list of queues.

💡 To sum up, **if you want to ensure optimal performance on polling**, the best way to do that is to always specify exact names for them, and not have any queues paused.

Do this:

```yml
queues: background, backend
```

instead of this:
```yml
queues: back*
```


### Threads, processes and signals

Expand Down
40 changes: 35 additions & 5 deletions app/models/solid_queue/queue_selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ def queue_names
def eligible_queues
if include_all_queues? then all_queues
else
exact_names + prefixed_names
in_raw_order(exact_names + prefixed_names)
end
end

def include_all_queues?
"*".in? raw_queues
end

def all_queues
relation.distinct(:queue_name).pluck(:queue_name)
end

def exact_names
raw_queues.select { |queue| !queue.include?("*") }
raw_queues.select { |queue| exact_name?(queue) }
end

def prefixed_names
Expand All @@ -54,15 +58,41 @@ def prefixed_names
end

def prefixes
@prefixes ||= raw_queues.select { |queue| queue.ends_with?("*") }.map { |queue| queue.tr("*", "%") }
@prefixes ||= raw_queues.select { |queue| prefixed_name?(queue) }.map { |queue| queue.tr("*", "%") }
end

def all_queues
relation.distinct(:queue_name).pluck(:queue_name)
def exact_name?(queue)
!queue.include?("*")
end

def prefixed_name?(queue)
queue.ends_with?("*")
end

def paused_queues
@paused_queues ||= Pause.all.pluck(:queue_name)
end

def in_raw_order(queues)
# Only need to sort if we have prefixes and more than one queue name.
# Exact names are selected in the same order as they're found
if queues.one? || prefixes.empty?
queues
else
queues = queues.dup
raw_queues.flat_map { |raw_queue| delete_in_order(raw_queue, queues) }.compact
end
end

def delete_in_order(raw_queue, queues)
if exact_name?(raw_queue)
queues.delete(raw_queue)
elsif prefixed_name?(raw_queue)
prefix = raw_queue.tr("*", "")
queues.select { |queue| queue.start_with?(prefix) }.tap do |matches|
queues -= matches
end
end
end
end
end
14 changes: 11 additions & 3 deletions bin/setup
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@
set -eu
cd "$(dirname "${BASH_SOURCE[0]}")"

docker-compose up -d --remove-orphans
docker-compose ps
if docker compose version &> /dev/null; then
DOCKER_COMPOSE_CMD="docker compose"
else
DOCKER_COMPOSE_CMD="docker-compose"
fi

$DOCKER_COMPOSE_CMD up -d --remove-orphans
$DOCKER_COMPOSE_CMD ps

bundle

echo "Creating databases..."

rails db:reset
rails db:reset TARGET_DB=sqlite
rails db:reset TARGET_DB=mysql
rails db:reset TARGET_DB=postgres
6 changes: 6 additions & 0 deletions test/dummy/bin/jobs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env ruby

require_relative "../config/environment"
require "solid_queue/cli"

SolidQueue::Cli.start(ARGV)
4 changes: 0 additions & 4 deletions test/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,5 @@ class Application < Rails::Application
# config.eager_load_paths << Rails.root.join("extras")

config.active_job.queue_adapter = :solid_queue

if ENV["SEPARATE_CONNECTION"] && ENV["TARGET_DB"] != "sqlite"
config.solid_queue.connects_to = { database: { writing: :primary, reading: :replica } }
end
end
end
16 changes: 8 additions & 8 deletions test/dummy/config/database.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ default: &default
development:
primary:
<<: *default
database: <%= database_name_from("solid_queue_development") %>
replica:
database: <%= database_name_from("development") %>
queue:
<<: *default
database: <%= database_name_from("solid_queue_development") %>
replica: true
database: <%= database_name_from("development_queue") %>
migrations_paths: db/queue_migrate

test:
primary:
<<: *default
pool: 20
database: <%= database_name_from("solid_queue_test") %>
replica:
database: <%= database_name_from("test") %>
queue:
<<: *default
pool: 20
database: <%= database_name_from("solid_queue_test") %>
replica: true
database: <%= database_name_from("test_queue") %>
migrations_paths: db/queue_migrate
4 changes: 4 additions & 0 deletions test/dummy/config/environments/development.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
# Raises error for missing translations.
# config.i18n.raise_on_missing_translations = true

# Replace the default in-process and non-durable queuing backend for Active Job.
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }

# Annotate rendered view with file names.
# config.action_view.annotate_rendered_view_with_filenames = true

Expand Down
6 changes: 4 additions & 2 deletions test/dummy/config/environments/production.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
# Use a different cache store in production.
# config.cache_store = :mem_cache_store

# Use a real queuing backend for Active Job (and separate queues per environment).
# config.active_job.queue_adapter = :resque
# Replace the default in-process and non-durable queuing backend for Active Job.
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }

# config.active_job.queue_name_prefix = "dummy_production"

# Enable locale fallbacks for I18n (makes lookups for any locale fall back to
Expand Down
4 changes: 4 additions & 0 deletions test/dummy/config/environments/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
# Raises error for missing translations.
# config.i18n.raise_on_missing_translations = true

# Replace the default in-process and non-durable queuing backend for Active Job.
config.active_job.queue_adapter = :solid_queue
config.solid_queue.connects_to = { database: { writing: :queue } }

# Annotate rendered view with file names.
# config.action_view.annotate_rendered_view_with_filenames = true

Expand Down
Loading

0 comments on commit 24bad6f

Please sign in to comment.