Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Azure event Hub client library to version 3.3.0 #96

Merged
merged 10 commits into from
Oct 25, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.5.0
- Updated Azure Event Hub client library to version 3.3.0 [#96](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/96)

## 1.4.9
- Fixed issue with `getHostContext` method accessibility, causing plugin not to be able to run [#93](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/93)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.9
1.5.0
54 changes: 45 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,43 @@ repositories {
dependencies {
testImplementation 'junit:junit:4.12'

implementation 'com.microsoft.azure:azure-eventhubs:2.3.2'
implementation 'com.microsoft.azure:azure-eventhubs:3.3.0'

implementation 'com.microsoft.azure:qpid-proton-j-extensions:1.2.4'
implementation 'com.microsoft.azure:azure-eventhubs-eph:2.5.2'
implementation 'com.microsoft.azure:azure-eventhubs-eph:3.3.0'
implementation 'com.microsoft.azure:azure-storage:8.6.6'
implementation 'com.google.code.gson:gson:2.10.1'
implementation 'org.apache.qpid:proton-j:0.33.9'
implementation 'com.google.code.gson:gson:2.8.9'
implementation 'org.apache.qpid:proton-j:0.33.8'

implementation 'com.microsoft.azure:azure-keyvault-core:1.2.4'
implementation 'com.microsoft.azure:adal4j:1.6.4'
implementation 'com.microsoft.azure:azure-annotations:1.10.0'
implementation 'com.microsoft.azure:azure-client-authentication:1.7.3'
implementation 'com.microsoft.azure:azure-client-runtime:1.7.3'
implementation 'com.microsoft.rest:client-runtime:1.7.3'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.12.7'
implementation 'com.fasterxml.jackson.core:jackson-core:2.12.7'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.7.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-joda:2.12.7'
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
implementation 'com.google.guava:guava:32.0.1-jre'
implementation 'com.nimbusds:lang-tag:1.7'
implementation 'com.nimbusds:nimbus-jose-jwt:9.37.2'
implementation 'com.nimbusds:oauth2-oidc-sdk:6.5'
implementation 'com.squareup.okhttp3:logging-interceptor:3.12.2'
implementation 'com.squareup.okhttp3:okhttp-urlconnection:3.12.2'
implementation 'com.squareup.okhttp3:okhttp:3.14.7'
implementation 'com.squareup.okio:okio:1.17.6'
implementation 'com.squareup.retrofit2:adapter-rxjava:2.7.2'
implementation 'com.squareup.retrofit2:converter-jackson:2.7.2'
implementation 'com.squareup.retrofit2:retrofit:2.7.2'
implementation 'com.sun.mail:javax.mail:1.6.1'
implementation 'commons-codec:commons-codec:1.11'
implementation 'io.reactivex:rxjava:1.3.8'
implementation 'javax.activation:activation:1.1'
implementation 'net.minidev:json-smart:2.4.9'
implementation 'org.checkerframework:checker-compat-qual:2.0.0'
implementation 'org.codehaus.mojo:animal-sniffer-annotations:1.14'
compileOnly 'org.apache.logging.log4j:log4j-api:2.17.0' // provided by Logstash
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.0' // provided by Logstash
}
Expand All @@ -47,11 +78,16 @@ task vendor {
doLast {
String vendorPathPrefix = "vendor/jar-dependencies"
configurations.runtimeClasspath.allDependencies.each { dep ->
File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}" + File.separator + "${dep.name}" + File.separator + "${dep.version}") }.singleFile
String groupPath = dep.group.replaceAll('\\.', '/')
File newJarFile = file("${vendorPathPrefix}" + File.separator + "${groupPath}" + File.separator + "${dep.name}"+ File.separator + "${dep.version}" + File.separator + "${dep.name}-${dep.version}.jar")
newJarFile.mkdirs()
Files.copy(f.toPath(), newJarFile.toPath(), REPLACE_EXISTING)
FileCollection fileCollection = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}" + File.separator + "${dep.name}" + File.separator + "${dep.version}") }
if (fileCollection.isEmpty()) {
println "runtimeClasspath is empty for dependency ${dep.group}" + File.separator + "${dep.name}" + File.separator + "${dep.version}"
} else {
File f = fileCollection.singleFile
String groupPath = dep.group.replaceAll('\\.', '/')
File newJarFile = file("${vendorPathPrefix}" + File.separator + "${groupPath}" + File.separator + "${dep.name}" + File.separator + "${dep.version}" + File.separator + "${dep.name}-${dep.version}.jar")
newJarFile.mkdirs()
Files.copy(f.toPath(), newJarFile.toPath(), REPLACE_EXISTING)
}
}
}
}
Expand Down
37 changes: 33 additions & 4 deletions lib/logstash-input-azure_event_hubs.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
# AUTOGENERATED BY THE GRADLE SCRIPT. DO NOT EDIT.

require 'jar_dependencies'
require_jar('com.microsoft.azure', 'azure-eventhubs', '2.3.2')
require_jar('com.microsoft.azure', 'azure-eventhubs', '3.3.0')
require_jar('com.microsoft.azure', 'qpid-proton-j-extensions', '1.2.4')
require_jar('com.microsoft.azure', 'azure-eventhubs-eph', '2.5.2')
require_jar('com.microsoft.azure', 'azure-eventhubs-eph', '3.3.0')
require_jar('com.microsoft.azure', 'azure-storage', '8.6.6')
require_jar('com.google.code.gson', 'gson', '2.10.1')
require_jar('org.apache.qpid', 'proton-j', '0.33.9')
require_jar('com.google.code.gson', 'gson', '2.8.9')
require_jar('org.apache.qpid', 'proton-j', '0.33.8')
require_jar('com.microsoft.azure', 'azure-keyvault-core', '1.2.4')
require_jar('com.microsoft.azure', 'adal4j', '1.6.4')
require_jar('com.microsoft.azure', 'azure-annotations', '1.10.0')
require_jar('com.microsoft.azure', 'azure-client-authentication', '1.7.3')
require_jar('com.microsoft.azure', 'azure-client-runtime', '1.7.3')
require_jar('com.microsoft.rest', 'client-runtime', '1.7.3')
require_jar('com.fasterxml.jackson.core', 'jackson-annotations', '2.12.7')
require_jar('com.fasterxml.jackson.core', 'jackson-core', '2.12.7')
require_jar('com.fasterxml.jackson.core', 'jackson-databind', '2.12.7.1')
require_jar('com.fasterxml.jackson.datatype', 'jackson-datatype-joda', '2.12.7')
require_jar('com.github.stephenc.jcip', 'jcip-annotations', '1.0-1')
require_jar('com.google.guava', 'guava', '32.0.1-jre')
require_jar('com.nimbusds', 'lang-tag', '1.7')
require_jar('com.nimbusds', 'nimbus-jose-jwt', '9.37.2')
require_jar('com.nimbusds', 'oauth2-oidc-sdk', '6.5')
require_jar('com.squareup.okhttp3', 'logging-interceptor', '3.12.2')
require_jar('com.squareup.okhttp3', 'okhttp-urlconnection', '3.12.2')
require_jar('com.squareup.okhttp3', 'okhttp', '3.14.7')
require_jar('com.squareup.okio', 'okio', '1.17.6')
require_jar('com.squareup.retrofit2', 'adapter-rxjava', '2.7.2')
require_jar('com.squareup.retrofit2', 'converter-jackson', '2.7.2')
require_jar('com.squareup.retrofit2', 'retrofit', '2.7.2')
require_jar('com.sun.mail', 'javax.mail', '1.6.1')
require_jar('commons-codec', 'commons-codec', '1.11')
require_jar('io.reactivex', 'rxjava', '1.3.8')
require_jar('javax.activation', 'activation', '1.1')
require_jar('net.minidev', 'json-smart', '2.4.9')
require_jar('org.checkerframework', 'checker-compat-qual', '2.0.0')
require_jar('org.codehaus.mojo', 'animal-sniffer-annotations', '1.14')
33 changes: 16 additions & 17 deletions lib/logstash/inputs/azure_event_hubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,17 @@ def run(queue)
@logger.info("Event Hub #{event_hub_name} is initializing... ")
begin
if event_hub['storage_connection']
event_processor_host = EventProcessorHost.new(
EventProcessorHost.createHostName('logstash'),
event_hub_name,
event_hub['consumer_group'],
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
event_hub['storage_connection'].value,
event_hub.fetch('storage_container', event_hub_name),
scheduled_executor_service)
event_processor_host = EventProcessorHost::EventProcessorHostBuilder.newBuilder(EventProcessorHost.createHostName('logstash'), event_hub['consumer_group'])
.useAzureStorageCheckpointLeaseManager(
event_hub['storage_connection'].value,
event_hub.fetch('storage_container', event_hub_name),
nil
)
.useEventHubConnectionString(
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
)
.setExecutor(scheduled_executor_service)
.build
else
@logger.warn("You have NOT specified a `storage_connection_string` for #{event_hub_name}. This configuration is only supported for a single Logstash instance.")
event_processor_host = create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
Expand Down Expand Up @@ -487,15 +490,11 @@ def run(queue)
def create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
checkpoint_manager = InMemoryCheckpointManager.new
lease_manager = InMemoryLeaseManager.new
event_processor_host = EventProcessorHost.new(
EventProcessorHost.createHostName('logstash'),
event_hub_name,
event_hub['consumer_group'],
event_hub['event_hub_connections'].first.value, #there will only be one in this array by the time it gets here
checkpoint_manager,
lease_manager,
scheduled_executor_service,
nil)
event_processor_host = EventProcessorHost::EventProcessorHostBuilder.newBuilder(EventProcessorHost.createHostName('logstash'), event_hub['consumer_group'])
.useUserCheckpointAndLeaseManagers(checkpoint_manager, lease_manager)
.useEventHubConnectionString(event_hub['event_hub_connections'].first.value) #there will only be one in this array by the time it gets here
.setExecutor(scheduled_executor_service)
.build
host_context = get_host_context(event_processor_host)
#using java_send to avoid naming conflicts with 'initialize' method
lease_manager.java_send :initialize, [HostContext], host_context
Expand Down
97 changes: 62 additions & 35 deletions spec/inputs/azure_event_hub_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,39 @@
unregister_counter.incrementAndGet
completable_future
}
expect(EventProcessorHost).to receive(:new).at_most(2).times {|host_name, event_hub_name, consumer_group, event_hub_connection, storage_connection, container, executor|
case event_hub_name
when 'event_hub_name0'

assertion_count.incrementAndGet
build_step_mock = double("final build step")
expect(build_step_mock).to receive(:build).at_most(2).times.and_return(mock_host)

executor_step = double("executor step")
expect(executor_step).to receive(:setExecutor).at_most(2).times.and_return(build_step_mock)

mock_connection_string = double("connection string")
expect(mock_connection_string).to receive(:useEventHubConnectionString).at_most(2).times {|event_hub_connection|
case event_hub_connection
when /.*event_hub_name0$/
expect(event_hub_connection).to eql(config['event_hub_connections'][0])
expect(container).to eql('event_hub_name0') # default
when /.*event_hub_name1$/
expect(event_hub_connection).to eql(config['event_hub_connections'][1])
end
executor_step
}

mock_builder = double("storage and lease managers")
expect(mock_builder).to receive(:useAzureStorageCheckpointLeaseManager).at_most(2).times {|storage_connection_str, storage_container, storage_blob_prefix|
case storage_container
when 'event_hub_name0'
assertion_count.incrementAndGet
when 'event_hub_name1'
assertion_count.incrementAndGet
expect(host_name).to start_with('logstash')
expect(event_hub_connection).to eql(config['event_hub_connections'][1])
expect(container).to eql('event_hub_name1') # default
end
expect(host_name).to start_with('logstash')
expect(storage_connection).to eql(config['storage_connection'])
mock_connection_string
}

expect(EventProcessorHost::EventProcessorHostBuilder).to receive(:newBuilder).at_most(2).times {|host_name, consumer_group|
expect(host_name).to start_with('logstash')
host_counter.incrementAndGet
mock_host
mock_builder
}
# signal the stop first since the run method blocks until stop is called.
input.do_stop
Expand All @@ -126,8 +140,6 @@
expect(exploded_config[0]['event_hub_connections'][0].value).to eql('Endpoint=sb://logstash/;SharedAccessKeyName=activity-log-readonly;SharedAccessKey=something;EntityPath=event_hub1')
end
end


end

describe "Advanced Config" do
Expand Down Expand Up @@ -221,32 +233,45 @@
unregister_counter.incrementAndGet
completable_future
}
expect(EventProcessorHost).to receive(:new).at_most(3).times {|host_name, event_hub_name, consumer_group, event_hub_connection, storage_connection, container, executor|
case event_hub_name

build_step_mock = double("final build step")
expect(build_step_mock).to receive(:build).at_most(3).times.and_return(mock_host)

executor_step = double("executor step")
expect(executor_step).to receive(:setExecutor).at_most(3).times.and_return(build_step_mock)

mock_connection_string = double("connection string")
expect(mock_connection_string).to receive(:useEventHubConnectionString).at_most(3).times {|event_hub_connection|
case event_hub_connection
when /.*event_hub_name0$/
expect(event_hub_connection).to eql(config['event_hubs'][0]['event_hub_name0']['event_hub_connections'][0].value)
when /.*event_hub_name1$/
expect(event_hub_connection).to eql(config['event_hubs'][1]['event_hub_name1']['event_hub_connections'][0].value)
end
executor_step
}

managers_mock = double("checkpoint and lease managers")
expect(managers_mock).to receive(:useUserCheckpointAndLeaseManagers).at_most(3).times {|checkpoint_mngr, lease_mngr|
assertion_count.incrementAndGet
mock_connection_string
}
expect(managers_mock).to receive(:useAzureStorageCheckpointLeaseManager).at_most(3).times {|storage_connection_str, storage_container, storage_blob_prefix|
case storage_container
when 'event_hub_name0'
if consumer_group.eql?('cg')
assertion_count.incrementAndGet
expect(host_name).to start_with('logstash')
expect(event_hub_connection).to eql(config['event_hubs'][0]['event_hub_name0']['event_hub_connections'][0].value)
expect(storage_connection).to eql(config['event_hubs'][0]['event_hub_name0']['storage_connection'].value)
expect(container).to eql('event_hub_name0') # default
elsif consumer_group.eql?('ls')
assertion_count.incrementAndGet
expect(event_hub_connection).to eql(config['event_hubs'][2]['event_hub_name0']['event_hub_connections'][0].value)
# in this mode, storage connection and container are replaced with in memory offset management
expect(storage_connection).to be_kind_of(InMemoryCheckpointManager)
expect(container).to be_kind_of(InMemoryLeaseManager)
end
when 'event_hub_name1'
assertion_count.incrementAndGet
expect(host_name).to start_with('logstash')
expect(event_hub_connection).to eql(config['event_hubs'][1]['event_hub_name1']['event_hub_connections'][0].value)
expect(storage_connection).to eql(config['event_hubs'][1]['event_hub_name1']['storage_connection'].value)
expect(container).to eql(config['event_hubs'][1]['event_hub_name1']['storage_container'])
when 'alt_container'
assertion_count.incrementAndGet
end
mock_connection_string
}

expect(EventProcessorHost::EventProcessorHostBuilder).to receive(:newBuilder).at_most(3).times {|host_name, consumer_group|
expect(host_name).to start_with('logstash')
host_counter.incrementAndGet
mock_host
managers_mock
}

# signal the stop first since the run method blocks until stop is called.
input.do_stop
input.run(mock_queue)
Expand All @@ -259,12 +284,14 @@
it "can create an in memory EPH" do
#event_hub, event_hub_name, scheduled_executor_service
exploded_config = input.event_hubs_exploded
# During build step Azure libraries does a syntax check of EventHub connection string, so needs to be a pseudo real
exploded_config[0]['event_hub_connections'] = [::LogStash::Util::Password.new("Endpoint=sb://logstash.windows.net/;SharedAccessKeyName=activity-log-read-only;SharedAccessKey=blabla;EntityPath=ops-logs")]
input.create_in_memory_event_processor_host(exploded_config[0], exploded_config[0]['event_hubs'].first, nil)
end
end

describe "Bad Basic Config" do
describe "Offset overwritting" do
describe "Offset overwriting" do
let(:config) do
{
'event_hub_connections' => ['Endpoint=sb://...;EntityPath=event_hub_name0', 'Endpoint=sb://...;EntityPath=event_hub_name0'],
Expand Down