Skip to content

Commit

Permalink
Create separate class to execute jobs
Browse files Browse the repository at this point in the history
This is a proposal for bkeepers#5. Creates a separate
class to execute the job instead of doing the
execution inside the worker. This allows us to
have different execution mechanisms, from the
current "just run", to a forking runner and
in the future other threaded runners (that
would be better suited for JRuby and Rubinius).
  • Loading branch information
mauricio committed Feb 17, 2014
1 parent 0e3062b commit 72f4582
Show file tree
Hide file tree
Showing 28 changed files with 515 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ group :test do
gem 'activesupport', :require => false
gem 'statsd-ruby', :require => false
gem 'rake'
gem 'rspec', '~> 2.0'
gem "rspec", "~> 2.14.1"
end
1 change: 1 addition & 0 deletions lib/qu.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'qu/backend/base'
require 'qu/backend/instrumented'
require 'qu/instrumenters/noop'
require 'qu/runner/direct'
require 'qu/worker'

require 'forwardable'
Expand Down
4 changes: 4 additions & 0 deletions lib/qu/backend/immediate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def size(queue = 'default')

def clear(queue = 'default')
end

def reconnect
end

end
end
end
2 changes: 1 addition & 1 deletion lib/qu/backend/instrumented.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def self.wrap(backend)
end
end

def_delegators :@backend, :connection, :connection=
def_delegators :@backend, :connection, :connection=, :reconnect

def initialize(backend)
@backend = backend
Expand Down
33 changes: 24 additions & 9 deletions lib/qu/backend/memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@ class Memory < Base
def initialize
@monitor = Monitor.new
@queues = {}
@messages = {}
@pending = {}
@connection = @messages
end

def reconnect
end

def push(payload)
synchronize do
payload.id = SecureRandom.uuid
queue_for(payload.queue) << payload
payload.id = SecureRandom.uuid
queue_for(payload.queue) do |queue|
queue << payload.id
@messages[payload.id] = dump(payload.attributes_for_push)
payload
end
end

Expand All @@ -34,24 +41,32 @@ def abort(payload)

alias fail abort

def pop(queue = 'default')
synchronize do
queue_for(queue).shift
def pop(queue_name = 'default')
queue_for(queue_name) do |queue|
if id = queue.shift
payload = Payload.new(load(@messages[id]))
@pending[id] = payload
payload
end
end
end

def size(queue = 'default')
queue_for(queue).size
end

def clear(queue = 'default')
synchronize { queue_for(queue).clear }
def clear(queue_name = 'default')
queue_for(queue_name) { |queue| queue.clear }
end

private

def queue_for(queue)
synchronize { @queues[queue] ||= [] }
if block_given?
synchronize { yield(@queues[queue] ||= []) }
else
synchronize { @queues[queue] ||= [] }
end
end

end
Expand Down
4 changes: 4 additions & 0 deletions lib/qu/backend/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def connection
end
end

def reconnect
connection.connection.reconnect
end

private

def payload_attributes(payload)
Expand Down
11 changes: 10 additions & 1 deletion lib/qu/backend/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,17 @@ def clear(queue = 'default')
end

def connection
@connection ||= ::Redis::Namespace.new(namespace, :redis => ::Redis.connect(:url => ENV['REDISTOGO_URL'] || ENV['BOXEN_REDIS_URL']))
@connection ||= self.class.create_connection(namespace)
end

def reconnect
connection.client.reconnect
end

def self.create_connection(namespace)
::Redis::Namespace.new(namespace, :redis => ::Redis.connect(:url => ENV['REDISTOGO_URL'] || ENV['BOXEN_REDIS_URL']))
end

end
end
end
5 changes: 5 additions & 0 deletions lib/qu/backend/spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class CustomQueue < Qu::Job
it "can clear specific queue" do
subject.clear('foo')
end

it 'can reconnect' do
subject.reconnect
end

end

shared_examples_for 'a backend' do
Expand Down
3 changes: 3 additions & 0 deletions lib/qu/backend/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def connection
@connection ||= ::AWS::SQS.new
end

def reconnect
end

end
end
end
47 changes: 36 additions & 11 deletions lib/qu/hooks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ def self.included(base)
module ClassMethods
def define_hooks(*hooks)
hooks.each do |hook|
%w(before after around).each do |kind|
class_eval <<-end_eval, __FILE__, __LINE__
def self.#{kind}_#{hook}(*methods)
hooks(:#{hook}).add(:#{kind}, *methods)
end
end_eval
end
define_hook_by_kinds(hook, *%w(before after around))
end
end

def define_hook_by_kinds( hook, *kinds )
kinds.each do |kind|
class_eval <<-end_eval, __FILE__, __LINE__
def self.#{kind}_#{hook}(*methods)
hooks(:#{hook}).add(:#{kind}, *methods)
end
end_eval
end
end

Expand All @@ -25,19 +29,39 @@ def hooks(name)
end

module InstanceMethods

def run_hook(name, *args, &block)
hooks = if self.class.superclass < Qu::Hooks
find_hooks_for(name).run(self, args, &block)
end

def run_before_hook( name, *args )
run_hook_by_type(name, *args, :before)
end

def run_after_hook( name, *args )
run_hook_by_type(name, *args, :after)
end

def find_hooks_for(name)
if self.class.superclass < Qu::Hooks
self.class.superclass.hooks(name).dup.concat self.class.hooks(name)
else
self.class.hooks(name)
end

hooks.run(self, args, &block)
end

def halt
throw :halt
end

private

def run_hook_by_type( name, type, *args, &block )
if hook = find_hooks_for(name).find { |hook| hook.type == type }
hook.call(self, args, &block)
end
end

end

class Chain < Array
Expand All @@ -52,6 +76,7 @@ def run(object, args, &block)
def add(kind, *methods)
methods.each {|method| self << Hook.new(kind, method) }
end

end

class Hook
Expand All @@ -66,7 +91,7 @@ def call(obj, args, &chain)
obj.send method, *args, &chain
else
obj.send method, *args if type == :before
chain.call
chain.call if chain
obj.send method, *args if type == :after
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/qu/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Qu
class Job
include Qu::Hooks
define_hooks :push, :perform, :complete, :abort, :fail
define_hook_by_kinds :fork, :before, :after

attr_accessor :payload

Expand Down
10 changes: 9 additions & 1 deletion lib/qu/payload.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,17 @@ def perform

job.run_hook(:complete) { Qu.complete(self) }
rescue Qu::Worker::Abort
abort
rescue => exception
fail(exception)
end

def abort
job.run_hook(:abort) { Qu.abort(self) }
raise
rescue => exception
end

def fail(exception)
job.run_hook(:fail, exception) { Qu.fail(self) }
Qu::Failure.create(self, exception)
end
Expand Down
5 changes: 5 additions & 0 deletions lib/qu/runner/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module Qu
module Runner
RunnerLimitReached = Class.new(StandardError)
end
end
14 changes: 13 additions & 1 deletion lib/qu/runner/direct.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
require 'qu/runner/base'

module Qu
module Runner
class Direct

def run( payload )
def run( worker, payload )
@full = true
payload.perform
ensure
@full = false
end

def stop
end

def full?
@full
end

end
Expand Down
37 changes: 37 additions & 0 deletions lib/qu/runner/forking.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require 'qu/runner/base'
require 'qu/util/signal_handler'
require 'qu/util/thread_safe_hash'
require 'qu/util/process_wrapper'

module Qu
module Runner

class Forking

attr_reader :fork_limit, :forks

def initialize( fork_limit = 1 )
@fork_limit = fork_limit
@forks = Qu::Util::ThreadSafeHash.new
end

def full?
forks.size == fork_limit
end

def run(worker, payload)
raise RunnerLimitReached.new("#{self.class.name} is already running #{fork_limit} jobs") if full?

process = Qu::Util::ProcessWrapper.new( forks, worker, payload )
process.fork
end

def stop
forks.values.each do |process|
process.stop
end
end

end
end
end
Loading

0 comments on commit 72f4582

Please sign in to comment.