diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 3f9bac1..897fc21 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -124,6 +124,8 @@ def register # Run named Timer as daemon thread @timer = java.util.Timer.new("HTTP Output #{self.params['id']}", true) + + @request_metrics = metric.namespace(:requests) end # def register def multi_receive(events) @@ -163,9 +165,6 @@ def log_error_response(response, url, event) end def send_events(events) - successes = java.util.concurrent.atomic.AtomicInteger.new(0) - failures = java.util.concurrent.atomic.AtomicInteger.new(0) - retries = java.util.concurrent.atomic.AtomicInteger.new(0) event_count = @is_batch ? 1 : events.size pending = Queue.new @@ -175,6 +174,8 @@ def send_events(events) events.each {|e| pending << [e, 0]} end + successes, failures = 0, 0 + while popped = pending.pop break if popped == :done @@ -189,41 +190,30 @@ def send_events(events) case action when :success - successes.incrementAndGet - when :retry - retries.incrementAndGet + successes += 1 - next_attempt = attempt+1 + pending << :done if successes + failures == event_count + when :retry + next_attempt = attempt + 1 sleep_for = sleep_for_attempt(next_attempt) @logger.info("Retrying http request, will sleep for #{sleep_for} seconds") timer_task = RetryTimerTask.new(pending, event, next_attempt) - @timer.schedule(timer_task, sleep_for*1000) + @timer.schedule(timer_task, sleep_for * 1000) when :failure - failures.incrementAndGet + failures += 1 + + pending << :done if successes + failures == event_count else raise "Unknown action #{action}" end - - if action == :success || action == :failure - if successes.get+failures.get == event_count - pending << :done - end - end rescue => e # This should never happen unless there's a flat out bug in the code - @logger.error("Error sending HTTP Request", - :class => e.class.name, - :message => e.message, - :backtrace => e.backtrace) - failures.incrementAndGet + @logger.error("Error sending HTTP Request", :message => e.message, :class => e.class, :backtrace => e.backtrace) raise e end end rescue => e - @logger.error("Error in http output loop", - :class => e.class.name, - :message => e.message, - :backtrace => e.backtrace) + @logger.error("Error in http output loop", :message => e.message, :class => e.class, :backtrace => e.backtrace) raise e end @@ -251,18 +241,22 @@ def send_event(event, attempt) body = gzip(body) end - # Create an async request + @logger.debug? && @logger.debug("Sending request", url: url, headers: headers, body_length: body.length, attempt: attempt) + response = client.send(@http_method, url, :body => body, :headers => headers).call if !response_success?(response) if retryable_response?(response) log_retryable_response(response) + @request_metrics.increment(:retryable_failures) return :retry, event, attempt else log_error_response(response, url, event) + @request_metrics.increment(:failures) return :failure, event, attempt end else + @request_metrics.increment(:successes) return :success, event, attempt end @@ -286,8 +280,10 @@ def send_event(event, attempt) log_failure("Could not fetch URL", log_params) if will_retry + @request_metrics.increment(:retryable_failures) return :retry, event, attempt else + @request_metrics.increment(:failures) return :failure, event, attempt end end