Skip to content

Commit

Permalink
Add mock_redis when testing in fake mode
Browse files Browse the repository at this point in the history
close #24
close #25
close #18
  • Loading branch information
mhenrixon committed Nov 24, 2013
1 parent 11ddb8d commit 7787ffc
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 115 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
Gemfile.lock

.ruby-version
.idea/
3 changes: 3 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--color
--format progress
--order random
10 changes: 9 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
source 'http://rubygems.org'
gemspec
gemspec

group :development do
gem 'sidekiq', github: 'mperham/sidekiq'
end

gem 'pry'
gem 'pry-stack_explorer'
gem 'pry-debugger'
12 changes: 4 additions & 8 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
#!/usr/bin/env rake
require "bundler/gem_tasks"
require 'rake/testtask'
Rake::TestTask.new(:test) do |test|
test.libs << 'test'
#SO MUCH NOISE
#test.warning = true
test.pattern = 'test/**/test_*.rb'
end
require 'rspec/core/rake_task'

task :default => :test
RSpec::Core::RakeTask.new(:spec)

task :default => :spec
95 changes: 63 additions & 32 deletions lib/sidekiq-unique-jobs/middleware/client/unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,55 @@ module SidekiqUniqueJobs
module Middleware
module Client
class UniqueJobs
def call(worker_class, item, queue)

klass = worker_class_constantize(worker_class)
attr_reader :item, :worker_class

enabled = klass.get_sidekiq_options['unique'] || item['unique']
unique_job_expiration = klass.get_sidekiq_options['unique_job_expiration']
def call(worker_class, item, queue)
@worker_class = worker_class_constantize(worker_class)
@item = item

if enabled
if unique_enabled?
yield if unique?
else
yield
end
end

payload_hash = SidekiqUniqueJobs::PayloadHelper.get_payload(item['class'], item['queue'], item['args'])
def unique?
if testing_enabled?
unique_for_connection?(SidekiqUniqueJobs.redis_mock)
else
Sidekiq.redis do |conn|
unique_for_connection?(conn)
end
end
end

unique = false
def unique_for_connection?(conn)
unique = false
conn.watch(payload_hash)

Sidekiq.redis do |conn|
if conn.get(payload_hash).to_i == 1 ||
(conn.get(payload_hash).to_i == 2 && item['at'])
# if the job is already queued, or is already scheduled and
# we're trying to schedule again, abort
conn.unwatch
else
# if the job was previously scheduled and is now being queued,
# or we've never seen it before
expires_at = unique_job_expiration || SidekiqUniqueJobs::Config.default_expiration
expires_at = ((Time.at(item['at']) - Time.now.utc) + expires_at).to_i if item['at']

conn.watch(payload_hash)

if conn.get(payload_hash).to_i == 1 ||
(conn.get(payload_hash).to_i == 2 && item['at'])
# if the job is already queued, or is already scheduled and
# we're trying to schedule again, abort
conn.unwatch
else
# if the job was previously scheduled and is now being queued,
# or we've never seen it before
expires_at = unique_job_expiration || SidekiqUniqueJobs::Config.default_expiration
expires_at = ((Time.at(item['at']) - Time.now.utc) + expires_at).to_i if item['at']

unique = conn.multi do
# set value of 2 for scheduled jobs, 1 for queued jobs.
conn.setex(payload_hash, expires_at, item['at'] ? 2 : 1)
end
end
unique = conn.multi do
# set value of 2 for scheduled jobs, 1 for queued jobs.
conn.setex(payload_hash, expires_at, item['at'] ? 2 : 1)
end
yield if unique
else
yield
end
unique
end

protected

# Attempt to constantize a string worker_class argument, always
# Attempt to constantize a string worker_class argument, always
# failing back to the original argument.
def worker_class_constantize(worker_class)
if worker_class.is_a?(String)
Expand All @@ -56,7 +62,32 @@ def worker_class_constantize(worker_class)
end
end

private

def payload_hash
SidekiqUniqueJobs::PayloadHelper.get_payload(item['class'], item['queue'], item['args'])
end

# When sidekiq/testing is loaded, the Sidekiq::Testing constant is
# present and testing is enabled.
def testing_enabled?
if Sidekiq.const_defined?('Testing') && Sidekiq::Testing.enabled?
require 'sidekiq-unique-jobs/testing'
return true
end

false
end

def unique_enabled?
worker_class.get_sidekiq_options['unique'] || item['unique']
end

def unique_job_expiration
worker_class.get_sidekiq_options['unique_job_expiration']
end

end
end
end
end
end
6 changes: 6 additions & 0 deletions lib/sidekiq-unique-jobs/testing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
require 'mock_redis'
module SidekiqUniqueJobs
def self.redis_mock
@redis_mock ||= MockRedis.new
end
end
7 changes: 3 additions & 4 deletions sidekiq-unique-jobs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.version = SidekiqUniqueJobs::VERSION
gem.add_dependency 'sidekiq', '~> 2.6'
gem.add_development_dependency 'minitest', '~> 3'
gem.add_development_dependency 'sinatra'
gem.add_development_dependency 'slim'
gem.add_development_dependency 'rake'
gem.add_dependency 'mock_redis'
gem.add_development_dependency 'rspec', '>= 2'
gem.add_development_dependency 'rspec-sidekiq'
gem.add_development_dependency 'activesupport', '~> 3'
gem.add_development_dependency 'simplecov'
end
55 changes: 33 additions & 22 deletions test/lib/sidekiq/test_client.rb → spec/lib/client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
require 'helper'
require 'spec_helper'
require 'celluloid'
require 'sidekiq/worker'
require "sidekiq-unique-jobs"
require 'sidekiq/scheduled'
require 'sidekiq-unique-jobs/middleware/server/unique_jobs'

class TestClient < MiniTest::Unit::TestCase
describe "Client" do
describe 'with real redis' do
before do
Sidekiq.redis = REDIS
Expand All @@ -27,46 +27,54 @@ def run(x)

it 'does not push duplicate messages when configured for unique only' do
QueueWorker.sidekiq_options :unique => true
10.times { Sidekiq::Client.push('class' => TestClient::QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(result).to eq 1
end

it 'does not queue duplicates when when calling delay' do
10.times { PlainClass.delay(unique: true, queue: 'customqueue').run(1) }
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(result).to eq 1
end

it 'does not schedule duplicates when calling perform_in' do
QueueWorker.sidekiq_options :unique => true
10.times { QueueWorker.perform_in(60, [1, 2]) }
assert_equal 1, Sidekiq.redis { |c| c.zcount("schedule", -1, Time.now.to_f + 2 * 60) }
result = Sidekiq.redis { |c| c.zcount("schedule", -1, Time.now.to_f + 2 * 60) }
expect(result).to eq 1
end

it 'enqueues previously scheduled job' do
QueueWorker.sidekiq_options :unique => true
QueueWorker.perform_in(60 * 60, 1, 2)

# time passes and the job is pulled off the schedule:
Sidekiq::Client.push('class' => TestClient::QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])

assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(result).to eq 1
end

it 'sets an expiration when provided by sidekiq options' do
one_hour_expiration = 60 * 60
QueueWorker.sidekiq_options :unique => true, :unique_job_expiration => one_hour_expiration
Sidekiq::Client.push('class' => TestClient::QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])
Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2])

payload_hash = SidekiqUniqueJobs::PayloadHelper.get_payload("TestClient::QueueWorker", "customqueue", [1, 2])
payload_hash = SidekiqUniqueJobs::PayloadHelper.get_payload("QueueWorker", "customqueue", [1, 2])
actual_expires_at = Sidekiq.redis {|c| c.ttl(payload_hash) }

assert_in_delta one_hour_expiration, actual_expires_at, 2
result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(actual_expires_at).to be_within(2).of(one_hour_expiration)
end

it 'does push duplicate messages when not configured for unique only' do
QueueWorker.sidekiq_options :unique => false
10.times { Sidekiq::Client.push('class' => TestClient::QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
assert_equal 10, Sidekiq.redis {|c| c.llen("queue:customqueue") }
10.times { Sidekiq::Client.push('class' => QueueWorker, 'queue' => 'customqueue', 'args' => [1, 2]) }
expect(Sidekiq.redis {|c| c.llen("queue:customqueue") }).to eq 10

result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(result).to eq 10
end

describe 'when unique_args is defined' do
Expand All @@ -87,22 +95,25 @@ class QueueWorkerWithFilterProc < QueueWorker
end

it 'does not push duplicate messages based on args filter method' do
assert TestClient::QueueWorkerWithFilterMethod.respond_to?(:args_filter)
assert_equal :args_filter, TestClient::QueueWorkerWithFilterMethod.get_sidekiq_options['unique_args']
expect(QueueWorkerWithFilterMethod).to respond_to(:args_filter)
expect(QueueWorkerWithFilterMethod.get_sidekiq_options['unique_args']).to eq :args_filter


for i in (0..10).to_a
Sidekiq::Client.push('class' => TestClient::QueueWorkerWithFilterMethod, 'queue' => 'customqueue', 'args' => [1, i])
Sidekiq::Client.push('class' => QueueWorkerWithFilterMethod, 'queue' => 'customqueue', 'args' => [1, i])
end
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(result).to eq 1
end

it 'does not push duplicate messages based on args filter proc' do
assert_kind_of Proc, TestClient::QueueWorkerWithFilterProc.get_sidekiq_options['unique_args']
expect(QueueWorkerWithFilterProc.get_sidekiq_options['unique_args']).to be_a(Proc)

10.times do
Sidekiq::Client.push('class' => TestClient::QueueWorkerWithFilterProc, 'queue' => 'customqueue', 'args' => [ 1, {:random => rand(), :name => "foobar"} ])
Sidekiq::Client.push('class' => QueueWorkerWithFilterProc, 'queue' => 'customqueue', 'args' => [ 1, {:random => rand(), :name => "foobar"} ])
end
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
result = Sidekiq.redis {|c| c.llen("queue:customqueue") }
expect(result).to eq 1
end
end

Expand All @@ -116,12 +127,12 @@ class QueueWorkerWithFilterProc < QueueWorker
expected_expires_at = (Time.at(at) - Time.now.utc) + SidekiqUniqueJobs::Config.default_expiration

QueueWorker.perform_in(at, 'mike')
payload_hash = SidekiqUniqueJobs::PayloadHelper.get_payload("TestClient::QueueWorker", "customqueue", ['mike'])
payload_hash = SidekiqUniqueJobs::PayloadHelper.get_payload("QueueWorker", "customqueue", ['mike'])

# deconstruct this into a time format we can use to get a decent delta for
actual_expires_at = Sidekiq.redis {|c| c.ttl(payload_hash) }

assert_in_delta expected_expires_at, actual_expires_at, 2
expect(actual_expires_at).to be_within(2).of(expected_expires_at)
end
end
end
34 changes: 34 additions & 0 deletions spec/lib/sidekiq_testing_enabled_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require 'spec_helper'
require 'sidekiq/worker'
require "sidekiq-unique-jobs"
require 'sidekiq/scheduled'
require 'sidekiq-unique-jobs/middleware/server/unique_jobs'

describe "When Sidekiq::Testing is enabled" do
describe 'when set to :fake!', sidekiq: :fake do
context "with unique worker" do
it "does not push duplicate messages" do
param = 'work'
expect(UniqueWorker).to have_enqueued_jobs(0)
UniqueWorker.perform_async(param)
expect(UniqueWorker).to have_enqueued_jobs(1)
expect(UniqueWorker).to have_enqueued_job(param)
UniqueWorker.perform_async(param)
expect(UniqueWorker).to have_enqueued_jobs(1)
end
end

context "with non-unique worker" do

it "pushes duplicates messages" do
param = 'work'
expect(MyWorker).to have_enqueued_jobs(0)
MyWorker.perform_async(param)
expect(MyWorker).to have_enqueued_jobs(1)
expect(MyWorker).to have_enqueued_job(param)
MyWorker.perform_async(param)
expect(MyWorker).to have_enqueued_jobs(2)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
require 'helper'
require 'spec_helper'
require 'sidekiq/worker'
require 'sidekiq-unique-jobs/middleware/server/unique_jobs'

class TestUnlockOrdering < MiniTest::Unit::TestCase
describe "Unlock order" do
QUEUE = 'unlock_ordering'

class BeforeYieldOrderingWorker
Expand Down Expand Up @@ -41,7 +41,7 @@ def perform
end
end

assert_nil result
expect(result).to eq nil
end
end

Expand All @@ -56,7 +56,7 @@ def perform
end
end

assert_equal '1', result
expect(result).to eq '1'
end
end
end
Expand Down
Loading

1 comment on commit 7787ffc

@liveh2o
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. 👍

Please sign in to comment.