Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force all usage of sql_last_value to be typed according to the settings #260

Merged
merged 3 commits into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
## 4.3.5
- [#140](https://github.com/logstash-plugins/logstash-input-jdbc/issues/140) Fix long standing bug where setting jdbc_default_timezone loses milliseconds. Force all usage of sql_last_value to be typed according to the settings.

## 4.3.4
- [#261](https://github.com/logstash-plugins/logstash-input-jdbc/issues/261) Fix memory leak.

## 4.3.3
- [#255](https://github.com/logstash-plugins/logstash-input-jdbc/issues/255) Fix thread and memory leak.

## 4.3.2
- [#251](https://github.com/logstash-plugins/logstash-input-jdbc/issues/251) Fix connection and memory leak.

## 4.3.1
- Update gemspec summary

## 4.3.0
- [#147](https://github.com/logstash-plugins/logstash-input-jdbc/issues/147) Open and close connection for each query
## 4.3.0
- [#147](https://github.com/logstash-plugins/logstash-input-jdbc/issues/147) Open and close connection for each query

## 4.2.4
- [#220](https://github.com/logstash-plugins/logstash-input-jdbc/issues/220) Log exception when database connection test fails
Expand Down
Empty file removed NOTICE
Empty file.
24 changes: 6 additions & 18 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc"
require "yaml" # persistence


# This plugin was created as a way to ingest data from any database
# with a JDBC interface into Logstash. You can periodically schedule ingestion
Expand Down Expand Up @@ -206,21 +206,16 @@ def register
require "rufus/scheduler"
prepare_jdbc_connection

# Raise an error if @use_column_value is true, but no @tracking_column is set
if @use_column_value
# Raise an error if @use_column_value is true, but no @tracking_column is set
if @tracking_column.nil?
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
end
end

@enable_encoding = [email protected]? || !@columns_charset.empty?
@value_tracker = LogStash::PluginMixins::ValueTracking.build_last_value_tracker(self)

# load sql_last_value from file if exists
if @clean_run && File.exist?(@last_run_metadata_path)
File.delete(@last_run_metadata_path)
elsif File.exist?(@last_run_metadata_path)
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
end
@enable_encoding = [email protected]? || !@columns_charset.empty?

unless @statement.nil? ^ @statement_filepath.nil?
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
Expand Down Expand Up @@ -248,13 +243,11 @@ def run(queue)
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@scheduler.cron @schedule do
execute_query(queue)
update_state_file
end

@scheduler.join
else
execute_query(queue)
update_state_file
end
end # def run

Expand All @@ -267,7 +260,7 @@ def stop

def execute_query(queue)
# update default parameters
@parameters['sql_last_value'] = @sql_last_value
@parameters['sql_last_value'] = @value_tracker.value
execute_statement(@statement, @parameters) do |row|
if enable_encoding?
## do the necessary conversions to string elements
Expand All @@ -277,12 +270,7 @@ def execute_query(queue)
decorate(event)
queue << event
end
end

def update_state_file
if @record_last_run
File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
end
@value_tracker.write
end

private
Expand Down
21 changes: 5 additions & 16 deletions lib/logstash/plugin_mixins/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/config/mixin"
require "time"
require "date"
require "logstash/plugin_mixins/value_tracking"

java_import java.util.concurrent.locks.ReentrantLock

Expand Down Expand Up @@ -195,17 +196,7 @@ def open_jdbc_connection
public
def prepare_jdbc_connection
@connection_lock = ReentrantLock.new
if @use_column_value
case @tracking_column_type
when "numeric"
@sql_last_value = 0
when "timestamp"
@sql_last_value = Time.at(0).utc
end
else
@sql_last_value = Time.at(0).utc
end
end # def prepare_jdbc_connection
end

public
def close_jdbc_connection
Expand All @@ -229,22 +220,20 @@ def execute_statement(statement, parameters)
begin
parameters = symbolized_params(parameters)
query = @database[statement, parameters]
sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc

sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
@tracking_column_warning_sent = false
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)

perform_query(query) do |row|
sql_last_value = get_column_value(row) if @use_column_value
if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
sql_last_value = sql_last_value.to_time # Coerce the timestamp to a `Time`
end
yield extract_values_from(row)
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
else
@sql_last_value = sql_last_value
@value_tracker.set_value(sql_last_value)
ensure
close_jdbc_connection
@connection_lock.unlock
Expand Down
124 changes: 124 additions & 0 deletions lib/logstash/plugin_mixins/value_tracking.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# encoding: utf-8
require "yaml" # persistence

module LogStash module PluginMixins
class ValueTracking

def self.build_last_value_tracker(plugin)
if plugin.use_column_value && plugin.tracking_column_type == "numeric"
# use this irrespective of the jdbc_default_timezone setting
klass = NumericValueTracker
else
if plugin.jdbc_default_timezone.nil? || plugin.jdbc_default_timezone.empty?
# no TZ stuff for Sequel, use Time
klass = TimeValueTracker
else
# Sequel does timezone handling on DateTime only
klass = DateTimeValueTracker
end
end

handler = NullFileHandler.new(plugin.last_run_metadata_path)
if plugin.record_last_run
handler = FileHandler.new(plugin.last_run_metadata_path)
end
if plugin.clean_run
handler.clean
end

instance = klass.new(handler)
end

attr_reader :value

def initialize(handler)
@file_handler = handler
set_value(get_initial)
end

def get_initial
# override in subclass
end

def set_value(value)
# override in subclass
end

def write
@file_handler.write(@value)
end
end


class NumericValueTracker < ValueTracking
def get_initial
@file_handler.read || 0
end

def set_value(value)
return unless value.is_a?(Numeric)
@value = value
end
end

class DateTimeValueTracker < ValueTracking
def get_initial
@file_handler.read || DateTime.new(1970)
end

def set_value(value)
if value.respond_to?(:to_datetime)
@value = value.to_datetime
end
end
end

class TimeValueTracker < ValueTracking
def get_initial
@file_handler.read || Time.at(0).utc
end

def set_value(value)
if value.respond_to?(:to_time)
@value = value.to_time
end
end
end

class FileHandler
def initialize(path)
@path = path
@exists = ::File.exist?(@path)
end

def clean
return unless @exists
::File.delete(@path)
@exists = false
end

def read
return unless @exists
YAML.load(::File.read(@path))
end

def write(value)
::File.write(@path, YAML.dump(value))
@exists = true
end
end

class NullFileHandler
def initialize(path)
end

def clean
end

def read
end

def write(value)
end
end
end end
2 changes: 1 addition & 1 deletion logstash-input-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-jdbc'
s.version = '4.3.4'
s.version = '4.3.5'
s.licenses = ['Apache License (2.0)']
s.summary = "Creates events from JDBC data"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
Loading