diff --git a/Gemfile b/Gemfile index ae49dc4..c8d58be 100644 --- a/Gemfile +++ b/Gemfile @@ -10,5 +10,5 @@ group :test do gem 'activesupport', :require => false gem 'statsd-ruby', :require => false gem 'rake' - gem 'rspec', '~> 2.0' + gem "rspec", "~> 2.14.1" end diff --git a/lib/qu.rb b/lib/qu.rb index 9eb7a2f..522138d 100644 --- a/lib/qu.rb +++ b/lib/qu.rb @@ -7,6 +7,7 @@ require 'qu/backend/base' require 'qu/backend/instrumented' require 'qu/instrumenters/noop' +require 'qu/runner/direct' require 'qu/worker' require 'forwardable' diff --git a/lib/qu/backend/immediate.rb b/lib/qu/backend/immediate.rb index 655b5a0..d548eb0 100644 --- a/lib/qu/backend/immediate.rb +++ b/lib/qu/backend/immediate.rb @@ -28,6 +28,10 @@ def size(queue = 'default') def clear(queue = 'default') end + + def reconnect + end + end end end diff --git a/lib/qu/backend/instrumented.rb b/lib/qu/backend/instrumented.rb index 9f5c0ae..9df3b6f 100644 --- a/lib/qu/backend/instrumented.rb +++ b/lib/qu/backend/instrumented.rb @@ -15,7 +15,7 @@ def self.wrap(backend) end end - def_delegators :@backend, :connection, :connection= + def_delegators :@backend, :connection, :connection=, :reconnect def initialize(backend) @backend = backend diff --git a/lib/qu/backend/memory.rb b/lib/qu/backend/memory.rb index 8f91ccc..e464c46 100644 --- a/lib/qu/backend/memory.rb +++ b/lib/qu/backend/memory.rb @@ -11,13 +11,20 @@ class Memory < Base def initialize @monitor = Monitor.new @queues = {} + @messages = {} @pending = {} + @connection = @messages + end + + def reconnect end def push(payload) - synchronize do - payload.id = SecureRandom.uuid - queue_for(payload.queue) << payload + payload.id = SecureRandom.uuid + queue_for(payload.queue) do |queue| + queue << payload.id + @messages[payload.id] = dump(payload.attributes_for_push) + payload end end @@ -34,9 +41,13 @@ def abort(payload) alias fail abort - def pop(queue = 'default') - synchronize do - queue_for(queue).shift + def pop(queue_name = 'default') + queue_for(queue_name) do |queue| + if id = queue.shift + payload = Payload.new(load(@messages[id])) + @pending[id] = payload + payload + end end end @@ -44,14 +55,18 @@ def size(queue = 'default') queue_for(queue).size end - def clear(queue = 'default') - synchronize { queue_for(queue).clear } + def clear(queue_name = 'default') + queue_for(queue_name) { |queue| queue.clear } end private def queue_for(queue) - synchronize { @queues[queue] ||= [] } + if block_given? + synchronize { yield(@queues[queue] ||= []) } + else + synchronize { @queues[queue] ||= [] } + end end end diff --git a/lib/qu/backend/mongo.rb b/lib/qu/backend/mongo.rb index 43f1bd7..59103f7 100644 --- a/lib/qu/backend/mongo.rb +++ b/lib/qu/backend/mongo.rb @@ -81,6 +81,10 @@ def connection end end + def reconnect + connection.connection.reconnect + end + private def payload_attributes(payload) diff --git a/lib/qu/backend/redis.rb b/lib/qu/backend/redis.rb index 5307671..6266790 100644 --- a/lib/qu/backend/redis.rb +++ b/lib/qu/backend/redis.rb @@ -51,8 +51,17 @@ def clear(queue = 'default') end def connection - @connection ||= ::Redis::Namespace.new(namespace, :redis => ::Redis.connect(:url => ENV['REDISTOGO_URL'] || ENV['BOXEN_REDIS_URL'])) + @connection ||= self.class.create_connection(namespace) end + + def reconnect + connection.client.reconnect + end + + def self.create_connection(namespace) + ::Redis::Namespace.new(namespace, :redis => ::Redis.connect(:url => ENV['REDISTOGO_URL'] || ENV['BOXEN_REDIS_URL'])) + end + end end end diff --git a/lib/qu/backend/spec.rb b/lib/qu/backend/spec.rb index 69e5e6f..7e76e66 100644 --- a/lib/qu/backend/spec.rb +++ b/lib/qu/backend/spec.rb @@ -47,6 +47,11 @@ class CustomQueue < Qu::Job it "can clear specific queue" do subject.clear('foo') end + + it 'can reconnect' do + subject.reconnect + end + end shared_examples_for 'a backend' do diff --git a/lib/qu/backend/sqs.rb b/lib/qu/backend/sqs.rb index 6370924..c30fa68 100644 --- a/lib/qu/backend/sqs.rb +++ b/lib/qu/backend/sqs.rb @@ -73,6 +73,9 @@ def connection @connection ||= ::AWS::SQS.new end + def reconnect + end + end end end diff --git a/lib/qu/hooks.rb b/lib/qu/hooks.rb index fc46a57..0b7aa7b 100644 --- a/lib/qu/hooks.rb +++ b/lib/qu/hooks.rb @@ -8,13 +8,17 @@ def self.included(base) module ClassMethods def define_hooks(*hooks) hooks.each do |hook| - %w(before after around).each do |kind| - class_eval <<-end_eval, __FILE__, __LINE__ - def self.#{kind}_#{hook}(*methods) - hooks(:#{hook}).add(:#{kind}, *methods) - end - end_eval - end + define_hook_by_kinds(hook, *%w(before after around)) + end + end + + def define_hook_by_kinds( hook, *kinds ) + kinds.each do |kind| + class_eval <<-end_eval, __FILE__, __LINE__ + def self.#{kind}_#{hook}(*methods) + hooks(:#{hook}).add(:#{kind}, *methods) + end + end_eval end end @@ -25,19 +29,39 @@ def hooks(name) end module InstanceMethods + def run_hook(name, *args, &block) - hooks = if self.class.superclass < Qu::Hooks + find_hooks_for(name).run(self, args, &block) + end + + def run_before_hook( name, *args ) + run_hook_by_type(name, *args, :before) + end + + def run_after_hook( name, *args ) + run_hook_by_type(name, *args, :after) + end + + def find_hooks_for(name) + if self.class.superclass < Qu::Hooks self.class.superclass.hooks(name).dup.concat self.class.hooks(name) else self.class.hooks(name) end - - hooks.run(self, args, &block) end def halt throw :halt end + + private + + def run_hook_by_type( name, type, *args, &block ) + if hook = find_hooks_for(name).find { |hook| hook.type == type } + hook.call(self, args, &block) + end + end + end class Chain < Array @@ -52,6 +76,7 @@ def run(object, args, &block) def add(kind, *methods) methods.each {|method| self << Hook.new(kind, method) } end + end class Hook @@ -66,7 +91,7 @@ def call(obj, args, &chain) obj.send method, *args, &chain else obj.send method, *args if type == :before - chain.call + chain.call if chain obj.send method, *args if type == :after end end diff --git a/lib/qu/job.rb b/lib/qu/job.rb index 54b8375..0ed5cb4 100644 --- a/lib/qu/job.rb +++ b/lib/qu/job.rb @@ -2,6 +2,7 @@ module Qu class Job include Qu::Hooks define_hooks :push, :perform, :complete, :abort, :fail + define_hook_by_kinds :fork, :before, :after attr_accessor :payload diff --git a/lib/qu/payload.rb b/lib/qu/payload.rb index 6270283..1f63a26 100644 --- a/lib/qu/payload.rb +++ b/lib/qu/payload.rb @@ -34,9 +34,17 @@ def perform job.run_hook(:complete) { Qu.complete(self) } rescue Qu::Worker::Abort + abort + rescue => exception + fail(exception) + end + + def abort job.run_hook(:abort) { Qu.abort(self) } raise - rescue => exception + end + + def fail(exception) job.run_hook(:fail, exception) { Qu.fail(self) } Qu::Failure.create(self, exception) end diff --git a/lib/qu/runner/base.rb b/lib/qu/runner/base.rb new file mode 100644 index 0000000..f9dbcfb --- /dev/null +++ b/lib/qu/runner/base.rb @@ -0,0 +1,5 @@ +module Qu + module Runner + RunnerLimitReached = Class.new(StandardError) + end +end \ No newline at end of file diff --git a/lib/qu/runner/direct.rb b/lib/qu/runner/direct.rb index a5c7296..17b0788 100644 --- a/lib/qu/runner/direct.rb +++ b/lib/qu/runner/direct.rb @@ -1,9 +1,21 @@ +require 'qu/runner/base' + module Qu module Runner class Direct - def run( payload ) + def run( worker, payload ) + @full = true payload.perform + ensure + @full = false + end + + def stop + end + + def full? + @full end end diff --git a/lib/qu/runner/forking.rb b/lib/qu/runner/forking.rb new file mode 100644 index 0000000..02641b3 --- /dev/null +++ b/lib/qu/runner/forking.rb @@ -0,0 +1,37 @@ +require 'qu/runner/base' +require 'qu/util/signal_handler' +require 'qu/util/thread_safe_hash' +require 'qu/util/process_wrapper' + +module Qu + module Runner + + class Forking + + attr_reader :fork_limit, :forks + + def initialize( fork_limit = 1 ) + @fork_limit = fork_limit + @forks = Qu::Util::ThreadSafeHash.new + end + + def full? + forks.size == fork_limit + end + + def run(worker, payload) + raise RunnerLimitReached.new("#{self.class.name} is already running #{fork_limit} jobs") if full? + + process = Qu::Util::ProcessWrapper.new( forks, worker, payload ) + process.fork + end + + def stop + forks.values.each do |process| + process.stop + end + end + + end + end +end \ No newline at end of file diff --git a/lib/qu/runner/spec.rb b/lib/qu/runner/spec.rb new file mode 100644 index 0000000..ddbc601 --- /dev/null +++ b/lib/qu/runner/spec.rb @@ -0,0 +1,80 @@ +require 'qu/backend/redis' + +class RunnerJob < Qu::Job +end + +class RedisPusherJob < Qu::Job + + def initialize(list, value) + @list = list + @value = value + end + + def perform + self.class.client.lpush(@list, @value) + end + + def self.client + @client ||= Qu::Backend::Redis.create_connection("qu-test") + end + +end + +class SleepJob < Qu::Job + + def initialize(sleep_time = 5) + @sleep = sleep_time + end + + def perform + sleep(@sleep) + end + +end + +shared_examples_for 'a runner interface' do + + let(:payload) { Qu::Payload.new(:klass => RunnerJob) } + + it 'can run a payload' do + subject.run(double("worker"), payload) + end + + it 'can check if if it is full' do + subject.full? + end + + it 'can be stopped' do + subject.stop + end + +end + +shared_examples_for 'a single job runner' do + + let(:list) { 'push-test-list' } + let(:payload) { Qu::Payload.new(:klass => RedisPusherJob, :args => [list, '1']) } + let(:timeout) { 5 } + + before do + RedisPusherJob.client.del(list) + end + + def expect_values(*args) + timeout.times do + break unless subject.full? + sleep(1) + end + + result = RedisPusherJob.client.lrange(list, 0, -1) + if result.size == args.size + return expect(result).to eq(args) + end + end + + it 'can execute a payload' do + subject.run(double('worker'), payload) + expect_values('1') + end + +end \ No newline at end of file diff --git a/lib/qu/runner/threaded.rb b/lib/qu/runner/threaded.rb deleted file mode 100644 index 0daa35d..0000000 --- a/lib/qu/runner/threaded.rb +++ /dev/null @@ -1,15 +0,0 @@ -module Qu - module Runner - class Threaded - - attr_reader :thread_count - - def initialize( options = {} ) - @thread_count = options[:threads] || 1 - end - - - - end - end -end \ No newline at end of file diff --git a/lib/qu/util/process_wrapper.rb b/lib/qu/util/process_wrapper.rb new file mode 100644 index 0000000..a3921e8 --- /dev/null +++ b/lib/qu/util/process_wrapper.rb @@ -0,0 +1,98 @@ +require 'qu/util/signal_handler' +require 'qu/util/procline' + +module Qu + module Util + + FailedToForkError = Class.new(StandardError) + ExitFailureError = Class.new(StandardError) + + class ProcessWrapper + + attr_reader :pid, :payload, :worker, :kill_timeout + + def initialize(process_collection, worker, payload, kill_timeout = 5) + @process_collection = process_collection + @worker = worker + @payload = payload + @kill_timeout = kill_timeout + end + + def fork + payload.job.run_before_hook(:fork) + parent_pid = Process.pid + @pid = Kernel.fork do + begin + $stdout.sync = true + $stderr.sync = true + Qu::Util::Procline.set("fork of #{parent_pid} working on #{payload.id} from #{payload.queue}") + Qu.backend.reconnect + SignalHandler.clear(*Qu::Worker::SIGNALS) + payload.job.run_after_hook(:fork) + payload.perform + ensure + exit! + end + end + + if @pid + setup_wait_watcher + else + raise FailedToForkError.new("Could not fork process") + end + + @pid + end + + def setup_wait_watcher + @process_collection[pid] = self + Thread.new do + begin + Process.waitpid(pid) rescue SystemCallError + payload.fail(ExitFailureError.new($?.to_s)) if $?.signaled? + rescue => e + logger.error("Failed waiting for process #{e.message}\n#{e.backtrace.join("\n")}") + ensure + @process_collection.delete(pid) + end + end + end + + def stop + return unless pid + + if Process.waitpid(pid, Process::WNOHANG) + logger.info "Child #{pid} already quit." + return + end + + signal_child("TERM") + signal_child("KILL") unless quit_gracefully? + rescue SystemCallError + logger.info "Child #{pid} already quit and reaped." + ensure + @process_collection.delete(pid) + end + + def signal_child(signal) + logger.info "Sending #{signal} signal to child #{pid}" + Process.kill(signal, pid) + end + + def quit_gracefully? + if Qu.graceful_shutdown + self.kill_timeout.times do + sleep(1) + return true if Process.waitpid(pid, Process::WNOHANG) + end + end + return false + end + + def logger + Qu.logger + end + + end + end +end \ No newline at end of file diff --git a/lib/qu/util/procline.rb b/lib/qu/util/procline.rb new file mode 100644 index 0000000..37afaf6 --- /dev/null +++ b/lib/qu/util/procline.rb @@ -0,0 +1,9 @@ +module Qu + module Util + class Procline + def self.set(message) + $0 = "qu-#{Qu::VERSION}: #{message}" + end + end + end +end \ No newline at end of file diff --git a/lib/qu/util/signal_handler.rb b/lib/qu/util/signal_handler.rb new file mode 100644 index 0000000..f657769 --- /dev/null +++ b/lib/qu/util/signal_handler.rb @@ -0,0 +1,25 @@ +module Qu + module Util + class SignalHandler + + def self.trap( *signals, &block ) + signals.each do |signal| + Signal.trap(signal) do + block.call(signal) + end + end + end + + def self.clear(*signals) + signals.each do |signal| + begin + Signal.trap(signal, 'DEFAULT') + rescue ArgumentError => e + warn "Could not trap signal #{signal} - #{e}" + end + end + end + + end + end +end \ No newline at end of file diff --git a/lib/qu/util/thread_safe_hash.rb b/lib/qu/util/thread_safe_hash.rb new file mode 100644 index 0000000..fa2bb72 --- /dev/null +++ b/lib/qu/util/thread_safe_hash.rb @@ -0,0 +1,36 @@ +require 'forwardable' +require 'monitor' + +module Qu + module Util + class ThreadSafeHash + include Enumerable + extend Forwardable + + def_delegator :@monitor, :synchronize + def_delegators :@items, :size, :[] + + def initialize(original = {}) + @items = original.dup + @monitor = Monitor.new + end + + def each( &block ) + synchronize { @items.each(&block) } + end + + def []=(key,value) + synchronize { @items[key] = value } + end + + def delete(value) + synchronize { @items.delete(value) } + end + + def values + synchronize { @items.values } + end + + end + end +end \ No newline at end of file diff --git a/lib/qu/worker.rb b/lib/qu/worker.rb index bde24c5..687a418 100644 --- a/lib/qu/worker.rb +++ b/lib/qu/worker.rb @@ -1,9 +1,12 @@ require 'socket' +require 'qu/util/signal_handler' module Qu class Worker include Logger + SIGNALS = [:INT, :TERM] + attr_accessor :queues # Internal: Raised when signal received, no job is being performed, and @@ -29,18 +32,20 @@ def id def work did_work = false - queues.each { |queue_name| - if payload = Qu.pop(queue_name) - begin - @performing = true - payload.perform - ensure - did_work = true - @performing = false - break + unless Qu.runner.full? + queues.each do |queue_name| + if payload = Qu.pop(queue_name) + begin + @performing = true + Qu.runner.run(self, payload) + ensure + did_work = true + @performing = false + break + end end end - } + end did_work end @@ -61,12 +66,16 @@ def start sleep Qu.interval end end + rescue => e + logger.error("Failed run loop #{e.message}\n#{e.backtrace.join("\n")}") + raise ensure stop end def stop @running = false + Qu.runner.stop if performing? raise Abort unless Qu.graceful_shutdown @@ -95,8 +104,11 @@ def hostname def register_signal_handlers logger.debug "Worker #{id} registering traps for INT and TERM signals" - trap(:INT) { puts "Worker #{id} received INT, stopping"; stop } - trap(:TERM) { puts "Worker #{id} received TERM, stopping"; stop } + Qu::Util::SignalHandler.trap( *SIGNALS ) do |signal| + logger.info("Worker #{id} received #{signal}, stopping") + stop + end end + end end diff --git a/spec/qu/hooks_spec.rb b/spec/qu/hooks_spec.rb index 8f4e0cd..2e5929f 100644 --- a/spec/qu/hooks_spec.rb +++ b/spec/qu/hooks_spec.rb @@ -142,8 +142,6 @@ def fight_peter_pan end describe 'with a halt before' do - let(:captain) { Captain.new } - before do Captain.before_pillage :fight_peter_pan, :drink end @@ -161,8 +159,6 @@ def fight_peter_pan end describe 'with a halt after' do - let(:captain) { Captain.new } - before do Captain.after_pillage :drink, :fight_peter_pan end @@ -179,4 +175,29 @@ def fight_peter_pan end end end + + describe 'run_hook_by_type' do + before do + Captain.before_pillage :be_merry + Captain.after_pillage :drink + end + + it 'should not call the before hook' do + expect(captain).not_to receive(:be_merry).with(no_args()) + captain.run_after_hook(:pillage) + expect(captain.events).to eq([:drink]) + end + + it 'should not call the after hook' do + expect(captain).not_to receive(:drink).with(no_args()) + captain.run_before_hook(:pillage) + expect(captain.events).to eq([:be_merry]) + end + + it 'should not run hooks if they are not defined' do + expect { captain.run_before_hook(:some_hook) }.to_not raise_error + end + + end + end diff --git a/spec/qu/runner/direct_spec.rb b/spec/qu/runner/direct_spec.rb new file mode 100644 index 0000000..fe28bc0 --- /dev/null +++ b/spec/qu/runner/direct_spec.rb @@ -0,0 +1,9 @@ +require 'spec_helper' +require 'qu/runner/direct' + +describe Qu::Runner::Direct do + + it_should_behave_like 'a runner interface' + it_should_behave_like 'a single job runner' + +end \ No newline at end of file diff --git a/spec/qu/runner/forking_spec.rb b/spec/qu/runner/forking_spec.rb new file mode 100644 index 0000000..d7a14f4 --- /dev/null +++ b/spec/qu/runner/forking_spec.rb @@ -0,0 +1,9 @@ +require 'spec_helper' +require 'qu/runner/forking' + +describe Qu::Runner::Forking do + + it_should_behave_like 'a runner interface' + it_should_behave_like 'a single job runner' + +end \ No newline at end of file diff --git a/spec/qu/util/thread_safe_hash_spec.rb b/spec/qu/util/thread_safe_hash_spec.rb new file mode 100644 index 0000000..c186252 --- /dev/null +++ b/spec/qu/util/thread_safe_hash_spec.rb @@ -0,0 +1,41 @@ +require 'spec_helper' +require 'qu/util/thread_safe_hash' + +describe Qu::Util::ThreadSafeHash do + + it 'should wrap a hash and dup it' do + options = {options: 'some options'} + hash = Qu::Util::ThreadSafeHash.new(options) + options[:other] = 'some-value' + expect(hash[:other]).to be_nil + end + + it 'should delete by key' do + hash = Qu::Util::ThreadSafeHash.new(options: 'some options') + hash.delete(:options) + expect(hash[:options]).to be_nil + end + + it 'should a copy of the values' do + hash = Qu::Util::ThreadSafeHash.new(options: 'some options', some: 'value') + values = hash.values + hash.delete(:options) + expect(values).to eq(['some options', 'value']) + end + + it 'should have a size' do + hash = Qu::Util::ThreadSafeHash.new(options: 'some options', some: 'value') + expect(hash.size).to eq(2) + end + + it 'should be navigable' do + hash = Qu::Util::ThreadSafeHash.new(options: 'some options', some: 'value') + pairs = [] + hash.each do |key,value| + pairs << [key,value] + end + + expect(pairs).to eq([[:options, 'some options'], [:some, 'value']]) + end + +end diff --git a/spec/qu/worker_spec.rb b/spec/qu/worker_spec.rb index e7a0a31..7620311 100644 --- a/spec/qu/worker_spec.rb +++ b/spec/qu/worker_spec.rb @@ -92,13 +92,13 @@ describe 'work' do context 'with job in first queue' do + before do - Qu.stub(:pop).and_return(job) + expect(Qu).to receive(:pop).with(subject.queues.first).and_return(job) end it 'should pop a payload and perform it' do - Qu.should_receive(:pop).with(subject.queues.first).and_return(job) - job.should_receive(:perform) + expect(job).to receive(:perform) subject.work end @@ -134,16 +134,6 @@ end end - context 'with job in many queues' do - - before do - Qu.stub(:backend).and_return(Qu::Backend::Memory.new) - end - - it 'should dequeue jobs following the queue priority' - - end - context 'with no job in any queue' do before do Qu.stub(:pop).and_return(nil) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index f1a7493..31ba62c 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,6 +2,7 @@ Bundler.require :test require 'qu' require 'qu/backend/spec' +require 'qu/runner/spec' root_path = Pathname(__FILE__).dirname.join('..').expand_path Dir[root_path.join("spec/support/**/*.rb")].each { |f| require f } @@ -70,4 +71,4 @@ def self.running?(service) end end -Qu.logger = Logger.new('/dev/null') +Qu.logger = Logger.new(STDOUT)