Skip to content

Commit

Permalink
updated bunny to the latest version
Browse files Browse the repository at this point in the history
The tests are green, but we need to study the interplay between the
various new timeout options in the multi threaded bunny implementation.

This is still WIP!!!
  • Loading branch information
skaes committed Aug 14, 2022
1 parent 7034bf0 commit b0911ca
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 88 deletions.
2 changes: 1 addition & 1 deletion beetle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Gem::Specification.new do |s|
}

s.specification_version = 3
s.add_runtime_dependency "bunny", "~> 0.7.12"
s.add_runtime_dependency "bunny", "~> 2.19.0"
s.add_runtime_dependency "redis", ">= 4.2.1"
s.add_runtime_dependency "hiredis", ">= 0.4.5"
s.add_runtime_dependency "amq-protocol", "= 2.3.2"
Expand Down
1 change: 0 additions & 1 deletion lib/beetle.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
$:.unshift(File.expand_path('..', __FILE__))
require 'bunny' # which bunny picks up
require 'qrack/errors' # needed by the publisher
require 'redis/connection/hiredis' # require *before* redis as specified in the redis-rb gem docs
require 'redis'
require 'active_support/all'
Expand Down
9 changes: 6 additions & 3 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class Configuration
# to 2047, which is the RabbitMQ default in 3.7. We can't set this to 0 because of a bug
# in bunny.
attr_accessor :channel_max
# the heartbeat setting to be used for RabbitMQ heartbeats (defaults to 0).
attr_accessor :heartbeat

# Lazy queues have the advantage of consuming a lot less memory on the broker. For backwards
# compatibility, they are disabled by default.
Expand Down Expand Up @@ -115,8 +117,8 @@ class Configuration
# Returns the port on which the Rabbit API is hosted
attr_accessor :api_port

# the socket timeout in seconds for message publishing (defaults to <tt>0</tt>).
# consider this a highly experimental feature for now.
# The socket timeout in seconds for message publishing (defaults to <tt>15</tt>).
# Consider this a highly experimental feature for now.
attr_accessor :publishing_timeout

# the connect/disconnect timeout in seconds for the publishing connection
Expand Down Expand Up @@ -176,6 +178,7 @@ def initialize #:nodoc:
self.frame_max = 131072
self.channel_max = 2047
self.prefetch_count = 1
self.heartbeat = 0

self.dead_lettering_enabled = false
self.dead_lettering_msg_ttl = 1000 # 1 second
Expand All @@ -187,7 +190,7 @@ def initialize #:nodoc:

self.update_queue_properties_synchronously = false

self.publishing_timeout = 0
self.publishing_timeout = 15
self.publisher_connect_timeout = 5 # seconds
self.tmpdir = "/tmp"

Expand Down
75 changes: 45 additions & 30 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def initialize(client, options = {}) #:nodoc:
@exchanges_with_bound_queues = {}
@dead_servers = {}
@bunnies = {}
@channels = {}
@throttling_options = {}
@next_throttle_refresh = Time.now
@throttled = false
Expand All @@ -27,14 +28,10 @@ def throttling_status
@throttled ? 'throttled' : 'unthrottled'
end

# list of exceptions potentially raised by bunny
# these need to be lazy, because qrack exceptions are only defined after a connection has been established
# List of exceptions potentially raised by bunny.
def bunny_exceptions
[
Bunny::ConnectionError, Bunny::ForcedChannelCloseError, Bunny::ForcedConnectionCloseError,
Bunny::MessageError, Bunny::ProtocolError, Bunny::ServerDownError, Bunny::UnsubscribeError,
Bunny::AcknowledgementError, Qrack::BufferOverflowError, Qrack::InvalidTypeError,
Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error
Bunny::Exception, Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error
]
end

Expand All @@ -46,9 +43,9 @@ def publish(message_name, data, opts={}) #:nodoc:
recycle_dead_servers unless @dead_servers.empty?
throttle!
if opts[:redundant]
publish_with_redundancy(exchange_name, message_name, data, opts)
publish_with_redundancy(exchange_name, message_name, data.to_s, opts)
else
publish_with_failover(exchange_name, message_name, data, opts)
publish_with_failover(exchange_name, message_name, data.to_s, opts)
end
end
end
Expand Down Expand Up @@ -120,6 +117,7 @@ def publish_with_redundancy(exchange_name, message_name, data, opts) #:nodoc:

def rpc(message_name, data, opts={}) #:nodoc:
opts = @client.messages[message_name].merge(opts.symbolize_keys)
timeout = opts.delete(:timeout) || RPC_DEFAULT_TIMEOUT
exchange_name = opts.delete(:exchange)
opts.delete(:queue)
recycle_dead_servers unless @dead_servers.empty?
Expand All @@ -131,17 +129,21 @@ def rpc(message_name, data, opts={}) #:nodoc:
select_next_server
bind_queues_for_exchange(exchange_name)
# create non durable, autodeleted temporary queue with a server assigned name
queue = bunny.queue
queue = channel.queue("", :durable => false, :auto_delete => true)
opts = Message.publishing_options(opts.merge :reply_to => queue.name)
logger.debug "Beetle: trying to send #{message_name}:#{opts[:message_id]} to #{@server}"
exchange(exchange_name).publish(data, opts)
logger.debug "Beetle: message sent!"
logger.debug "Beetle: listening on reply queue #{queue.name}"
queue.subscribe(:message_max => 1, :timeout => opts[:timeout] || RPC_DEFAULT_TIMEOUT) do |msg|
q = Queue.new
consumer = queue.subscribe do |info, properties, payload|
logger.debug "Beetle: received reply!"
result = msg[:payload]
status = msg[:header].properties[:headers][:status]
result = payload
q.push properties[:headers]["status"]
end
Timeout.timeout(timeout) do
status = q.pop
end
consumer.cancel
logger.debug "Beetle: rpc complete!"
rescue *bunny_exceptions => e
stop!(e)
Expand Down Expand Up @@ -194,26 +196,38 @@ def bunny
end

def bunny?
@bunnies[@server]
!!@bunnies[@server]
end

def new_bunny
b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
:pass => @client.config.password,
:vhost => @client.config.vhost,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:host => current_host,
:port => current_port,
:logger => @client.config.logger,
:username => @client.config.user,
:password => @client.config.password,
:vhost => @client.config.vhost,
:automatically_recover => false,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:read_timeout => @client.config.publishing_timeout,
:write_timeout => @client.config.publishing_timeout,
:continuation_timeout => @client.config.publishing_timeout,
:connection_timeout => @client.config.publisher_connect_timeout,
:heartbeat => @client.config.heartbeat,
)
b.start
b
end

def channel
@channels[@server] ||= bunny.create_channel
end

def channel?
!!@channels[@server]
end

# retry dead servers after ignoring them for 10.seconds
# if all servers are dead, retry the one which has been dead for the longest time
def recycle_dead_servers
Expand Down Expand Up @@ -244,7 +258,7 @@ def select_next_server
end

def create_exchange!(name, opts)
bunny.exchange(name, opts)
channel.exchange(name, opts)
end

def bind_queues_for_exchange(exchange_name)
Expand All @@ -255,9 +269,8 @@ def bind_queues_for_exchange(exchange_name)

def declare_queue!(queue_name, creation_options)
logger.debug("Beetle: creating queue with opts: #{creation_options.inspect}")
queue = bunny.queue(queue_name, creation_options)

policy_options = bind_dead_letter_queue!(bunny, queue_name, creation_options)
queue = channel.queue(queue_name, creation_options)
policy_options = bind_dead_letter_queue!(channel, queue_name, creation_options)
publish_policy_options(policy_options)
queue
end
Expand All @@ -272,8 +285,9 @@ def stop!(exception=nil)
Beetle::Timer.timeout(timeout) do
logger.debug "Beetle: closing connection from publisher to #{server}"
if exception
bunny.__send__ :close_socket
bunny.__send__ :close_connection
else
channel.close if channel?
bunny.stop
end
end
Expand All @@ -282,6 +296,7 @@ def stop!(exception=nil)
Beetle::reraise_expectation_errors!
ensure
@bunnies[@server] = nil
@channels[@server] = nil
@exchanges[@server] = {}
@queues[@server] = {}
end
Expand Down
4 changes: 2 additions & 2 deletions test/beetle/amqp_gem_behavior_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class AMQPGemBehaviorTest < Minitest::Test
EM::Timer.new(1){ connection.close { EM.stop }}
channel = AMQP::Channel.new(connection)
channel.on_error { puts "woot"}
exchange = channel.topic("beetle_tests")
queue = AMQP::Queue.new(channel)
exchange = channel.topic("beetle_tests", :durable => false, :auto_delete => true)
queue = AMQP::Queue.new(channel, :durable => false, :auto_delete => true)
queue.bind(exchange, :key => "#")
queue.subscribe { }
queue.subscribe { }
Expand Down
2 changes: 1 addition & 1 deletion test/beetle/beetle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Beetle
class HostnameTest < Minitest::Test
test "should use canonical name if possible " do
test "should use canonical name if possible " do
addr = mock("addr")
addr.expects(:canonname).returns("a.b.com")
Socket.expects(:gethostname).returns("a.b.com")
Expand Down
Loading

0 comments on commit b0911ca

Please sign in to comment.