Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] delayed_sidekiq strategy #869

Merged
merged 8 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### New Features

* [#869](https://github.com/toptal/chewy/pull/869): New strategy - `delayed_sidekiq`. Allow passing `strategy: :delayed_sidekiq` option to `SomeIndex.import([1, ...], strategy: :delayed_sidekiq)`. The strategy is compatible with `update_fields` option as well. ([@skcc321][])

### Changes

### Bugs Fixed
Expand Down
74 changes: 74 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,80 @@ The default queue name is `chewy`, you can customize it in settings: `sidekiq.qu
Chewy.settings[:sidekiq] = {queue: :low}
```

#### `:delayed_sidekiq`

It accumulates ids of records to be reindexed during the latency window in redis and then does the reindexing of all accumulated records at once.
The strategy is very useful in case of frequently mutated records.
It supports `update_fields` option, so it will try to select just enough data from the DB

There are three options that can be defined in the index:
```ruby
class CitiesIndex...
strategy_config delayed_sidekiq: {
latency: 3,
margin: 2,
ttl: 60 * 60 * 24,
reindex_wrapper: ->(&reindex) {
ActiveRecord::Base.connected_to(role: :reading) { reindex.call }
}
# latency - will prevent scheduling identical jobs
# margin - main purpose is to cover db replication lag by the margin
# ttl - a chunk expiration time (in seconds)
# reindex_wrapper - lambda that accepts block to wrap that reindex process AR connection block.
}

...
end
```

Also you can define defaults in the `initializers/chewy.rb`
```ruby
Chewy.settings = {
strategy_config: {
delayed_sidekiq: {
latency: 3,
margin: 2,
ttl: 60 * 60 * 24,
reindex_wrapper: ->(&reindex) {
ActiveRecord::Base.connected_to(role: :reading) { reindex.call }
}
}
}
}

```
or in `config/chewy.yml`
```ruby
strategy_config:
delayed_sidekiq:
latency: 3
margin: 2
ttl: <%= 60 * 60 * 24 %>
# reindex_wrapper setting is not possible here!!! use the initializer instead
```

You can use the strategy identically to other strategies
```ruby
Chewy.strategy(:delayed_sidekiq) do
City.popular.map(&:do_some_update_action!)
end
```

The default queue name is `chewy`, you can customize it in settings: `sidekiq.queue_name`
```
Chewy.settings[:sidekiq] = {queue: :low}
```

Explicit call of the reindex using `:delayed_sidekiq strategy`
```ruby
CitiesIndex.import([1, 2, 3], strategy: :delayed_sidekiq)
```

Explicit call of the reindex using `:delayed_sidekiq` strategy with `:update_fields` support
```ruby
CitiesIndex.import([1, 2, 3], update_fields: [:name], strategy: :delayed_sidekiq)
```

#### `:active_job`

This does the same thing as `:atomic`, but using ActiveJob. This will inherit the ActiveJob configuration settings including the `active_job.queue_adapter` setting for the environment. Patch `Chewy::Strategy::ActiveJob::Worker` for index updates improving.
Expand Down
1 change: 1 addition & 0 deletions chewy.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |spec| # rubocop:disable Metrics/BlockLength

spec.add_development_dependency 'database_cleaner'
spec.add_development_dependency 'elasticsearch-extensions'
spec.add_development_dependency 'mock_redis'
spec.add_development_dependency 'rake'
spec.add_development_dependency 'rspec', '>= 3.7.0'
spec.add_development_dependency 'rspec-collection_matchers'
Expand Down
25 changes: 25 additions & 0 deletions lib/chewy/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class Index
pipeline raw_import refresh replication
].freeze

STRATEGY_OPTIONS = {
delayed_sidekiq: %i[latency margin ttl reindex_wrapper]
}.freeze

include Search
include Actions
include Aliases
Expand Down Expand Up @@ -221,6 +225,27 @@ def default_import_options(params)
params.assert_valid_keys(IMPORT_OPTIONS_KEYS)
self._default_import_options = _default_import_options.merge(params)
end

def strategy_config(params = {})
@strategy_config ||= begin
config_struct = Struct.new(*STRATEGY_OPTIONS.keys).new

STRATEGY_OPTIONS.each_with_object(config_struct) do |(strategy, options), res|
res[strategy] = case strategy
when :delayed_sidekiq
Struct.new(*STRATEGY_OPTIONS[strategy]).new.tap do |config|
options.each do |option|
config[option] = params.dig(strategy, option) || Chewy.configuration.dig(:strategy_config, strategy, option)
end

config[:reindex_wrapper] ||= ->(&reindex) { reindex.call } # default wrapper
end
else
raise NotImplementedError, "Unsupported strategy: '#{strategy}'"
end
end
end
end
end
end
end
31 changes: 29 additions & 2 deletions lib/chewy/index/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ module ClassMethods
# @option options [true, Integer, Hash] parallel enables parallel import processing with the Parallel gem, accepts the number of workers or any Parallel gem acceptable options
# @return [true, false] false in case of errors
ruby2_keywords def import(*args)
import_routine(*args).blank?
intercept_import_using_strategy(*args).blank?
end

# @!method import!(*collection, **options)
Expand All @@ -84,7 +84,8 @@ module ClassMethods
#
# @raise [Chewy::ImportFailed] in case of errors
ruby2_keywords def import!(*args)
errors = import_routine(*args)
errors = intercept_import_using_strategy(*args)

raise Chewy::ImportFailed.new(self, errors) if errors.present?

true
Expand Down Expand Up @@ -126,6 +127,32 @@ def compose(object, crutches = nil, fields: [])

private

def intercept_import_using_strategy(*args)
args_clone = args.deep_dup
options = args_clone.extract_options!
strategy = options.delete(:strategy)

return import_routine(*args) if strategy.blank?

ids = args_clone.flatten
return {} if ids.blank?
return {argument: {"#{strategy} supports ids only!" => ids}} unless ids.all? do |id|
id.respond_to?(:to_i)
end

case strategy
when :delayed_sidekiq
begin
Chewy::Strategy::DelayedSidekiq::Scheduler.new(self, ids, options).postpone
{} # success. errors handling convention
rescue StandardError => e
{scheduler: {e.message => ids}}
end
else
{argument: {"unsupported strategy: '#{strategy}'" => ids}}
end
end

def import_routine(*args)
return if !args.first.nil? && empty_objects_or_scope?(args.first)

Expand Down
1 change: 1 addition & 0 deletions lib/chewy/strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'sidekiq'
require 'chewy/strategy/sidekiq'
require 'chewy/strategy/lazy_sidekiq'
require 'chewy/strategy/delayed_sidekiq'
rescue LoadError
nil
end
Expand Down
17 changes: 17 additions & 0 deletions lib/chewy/strategy/delayed_sidekiq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Chewy
class Strategy
class DelayedSidekiq < Sidekiq
require_relative 'delayed_sidekiq/scheduler'

def leave
@stash.each do |type, ids|
next if ids.empty?

DelayedSidekiq::Scheduler.new(type, ids).postpone
end
end
end
end
end
148 changes: 148 additions & 0 deletions lib/chewy/strategy/delayed_sidekiq/scheduler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# frozen_string_literal: true

require_relative '../../index'

# The class is responsible for accumulating in redis [type, ids]
# that were requested to be reindexed during `latency` seconds.
# The reindex job is going to be scheduled after a `latency` seconds.
# that job is going to read accumulated [type, ids] from the redis
# and reindex all them at once.
module Chewy
class Strategy
class DelayedSidekiq
require_relative 'worker'

class Scheduler
DEFAULT_TTL = 60 * 60 * 24 # in seconds
DEFAULT_LATENCY = 10
DEFAULT_MARGIN = 2
DEFAULT_QUEUE = 'chewy'
KEY_PREFIX = 'chewy:delayed_sidekiq'
FALLBACK_FIELDS = 'all'
FIELDS_IDS_SEPARATOR = ';'
IDS_SEPARATOR = ','

def initialize(type, ids, options = {})
@type = type
@ids = ids
@options = options
end

# the diagram:
#
# inputs:
# latency == 2
# reindex_time = Time.current
#
# Parallel OR Sequential triggers of reindex: | What is going on in reindex store (Redis):
# --------------------------------------------------------------------------------------------------
# |
# process 1 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1]
# Schedule.new(CitiesIndex, [1]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
# | & schedule a DelayedSidekiq::Worker at 1679347869 (at + 3)
# | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347866 score and reindex all ids with zpoped keys
# | chewy:delayed_sidekiq:CitiesIndex:1679347866
# |
# |
# process 2 (reindex_time): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2]
# Schedule.new(CitiesIndex, [2]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
# | & do not schedule a new worker
# |
# |
# process 1 (reindex_time + (latency - 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
# Schedule.new(CitiesIndex, [3]).postpone | chewy:delayed_sidekiq:timechunks = [{ score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}]
# | & do not schedule a new worker
# |
# |
# process 2 (reindex_time + (latency + 1).seconds): | chewy:delayed_sidekiq:CitiesIndex:1679347866 = [1, 2, 3]
# Schedule.new(CitiesIndex, [4]).postpone | chewy:delayed_sidekiq:CitiesIndex:1679347868 = [4]
# | chewy:delayed_sidekiq:timechunks = [
# | { score: 1679347866, "chewy:delayed_sidekiq:CitiesIndex:1679347866"}
# | { score: 1679347868, "chewy:delayed_sidekiq:CitiesIndex:1679347868"}
# | ]
# | & schedule a DelayedSidekiq::Worker at 1679347871 (at + 3)
# | it will zpop chewy:delayed_sidekiq:timechunks up to 1679347868 score and reindex all ids with zpoped keys
# | chewy:delayed_sidekiq:CitiesIndex:1679347866 (in case of failed previous reindex),
# | chewy:delayed_sidekiq:CitiesIndex:1679347868
def postpone
::Sidekiq.redis do |redis|
# warning: Redis#sadd will always return an Integer in Redis 5.0.0. Use Redis#sadd? instead
if redis.respond_to?(:sadd?)
redis.sadd?(timechunk_key, serialize_data)
else
redis.sadd(timechunk_key, serialize_data)
end

redis.expire(timechunk_key, ttl)

unless redis.zrank(timechunks_key, timechunk_key)
redis.zadd(timechunks_key, at, timechunk_key)
redis.expire(timechunks_key, ttl)

::Sidekiq::Client.push(
'queue' => sidekiq_queue,
'at' => at + margin,
'class' => Chewy::Strategy::DelayedSidekiq::Worker,
'args' => [type_name, at]
)
end
end
end

private

attr_reader :type, :ids, :options

# this method returns predictable value that jumps by latency value
# another words each latency seconds it return the same value
def at
@at ||= begin
schedule_at = latency.seconds.from_now.to_f

(schedule_at - (schedule_at % latency)).to_i
end
end

def fields
options[:update_fields].presence || [FALLBACK_FIELDS]
end

def timechunks_key
"#{KEY_PREFIX}:#{type_name}:timechunks"
end

def timechunk_key
"#{KEY_PREFIX}:#{type_name}:#{at}"
end

def serialize_data
[ids.join(IDS_SEPARATOR), fields.join(IDS_SEPARATOR)].join(FIELDS_IDS_SEPARATOR)
end

def type_name
type.name
end

def latency
strategy_config.latency || DEFAULT_LATENCY
end

def margin
strategy_config.margin || DEFAULT_MARGIN
end

def ttl
strategy_config.ttl || DEFAULT_TTL
end

def sidekiq_queue
Chewy.settings.dig(:sidekiq, :queue) || DEFAULT_QUEUE
end

def strategy_config
type.strategy_config.delayed_sidekiq
end
end
end
end
end
Loading