Skip to content

Commit

Permalink
Fix connection and memory leak on pipeline reload.
Browse files Browse the repository at this point in the history
When the pipeline reload thread closes the connection, but the connection is still active (becuase is executing a query), the connection and associated memory will leak. The underlying library does not make any assurances that the connection will indeed be closed if currently in use, and observation shows this to be true. The fix here is to ensure that the a thread can not attempt close the connection (it will block) while the connection is in use. In this case that thread is the pipeline thread and the reload will now block until the current query finishes (or errors).

Fixes logstash-plugins#251
  • Loading branch information
jakelandis committed Dec 6, 2017
1 parent 85335a2 commit b655c6b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 30 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 4.3.2
- [#251](https://github.com/logstash-plugins/logstash-input-jdbc/issues/251) Fix connection and memory leak.

## 4.3.1
- Update gemspec summary

Expand Down
3 changes: 1 addition & 2 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,8 @@ def run(queue)
end # def run

def stop
@scheduler.stop if @scheduler

close_jdbc_connection
@scheduler.stop if @scheduler
end

private
Expand Down
65 changes: 38 additions & 27 deletions lib/logstash/plugin_mixins/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require "time"
require "date"

java_import java.util.concurrent.locks.ReentrantLock

# Tentative of abstracting JDBC logic to a mixin
# for potential reuse in other plugins (input/output)
module LogStash::PluginMixins::Jdbc
Expand Down Expand Up @@ -191,6 +193,7 @@ def open_jdbc_connection

public
def prepare_jdbc_connection
@connection_lock = ReentrantLock.new
if @use_column_value
case @tracking_column_type
when "numeric"
Expand All @@ -206,50 +209,58 @@ def prepare_jdbc_connection
public
def close_jdbc_connection
begin
# pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections
# connections in use won't really get closed
@connection_lock.lock
@database.disconnect if @database
rescue => e
@logger.warn("Failed to close connection", :exception => e)
ensure
@connection_lock.unlock
end
end

public
def execute_statement(statement, parameters)
success = false
open_jdbc_connection
begin
parameters = symbolized_params(parameters)
query = @database[statement, parameters]
sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
@tracking_column_warning_sent = false
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)

if @jdbc_paging_enabled
query.each_page(@jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
success = false
@connection_lock.lock
open_jdbc_connection
begin
parameters = symbolized_params(parameters)
query = @database[statement, parameters]
sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
@tracking_column_warning_sent = false
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)

if @jdbc_paging_enabled
query.each_page(@jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
sql_last_value = get_column_value(row) if @use_column_value
if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
end
yield extract_values_from(row)
end
end
else
query.each do |row|
sql_last_value = get_column_value(row) if @use_column_value
if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
end
yield extract_values_from(row)
end
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
else
query.each do |row|
sql_last_value = get_column_value(row) if @use_column_value
if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time`
end
yield extract_values_from(row)
end
@sql_last_value = sql_last_value
ensure
close_jdbc_connection
@connection_lock.unlock
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
else
@sql_last_value = sql_last_value
end
close_jdbc_connection
return success
return success
end

public
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-jdbc'
s.version = '4.3.1'
s.version = '4.3.2'
s.licenses = ['Apache License (2.0)']
s.summary = "Creates events from JDBC data"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down

0 comments on commit b655c6b

Please sign in to comment.