Skip to content

Commit

Permalink
Refactor: scope and review global java_imports
Browse files Browse the repository at this point in the history
  • Loading branch information
kares committed Mar 12, 2020
1 parent f3b70bf commit cdac9ea
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 41 deletions.
21 changes: 9 additions & 12 deletions lib/logstash/inputs/azure_event_hubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@
require "logstash/inputs/named_thread_factory"
require "logstash/inputs/look_back_position_provider"

class LogStash::Inputs::AzureEventHubs < LogStash::Inputs::Base

java_import com.microsoft.azure.eventprocessorhost.EventProcessorHost
java_import com.microsoft.azure.eventprocessorhost.EventProcessorOptions
java_import com.microsoft.azure.eventprocessorhost.InMemoryCheckpointManager
java_import com.microsoft.azure.eventprocessorhost.InMemoryLeaseManager
java_import com.microsoft.azure.eventprocessorhost.HostContext
java_import com.microsoft.azure.eventhubs.ConnectionStringBuilder
java_import java.util.concurrent.Executors
java_import java.util.concurrent.TimeUnit

java_import com.microsoft.azure.eventprocessorhost.EventProcessorHost
java_import com.microsoft.azure.eventprocessorhost.EventProcessorOptions
java_import com.microsoft.azure.eventprocessorhost.InMemoryCheckpointManager
java_import com.microsoft.azure.eventprocessorhost.InMemoryLeaseManager
java_import com.microsoft.azure.eventprocessorhost.HostContext
java_import com.microsoft.azure.eventhubs.ConnectionStringBuilder
java_import java.util.concurrent.Executors
java_import java.util.concurrent.TimeUnit

class LogStash::Inputs::AzureEventHubs < LogStash::Inputs::Base
config_name "azure_event_hubs"

# This plugin supports two styles of configuration
Expand Down Expand Up @@ -476,5 +475,3 @@ def run(queue)
end
end
end


7 changes: 3 additions & 4 deletions lib/logstash/inputs/error_notification_handler.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
# encoding: utf-8
require "logstash/util/loggable"
java_import java.util.function.Consumer

module LogStash
module Inputs
module Azure
class ErrorNotificationHandler
include Consumer
include java.util.function.Consumer
include LogStash::Util::Loggable

def initialize
@logger = self.logger
end

def accept(exception_received_event_args)
@logger.error("Error with Event Processor Host. ",
@logger.error("Error with Event Processor Host. ",
:host_name => exception_received_event_args.getHostname(),
:action => exception_received_event_args.getAction(),
:action => exception_received_event_args.getAction(),
:exception => exception_received_event_args.getException().toString())
end
end
Expand Down
9 changes: 5 additions & 4 deletions lib/logstash/inputs/look_back_position_provider.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# encoding: utf-8
require "logstash/util/loggable"
java_import java.util.function.Function
java_import com.microsoft.azure.eventhubs.EventPosition
java_import java.time.Instant

module LogStash
module Inputs
module Azure
class LookBackPositionProvider
include Function

java_import com.microsoft.azure.eventhubs.EventPosition
java_import java.time.Instant

include java.util.function.Function
include LogStash::Util::Loggable

def initialize(look_back_seconds)
Expand Down
4 changes: 1 addition & 3 deletions lib/logstash/inputs/processor_factory.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/inputs/processor"

module LogStash
module Inputs
module Azure
Expand All @@ -22,6 +23,3 @@ def createEventProcessor(context)
end
end
end



28 changes: 10 additions & 18 deletions spec/inputs/azure_event_hub_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/azure_event_hubs"


java_import com.microsoft.azure.eventprocessorhost.EventProcessorHost
java_import com.microsoft.azure.eventprocessorhost.EventProcessorOptions
java_import com.microsoft.azure.eventprocessorhost.InMemoryCheckpointManager
java_import com.microsoft.azure.eventprocessorhost.InMemoryLeaseManager
java_import java.util.concurrent.ScheduledThreadPoolExecutor
java_import java.util.concurrent.CompletableFuture
java_import java.util.concurrent.TimeUnit
java_import java.util.concurrent.atomic.AtomicInteger


describe LogStash::Inputs::AzureEventHubs do

Expand Down Expand Up @@ -67,18 +60,18 @@
mock_queue = double("queue")
mock_host = double("event_processor_host")
mock_host_context = double("host_context")
completable_future = CompletableFuture.new
completable_future = java.util.concurrent.CompletableFuture.new
#simulate work being done before completing the future
Thread.new do
sleep 2
completable_future.complete("")
end

# rspec has issues with counters and concurrent code, so use threadsafe counters instead
host_counter = AtomicInteger.new
register_counter = AtomicInteger.new
unregister_counter = AtomicInteger.new
assertion_count = AtomicInteger.new
host_counter = java.util.concurrent.atomic.AtomicInteger.new
register_counter = java.util.concurrent.atomic.AtomicInteger.new
unregister_counter = java.util.concurrent.atomic.AtomicInteger.new
assertion_count = java.util.concurrent.atomic.AtomicInteger.new

allow(mock_host).to receive(:getHostContext) {mock_host_context}
allow(mock_host_context).to receive(:getEventHubPath) {"foo"}
Expand Down Expand Up @@ -202,18 +195,18 @@
mock_queue = double("queue")
mock_host = double("event_processor_host")
mock_host_context = double("host_context")
completable_future = CompletableFuture.new
completable_future = java.util.concurrent.CompletableFuture.new
#simulate work being done before completing the future
Thread.new do
sleep 2
completable_future.complete("")
end

# rspec has issues with counters and concurrent code, so use threadsafe counters instead
host_counter = AtomicInteger.new
register_counter = AtomicInteger.new
unregister_counter = AtomicInteger.new
assertion_count = AtomicInteger.new
host_counter = java.util.concurrent.atomic.AtomicInteger.new
register_counter = java.util.concurrent.atomic.AtomicInteger.new
unregister_counter = java.util.concurrent.atomic.AtomicInteger.new
assertion_count = java.util.concurrent.atomic.AtomicInteger.new
allow_any_instance_of(InMemoryLeaseManager).to receive(:java_send)
allow_any_instance_of(InMemoryCheckpointManager).to receive(:java_send)

Expand Down Expand Up @@ -319,4 +312,3 @@
end
end
end

0 comments on commit cdac9ea

Please sign in to comment.