Skip to content

Commit

Permalink
POC - runners
Browse files Browse the repository at this point in the history
  • Loading branch information
mauricio committed Feb 15, 2014
1 parent 589d316 commit 0e3062b
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 1 deletion.
3 changes: 2 additions & 1 deletion lib/qu.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Qu

@interval = 5

attr_accessor :logger, :graceful_shutdown, :instrumenter, :interval
attr_accessor :logger, :graceful_shutdown, :instrumenter, :interval, :runner

def_delegators :backend, :push, :pop, :complete, :abort, :fail, :size, :clear

Expand Down Expand Up @@ -51,4 +51,5 @@ def load_json(object)
config.logger = Logger.new(STDOUT)
config.logger.level = Logger::INFO
config.instrumenter = Qu::Instrumenters::Noop
config.runner = Qu::Runner::Direct.new
end
59 changes: 59 additions & 0 deletions lib/qu/backend/memory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
require 'forwardable'
require 'securerandom'

module Qu
module Backend
class Memory < Base
extend Forwardable

def_delegator :@monitor, :synchronize

def initialize
@monitor = Monitor.new
@queues = {}
@pending = {}
end

def push(payload)
synchronize do
payload.id = SecureRandom.uuid
queue_for(payload.queue) << payload
end
end

def complete(payload)
synchronize { @pending.delete(payload.id) }
end

def abort(payload)
synchronize do
@pending.delete(payload.id)
push(payload)
end
end

alias fail abort

def pop(queue = 'default')
synchronize do
queue_for(queue).shift
end
end

def size(queue = 'default')
queue_for(queue).size
end

def clear(queue = 'default')
synchronize { queue_for(queue).clear }
end

private

def queue_for(queue)
synchronize { @queues[queue] ||= [] }
end

end
end
end
11 changes: 11 additions & 0 deletions lib/qu/runner/direct.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module Qu
module Runner
class Direct

def run( payload )
payload.perform
end

end
end
end
15 changes: 15 additions & 0 deletions lib/qu/runner/threaded.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module Qu
module Runner
class Threaded

attr_reader :thread_count

def initialize( options = {} )
@thread_count = options[:threads] || 1
end



end
end
end
7 changes: 7 additions & 0 deletions spec/qu/backend/memory_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require 'spec_helper'
require 'qu/backend/memory'

describe Qu::Backend::Memory do
it_should_behave_like 'a backend'
it_should_behave_like 'a backend interface'
end
11 changes: 11 additions & 0 deletions spec/qu/worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'spec_helper'
require 'qu/backend/memory'

describe Qu::Worker do
let(:job) { Qu::Payload.new(:id => '1', :klass => SimpleJob) }
Expand Down Expand Up @@ -133,6 +134,16 @@
end
end

context 'with job in many queues' do

before do
Qu.stub(:backend).and_return(Qu::Backend::Memory.new)
end

it 'should dequeue jobs following the queue priority'

end

context 'with no job in any queue' do
before do
Qu.stub(:pop).and_return(nil)
Expand Down
18 changes: 18 additions & 0 deletions spec/support/sample_jobs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class MessageQueue

end

class PriorityJob < Qu::Job
queue :priority

def initialize( message )
@message
end



end

class NonPriorityJob < Qu::Job
queue :not_priority
end

0 comments on commit 0e3062b

Please sign in to comment.