Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown workqueue on close #143

Merged
merged 3 commits into from
Aug 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions lib/blather/client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ module Blather
class Client
attr_reader :jid,
:roster,
:caps
:caps,
:queue_size

# Create a new client and set it up
#
Expand All @@ -43,12 +44,17 @@ class Client
# @param [String] host if this isn't set it'll be resolved off the JID's
# domain
# @param [Fixnum, String] port the port to connect to.
# @param [Hash] options a list of options to create the client with
# @option options [Number] :workqueue_count (5) the number of threads used to process incoming XMPP messages.
# If this parameter is specified with 0, no background threads are used;
# instead stanzas are handled in the same process that the Client is running in.
#
# @return [Blather::Client]
def self.setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil)
self.new.setup(jid, password, host, port, certs, connect_timeout)
def self.setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil, options = {})
self.new.setup(jid, password, host, port, certs, connect_timeout, options)
end


def initialize # @private
@state = :initializing

Expand All @@ -58,10 +64,7 @@ def initialize # @private
@filters = {:before => [], :after => []}
@roster = Roster.new self
@caps = Stanza::Capabilities.new

@handler_queue = GirlFriday::WorkQueue.new :handle_stanza, :size => 5 do |stanza|
handle_data stanza
end
@queue_size = 5

setup_initial_handlers
end
Expand Down Expand Up @@ -168,7 +171,11 @@ def write_with_handler(stanza, &handler)

# Close the connection
def close
EM.next_tick { self.stream.close_connection_after_writing }
EM.next_tick {
handler_queue.shutdown if handler_queue
@handler_queue = nil
self.stream.close_connection_after_writing if connected?
}
end

# @private
Expand All @@ -185,7 +192,11 @@ def unbind

# @private
def receive_data(stanza)
@handler_queue << stanza
if handler_queue
handler_queue << stanza
else
handle_data stanza
end
end

def handle_data(stanza)
Expand All @@ -202,16 +213,25 @@ def setup?
end

# @private
def setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil)
def setup(jid, password, host = nil, port = nil, certs = nil, connect_timeout = nil, options = {})
@jid = JID.new(jid)
@setup = [@jid, password]
@setup << host
@setup << port
@setup << certs
@setup << connect_timeout
@queue_size = options[:workqueue_count] || 5
self
end

# @private
def handler_queue
return if queue_size == 0
@handler_queue ||= GirlFriday::WorkQueue.new :handle_stanza, :size => queue_size do |stanza|
handle_data stanza
end
end

protected

def stream
Expand Down
145 changes: 115 additions & 30 deletions spec/blather/client/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,74 @@
subject.caps.should be_kind_of Blather::Stanza::Capabilities
end

it 'can be setup' do
subject.should respond_to :setup
subject.setup('[email protected]', 'pass').should == subject
end
describe '#setup' do
it 'can be setup' do
subject.should respond_to :setup
subject.setup('[email protected]', 'pass').should == subject
end

it 'knows if it has been setup' do
subject.should respond_to :setup?
subject.should_not be_setup
subject.setup '[email protected]', 'pass'
subject.should be_setup
end
it 'knows if it has been setup' do
subject.should respond_to :setup?
subject.should_not be_setup
subject.setup '[email protected]', 'pass'
subject.should be_setup
end

it 'cannot be run before being setup' do
lambda { subject.run }.should raise_error RuntimeError
end
it 'cannot be run before being setup' do
lambda { subject.run }.should raise_error RuntimeError
end

it 'starts up a Component connection when setup without a node' do
setup = 'pubsub.jabber.local', 'secret'
subject.setup *setup
Blather::Stream::Component.expects(:start).with subject, *setup + [nil, nil, nil, nil]
subject.run
end
it 'starts up a Component connection when setup without a node' do
setup = 'pubsub.jabber.local', 'secret'
subject.setup *setup
Blather::Stream::Component.expects(:start).with subject, *setup + [nil, nil, nil, nil]
subject.run
end

it 'starts up a Client connection when setup with a node' do
setup = '[email protected]', 'secret'
subject.setup *setup
Blather::Stream::Client.expects(:start).with subject, *setup + [nil, nil, nil, nil]
subject.run
end

context "setting queue size" do
let(:jid) { '[email protected]' }
let(:password) { 'secret' }
let(:queue_size) { 3 }

it 'starts up a Client connection when setup with a node' do
setup = '[email protected]', 'secret'
subject.setup *setup
Blather::Stream::Client.expects(:start).with subject, *setup + [nil, nil, nil, nil]
subject.run
subject { Blather::Client.setup(jid, password, nil, nil, nil, nil, :workqueue_count => queue_size) }

it 'sets the queue size on the client' do
subject.queue_size.should == queue_size
end

describe 'receiving data' do
let(:stanza) { Blather::Stanza::Iq.new }

context 'when the queue size is 0' do
let(:queue_size) { 0 }

it "has no handler queue" do
subject.handler_queue.should be_nil
end

it 'handles the data immediately' do
subject.expects(:handle_data).with(stanza)
subject.receive_data stanza
end
end

context 'when the queue size is non-zero' do
let(:queue_size) { 4 }

it 'enqueues the data on the handler queue' do
subject.handler_queue.expects(:<<).with(stanza)
subject.receive_data stanza
end
end
end
end
end

it 'knows if it is disconnected' do
Expand All @@ -81,12 +121,57 @@
end
end

it 'writes to the connection the closes when #close is called' do
stream.expects(:close_connection_after_writing)
EM.stubs(:next_tick).yields
subject.setup 'me.com', 'secret'
subject.post_init stream, Blather::JID.new('me.com')
subject.close
describe '#close' do
before do
EM.stubs(:next_tick).yields
subject.setup 'me.com', 'secret'
end

context "without a setup stream" do
it "does not close the connection" do
stream.expects(:close_connection_after_writing).never
subject.close
end
end

context "when a stream is setup" do
let(:stream_stopped) { false }
before do
subject.post_init stream, Blather::JID.new('me.com')
stream.stubs(:stopped? => stream_stopped)
end

context "when the stream is stopped" do
let(:stream_stopped) { true }

it "does not close the connection, since it's already closed" do
stream.expects(:close_connection_after_writing).never
end
end

it 'writes to the connection the closes when #close is called' do
stream.expects(:close_connection_after_writing)
subject.close
end

it 'shuts down the workqueue' do
stream.stubs(:close_connection_after_writing)
subject.handler_queue.expects(:shutdown)
subject.close
end

it 'forces the work queue to be re-created when referenced' do
stream.stubs(:close_connection_after_writing)
subject.close

fake_queue = stub('GirlFriday::WorkQueue')
GirlFriday::WorkQueue.expects(:new)
.with(:handle_stanza, :size => subject.queue_size)
.returns(fake_queue)

subject.handler_queue.should == fake_queue
end
end
end

it 'shuts down EM when #unbind is called if it is running' do
Expand Down