Skip to content

Commit

Permalink
Intends to fix #30
Browse files Browse the repository at this point in the history
  • Loading branch information
driskell committed Aug 12, 2014
1 parent 7a05cbd commit 05c6229
Showing 1 changed file with 22 additions and 34 deletions.
56 changes: 22 additions & 34 deletions lib/log-courier/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,49 +56,37 @@ def initialize(options = {})
def run(&block)
# TODO: Make queue size configurable
event_queue = SizedQueue.new 10
spooler_thread = nil
server_thread = nil

begin
# Why a spooler thread? Well we don't know what &block is! We want connection threads to be non-blocking so they DON'T timeout
# Non-blocking means we can keep clients informed of progress, and response in a timely fashion. We could create this with
# a timeout wrapper around the &block call but we'd then be generating exceptions in someone else's code
# So we allow the caller to block us - but only our spooler thread - our other threads are safe and we can use timeout

# TODO: Spooler thread becomes run() thread - then any exception falls through to the LogStash plugin
# The only exception that raises in this small chunk of code should be a Logstash Shutdown exception
# Therefore, when Logstash plugin receives the exception, it can call a teardown here which pulls
# down the other threads with the correct exceptions
# This resolves the problem where we want to keep all Logstash-ness outside of this library, and as a result
# have problems with shutdown as we start receiving Logstash shutdown exceptions in here
spooler_thread = Thread.new do
loop do
events = event_queue.pop
break if events.nil?
events.each do |event|
block.call event
server_thread = Thread.new do
# Receive messages and process them
@server.run do |signature, message, comm|
case signature
when 'PING'
process_ping message, comm
when 'JDAT'
process_jdat message, comm, event_queue
else
@logger.warn("[LogCourierServer] Unknown message received from #{comm.peer}") unless @logger.nil?
# Don't kill a client that sends a bad message
# Just reject it and let it send it again, potentially to another server
comm.send '????', ''
end
end
end

# Receive messages and process them
@server.run do |signature, message, comm|
case signature
when 'PING'
process_ping message, comm
when 'JDAT'
process_jdat message, comm, event_queue
else
@logger.warn("[LogCourierServer] Unknown message received from #{comm.peer}") unless @logger.nil?
# Don't kill a client that sends a bad message
# Just reject it and let it send it again, potentially to another server
comm.send '????', ''
loop do
events = event_queue.pop
events.each do |event|
block.call event
end
end
ensure
# Signal the spooler thread to stop
unless spooler_thread.nil?
event_queue << nil
spooler_thread.join
# Signal the server thread to stop
unless server_thread.nil?
server_thread.raise ShutdownSignal
server_thread.join
end
end
end
Expand Down

0 comments on commit 05c6229

Please sign in to comment.