diff --git a/CHANGELOG.md b/CHANGELOG.md index 0110c30..81e7440 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.3.12 + - Added check to prevent count sql syntax errors when debug logging [Issue #287](https://github.com/logstash-plugins/logstash-input-jdbc/issues/287) and [Pull Request #294](https://github.com/logstash-plugins/logstash-input-jdbc/pull/294) + ## 4.3.11 - Fixed crash that occurs when receiving string input that cannot be coerced to UTF-8 (such as BLOB data) [#291](https://github.com/logstash-plugins/logstash-input-jdbc/pull/291) diff --git a/NOTICE.TXT b/NOTICE.TXT index 0b8a947..af8b562 100644 --- a/NOTICE.TXT +++ b/NOTICE.TXT @@ -1,5 +1,5 @@ Elasticsearch -Copyright 2012-2015 Elasticsearch +Copyright 2012-2018 Elasticsearch This product includes software developed by The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 47887a8..aa47c0f 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -1,7 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" -require "logstash/plugin_mixins/jdbc" +require "logstash/plugin_mixins/jdbc/jdbc" # This plugin was created as a way to ingest data from any database @@ -123,8 +123,8 @@ # } # --------------------------------------------------------------------------------------------------- # -class LogStash::Inputs::Jdbc < LogStash::Inputs::Base - include LogStash::PluginMixins::Jdbc +module LogStash module Inputs class Jdbc < LogStash::Inputs::Base + include LogStash::PluginMixins::Jdbc::Jdbc config_name "jdbc" # If undefined, Logstash will complain, even if codec is unused. 
@@ -213,7 +213,8 @@ def register end end - @value_tracker = LogStash::PluginMixins::ValueTracking.build_last_value_tracker(self) + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) @enable_encoding = !@charset.nil? || !@columns_charset.empty? @@ -221,13 +222,13 @@ def register raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.") end - @statement = File.read(@statement_filepath) if @statement_filepath + @statement = ::File.read(@statement_filepath) if @statement_filepath if (@jdbc_password_filepath and @jdbc_password) raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.") end - @jdbc_password = LogStash::Util::Password.new(File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath + @jdbc_password = LogStash::Util::Password.new(::File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath if enable_encoding? 
encodings = @columns_charset.values @@ -241,6 +242,15 @@ def register end end # def register + # test injection points + def set_statement_logger(instance) + @statement_logger = instance + end + + def set_value_tracker(instance) + @value_tracker = instance + end + def run(queue) if @schedule @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) @@ -296,4 +306,4 @@ def convert(column_name, value) value end end -end # class LogStash::Inputs::Jdbc +end end end # class LogStash::Inputs::Jdbc diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb deleted file mode 100644 index c9bc547..0000000 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ /dev/null @@ -1,313 +0,0 @@ -# encoding: utf-8 -# TAKEN FROM WIIBAA -require "logstash/config/mixin" -require "time" -require "date" -require "logstash/plugin_mixins/value_tracking" - -java_import java.util.concurrent.locks.ReentrantLock - -# Tentative of abstracting JDBC logic to a mixin -# for potential reuse in other plugins (input/output) -module LogStash::PluginMixins::Jdbc - - # This method is called when someone includes this module - def self.included(base) - # Add these methods to the 'base' given. - base.extend(self) - base.setup_jdbc_config - end - - - public - def setup_jdbc_config - # JDBC driver library path to third party driver library. In case of multiple libraries being - # required you can pass them separated by a comma. - # - # If not provided, Plugin will look for the driver class in the Logstash Java classpath. 
- config :jdbc_driver_library, :validate => :string - - # JDBC driver class to load, for exmaple, "org.apache.derby.jdbc.ClientDriver" - # NB per https://github.com/logstash-plugins/logstash-input-jdbc/issues/43 if you are using - # the Oracle JDBC driver (ojdbc6.jar) the correct `jdbc_driver_class` is `"Java::oracle.jdbc.driver.OracleDriver"` - config :jdbc_driver_class, :validate => :string, :required => true - - # JDBC connection string - config :jdbc_connection_string, :validate => :string, :required => true - - # JDBC user - config :jdbc_user, :validate => :string, :required => true - - # JDBC password - config :jdbc_password, :validate => :password - - # JDBC password filename - config :jdbc_password_filepath, :validate => :path - - # JDBC enable paging - # - # This will cause a sql statement to be broken up into multiple queries. - # Each query will use limits and offsets to collectively retrieve the full - # result-set. The limit size is set with `jdbc_page_size`. - # - # Be aware that ordering is not guaranteed between queries. - config :jdbc_paging_enabled, :validate => :boolean, :default => false - - # JDBC page size - config :jdbc_page_size, :validate => :number, :default => 100000 - - # JDBC fetch size. if not provided, respective driver's default will be used - config :jdbc_fetch_size, :validate => :number - - # Connection pool configuration. - # Validate connection before use. - config :jdbc_validate_connection, :validate => :boolean, :default => false - - # Connection pool configuration. - # How often to validate a connection (in seconds) - config :jdbc_validation_timeout, :validate => :number, :default => 3600 - - # Connection pool configuration. - # The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5) - config :jdbc_pool_timeout, :validate => :number, :default => 5 - - # Timezone conversion. - # SQL does not allow for timezone data in timestamp fields. 
This plugin will automatically - # convert your SQL timestamp fields to Logstash timestamps, in relative UTC time in ISO8601 format. - # - # Using this setting will manually assign a specified timezone offset, instead - # of using the timezone setting of the local machine. You must use a canonical - # timezone, *America/Denver*, for example. - config :jdbc_default_timezone, :validate => :string - - # General/Vendor-specific Sequel configuration options. - # - # An example of an optional connection pool configuration - # max_connections - The maximum number of connections the connection pool - # - # examples of vendor-specific options can be found in this - # documentation page: https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc - config :sequel_opts, :validate => :hash, :default => {} - - # Log level at which to log SQL queries, the accepted values are the common ones fatal, error, warn, - # info and debug. The default value is info. - config :sql_log_level, :validate => [ "fatal", "error", "warn", "info", "debug" ], :default => "info" - - # Maximum number of times to try connecting to database - config :connection_retry_attempts, :validate => :number, :default => 1 - # Number of seconds to sleep between connection attempts - config :connection_retry_attempts_wait_time, :validate => :number, :default => 0.5 - end - - private - def jdbc_connect - opts = { - :user => @jdbc_user, - :password => @jdbc_password.nil? ? nil : @jdbc_password.value, - :pool_timeout => @jdbc_pool_timeout, - :keep_reference => false - }.merge(@sequel_opts) - retry_attempts = @connection_retry_attempts - loop do - retry_attempts -= 1 - begin - return Sequel.connect(@jdbc_connection_string, opts=opts) - rescue Sequel::PoolTimeout => e - if retry_attempts <= 0 - @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.") - raise e - else - @logger.error("Failed to connect to database. 
#{@jdbc_pool_timeout} second timeout exceeded. Trying again.") - end - rescue Sequel::Error => e - if retry_attempts <= 0 - @logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, ) - raise e - else - @logger.error("Unable to connect to database. Trying again", :error_message => e.message) - end - end - sleep(@connection_retry_attempts_wait_time) - end - end - - private - def load_drivers(drivers) - drivers.each do |driver| - begin - class_loader = java.lang.ClassLoader.getSystemClassLoader().to_java(java.net.URLClassLoader) - class_loader.add_url(java.io.File.new(driver).toURI().toURL()) - rescue => e - @logger.error("Failed to load #{driver}", :exception => e) - end - end - end - - private - def open_jdbc_connection - require "java" - require "sequel" - require "sequel/adapters/jdbc" - load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library - - begin - Sequel::JDBC.load_driver(@jdbc_driver_class) - rescue Sequel::AdapterNotFound => e - message = if @jdbc_driver_library.nil? - ":jdbc_driver_library is not set, are you sure you included - the proper driver client libraries in your classpath?" - else - "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" - end - raise LogStash::ConfigurationError, "#{e}. #{message}" - end - @database = jdbc_connect() - @database.extension(:pagination) - if @jdbc_default_timezone - @database.extension(:named_timezones) - @database.timezone = @jdbc_default_timezone - end - if @jdbc_validate_connection - @database.extension(:connection_validator) - @database.pool.connection_validation_timeout = @jdbc_validation_timeout - end - @database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil? 
- begin - @database.test_connection - rescue Sequel::DatabaseConnectionError => e - @logger.warn("Failed test_connection.", :exception => e) - close_jdbc_connection - - #TODO return false and let the plugin raise a LogStash::ConfigurationError - raise e - end - - @database.sql_log_level = @sql_log_level.to_sym - @database.logger = @logger - - @database.extension :identifier_mangling - - if @lowercase_column_names - @database.identifier_output_method = :downcase - else - @database.identifier_output_method = :to_s - end - end - - public - def prepare_jdbc_connection - @connection_lock = ReentrantLock.new - end - - public - def close_jdbc_connection - begin - # pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections - # connections in use won't really get closed - @connection_lock.lock - @database.disconnect if @database - rescue => e - @logger.warn("Failed to close connection", :exception => e) - ensure - @connection_lock.unlock - end - end - - public - def execute_statement(statement, parameters) - success = false - @connection_lock.lock - open_jdbc_connection - begin - parameters = symbolized_params(parameters) - query = @database[statement, parameters] - - sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc - @tracking_column_warning_sent = false - @logger.debug? 
and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) - - perform_query(query) do |row| - sql_last_value = get_column_value(row) if @use_column_value - yield extract_values_from(row) - end - success = true - rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e - @logger.warn("Exception when executing JDBC query", :exception => e) - else - @value_tracker.set_value(sql_last_value) - ensure - close_jdbc_connection - @connection_lock.unlock - end - return success - end - - # Performs the query, respecting our pagination settings, yielding once per row of data - # @param query [Sequel::Dataset] - # @yieldparam row [Hash{Symbol=>Object}] - private - def perform_query(query) - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| - yield row - end - end - else - query.each do |row| - yield row - end - end - end - - public - def get_column_value(row) - if !row.has_key?(@tracking_column.to_sym) - if !@tracking_column_warning_sent - @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column) - @tracking_column_warning_sent = true - end - # If we can't find the tracking column, return the current value in the ivar - @sql_last_value - else - # Otherwise send the updated tracking column - row[@tracking_column.to_sym] - end - end - - # Symbolize parameters keys to use with Sequel - private - def symbolized_params(parameters) - parameters.inject({}) do |hash,(k,v)| - case v - when LogStash::Timestamp - hash[k.to_sym] = v.time - else - hash[k.to_sym] = v - end - hash - end - end - - private - #Stringify row keys and decorate values when necessary - def extract_values_from(row) - Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }] - end - - private - def decorate_value(value) - if value.is_a?(Time) - # transform it to LogStash::Timestamp as required by LS - LogStash::Timestamp.new(value) - elsif value.is_a?(Date) - 
LogStash::Timestamp.new(value.to_time) - elsif value.is_a?(DateTime) - # Manual timezone conversion detected. - # This is slower, so we put it in as a conditional case. - LogStash::Timestamp.new(Time.parse(value.to_s)) - else - value - end - end -end diff --git a/lib/logstash/plugin_mixins/jdbc/checked_count_logger.rb b/lib/logstash/plugin_mixins/jdbc/checked_count_logger.rb new file mode 100644 index 0000000..52a138d --- /dev/null +++ b/lib/logstash/plugin_mixins/jdbc/checked_count_logger.rb @@ -0,0 +1,38 @@ +# encoding: utf-8 + +module LogStash module PluginMixins module Jdbc + class CheckedCountLogger + def initialize(logger) + @logger = logger + @needs_check = true + @count_is_supported = false + @in_debug = @logger.debug? + end + + def log_statement_parameters(query, statement, parameters) + return unless @in_debug + check_count_query(query) if @needs_check + if @count_is_supported + @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => execute_count(query)) + else + @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) + end + end + + def check_count_query(query) + @needs_check = false + begin + execute_count(query) + @count_is_supported = true + rescue Exception => e + @logger.warn("Attempting a count query raised an error, the generated count statement is most likely incorrect but check networking, authentication or your statement syntax", "exception" => e.message) + @logger.warn("Ongoing count statement generation is being prevented") + @count_is_supported = false + end + end + + def execute_count(query) + query.count + end + end +end end end \ No newline at end of file diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb new file mode 100644 index 0000000..da524ee --- /dev/null +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -0,0 +1,317 @@ +# encoding: utf-8 +# TAKEN FROM WIIBAA +require "logstash/config/mixin" +require "time" 
+require "date" +require_relative "value_tracking" +require_relative "checked_count_logger" + +java_import java.util.concurrent.locks.ReentrantLock + +# Tentative of abstracting JDBC logic to a mixin +# for potential reuse in other plugins (input/output) +module LogStash module PluginMixins module Jdbc + module Jdbc + # This method is called when someone includes this module + def self.included(base) + # Add these methods to the 'base' given. + base.extend(self) + base.setup_jdbc_config + end + + + public + def setup_jdbc_config + # JDBC driver library path to third party driver library. In case of multiple libraries being + # required you can pass them separated by a comma. + # + # If not provided, Plugin will look for the driver class in the Logstash Java classpath. + config :jdbc_driver_library, :validate => :string + + # JDBC driver class to load, for example, "org.apache.derby.jdbc.ClientDriver" + # NB per https://github.com/logstash-plugins/logstash-input-jdbc/issues/43 if you are using + # the Oracle JDBC driver (ojdbc6.jar) the correct `jdbc_driver_class` is `"Java::oracle.jdbc.driver.OracleDriver"` + config :jdbc_driver_class, :validate => :string, :required => true + + # JDBC connection string + config :jdbc_connection_string, :validate => :string, :required => true + + # JDBC user + config :jdbc_user, :validate => :string, :required => true + + # JDBC password + config :jdbc_password, :validate => :password + + # JDBC password filename + config :jdbc_password_filepath, :validate => :path + + # JDBC enable paging + # + # This will cause a sql statement to be broken up into multiple queries. + # Each query will use limits and offsets to collectively retrieve the full + # result-set. The limit size is set with `jdbc_page_size`. + # + # Be aware that ordering is not guaranteed between queries. 
+ config :jdbc_paging_enabled, :validate => :boolean, :default => false + + # JDBC page size + config :jdbc_page_size, :validate => :number, :default => 100000 + + # JDBC fetch size. if not provided, respective driver's default will be used + config :jdbc_fetch_size, :validate => :number + + # Connection pool configuration. + # Validate connection before use. + config :jdbc_validate_connection, :validate => :boolean, :default => false + + # Connection pool configuration. + # How often to validate a connection (in seconds) + config :jdbc_validation_timeout, :validate => :number, :default => 3600 + + # Connection pool configuration. + # The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5) + config :jdbc_pool_timeout, :validate => :number, :default => 5 + + # Timezone conversion. + # SQL does not allow for timezone data in timestamp fields. This plugin will automatically + # convert your SQL timestamp fields to Logstash timestamps, in relative UTC time in ISO8601 format. + # + # Using this setting will manually assign a specified timezone offset, instead + # of using the timezone setting of the local machine. You must use a canonical + # timezone, *America/Denver*, for example. + config :jdbc_default_timezone, :validate => :string + + # General/Vendor-specific Sequel configuration options. + # + # An example of an optional connection pool configuration + # max_connections - The maximum number of connections the connection pool + # + # examples of vendor-specific options can be found in this + # documentation page: https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc + config :sequel_opts, :validate => :hash, :default => {} + + # Log level at which to log SQL queries, the accepted values are the common ones fatal, error, warn, + # info and debug. The default value is info. 
+ config :sql_log_level, :validate => [ "fatal", "error", "warn", "info", "debug" ], :default => "info" + + # Maximum number of times to try connecting to database + config :connection_retry_attempts, :validate => :number, :default => 1 + # Number of seconds to sleep between connection attempts + config :connection_retry_attempts_wait_time, :validate => :number, :default => 0.5 + end + + private + def jdbc_connect + opts = { + :user => @jdbc_user, + :password => @jdbc_password.nil? ? nil : @jdbc_password.value, + :pool_timeout => @jdbc_pool_timeout, + :keep_reference => false + }.merge(@sequel_opts) + retry_attempts = @connection_retry_attempts + loop do + retry_attempts -= 1 + begin + return Sequel.connect(@jdbc_connection_string, opts=opts) + rescue Sequel::PoolTimeout => e + if retry_attempts <= 0 + @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.") + raise e + else + @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Trying again.") + end + rescue Sequel::Error => e + if retry_attempts <= 0 + @logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, ) + raise e + else + @logger.error("Unable to connect to database. 
Trying again", :error_message => e.message) + end + end + sleep(@connection_retry_attempts_wait_time) + end + end + + private + def load_drivers(drivers) + drivers.each do |driver| + begin + class_loader = java.lang.ClassLoader.getSystemClassLoader().to_java(java.net.URLClassLoader) + class_loader.add_url(java.io.File.new(driver).toURI().toURL()) + rescue => e + @logger.error("Failed to load #{driver}", :exception => e) + end + end + end + + private + def open_jdbc_connection + require "java" + require "sequel" + require "sequel/adapters/jdbc" + load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library + + begin + Sequel::JDBC.load_driver(@jdbc_driver_class) + rescue Sequel::AdapterNotFound => e + message = if @jdbc_driver_library.nil? + ":jdbc_driver_library is not set, are you sure you included + the proper driver client libraries in your classpath?" + else + "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" + end + raise LogStash::ConfigurationError, "#{e}. #{message}" + end + @database = jdbc_connect() + @database.extension(:pagination) + if @jdbc_default_timezone + @database.extension(:named_timezones) + @database.timezone = @jdbc_default_timezone + end + if @jdbc_validate_connection + @database.extension(:connection_validator) + @database.pool.connection_validation_timeout = @jdbc_validation_timeout + end + @database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil? 
+ begin + @database.test_connection + rescue Sequel::DatabaseConnectionError => e + @logger.warn("Failed test_connection.", :exception => e) + close_jdbc_connection + + #TODO return false and let the plugin raise a LogStash::ConfigurationError + raise e + end + + @database.sql_log_level = @sql_log_level.to_sym + @database.logger = @logger + + @database.extension :identifier_mangling + + if @lowercase_column_names + @database.identifier_output_method = :downcase + else + @database.identifier_output_method = :to_s + end + end + + public + def prepare_jdbc_connection + @connection_lock = ReentrantLock.new + end + + public + def close_jdbc_connection + begin + # pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections + # connections in use won't really get closed + @connection_lock.lock + @database.disconnect if @database + rescue => e + @logger.warn("Failed to close connection", :exception => e) + ensure + @connection_lock.unlock + end + end + + public + def execute_statement(statement, parameters) + success = false + @connection_lock.lock + open_jdbc_connection + begin + params = symbolized_params(parameters) + query = @database[statement, params] + + sql_last_value = @use_column_value ? 
@value_tracker.value : Time.now.utc + @tracking_column_warning_sent = false + @statement_logger.log_statement_parameters(query, statement, params) + perform_query(query) do |row| + sql_last_value = get_column_value(row) if @use_column_value + yield extract_values_from(row) + end + success = true + rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e + @logger.warn("Exception when executing JDBC query", :exception => e) + else + @value_tracker.set_value(sql_last_value) + ensure + close_jdbc_connection + @connection_lock.unlock + end + return success + end + +# Performs the query, respecting our pagination settings, yielding once per row of data +# @param query [Sequel::Dataset] +# @yieldparam row [Hash{Symbol=>Object}] + private + def perform_query(query) + if @jdbc_paging_enabled + query.each_page(@jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + yield row + end + end + else + query.each do |row| + yield row + end + end + end + + public + def get_column_value(row) + if !row.has_key?(@tracking_column.to_sym) + if !@tracking_column_warning_sent + @logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column) + @tracking_column_warning_sent = true + end + # If we can't find the tracking column, return the current value in the ivar + @sql_last_value + else + # Otherwise send the updated tracking column + row[@tracking_column.to_sym] + end + end + +# Symbolize parameters keys to use with Sequel + private + def symbolized_params(parameters) + parameters.inject({}) do |hash,(k,v)| + case v + when LogStash::Timestamp + hash[k.to_sym] = v.time + else + hash[k.to_sym] = v + end + hash + end + end + + private +#Stringify row keys and decorate values when necessary + def extract_values_from(row) + Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }] + end + + private + def decorate_value(value) + if value.is_a?(Time) + # transform it to LogStash::Timestamp as required by LS + LogStash::Timestamp.new(value) + elsif 
value.is_a?(Date) + LogStash::Timestamp.new(value.to_time) + elsif value.is_a?(DateTime) + # Manual timezone conversion detected. + # This is slower, so we put it in as a conditional case. + LogStash::Timestamp.new(Time.parse(value.to_s)) + else + value + end + end + + end +end end end + + diff --git a/lib/logstash/plugin_mixins/value_tracking.rb b/lib/logstash/plugin_mixins/jdbc/value_tracking.rb similarity index 97% rename from lib/logstash/plugin_mixins/value_tracking.rb rename to lib/logstash/plugin_mixins/jdbc/value_tracking.rb index 913d083..73e189d 100644 --- a/lib/logstash/plugin_mixins/value_tracking.rb +++ b/lib/logstash/plugin_mixins/jdbc/value_tracking.rb @@ -1,7 +1,7 @@ # encoding: utf-8 require "yaml" # persistence -module LogStash module PluginMixins +module LogStash module PluginMixins module Jdbc class ValueTracking def self.build_last_value_tracker(plugin) @@ -125,4 +125,4 @@ def read def write(value) end end -end end +end end end diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index 51d8b3c..6bb3e6f 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-jdbc' - s.version = '4.3.11' + s.version = '4.3.12' s.licenses = ['Apache License (2.0)'] s.summary = "Creates events from JDBC data" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/inputs/integ_spec.rb b/spec/inputs/integ_spec.rb new file mode 100644 index 0000000..9497a8d --- /dev/null +++ b/spec/inputs/integ_spec.rb @@ -0,0 +1,36 @@ +require "logstash/devutils/rspec/spec_helper" +require "logstash/inputs/jdbc" + +# This test requires: Firebird installed to Mac OSX, it uses the built-in example database `employee` + +describe LogStash::Inputs::Jdbc, :integration => true do + # This is a necessary change test-wide to guarantee that no local timezone + # is picked up. It could be arbitrarily set to any timezone, but then the test + # would have to compensate differently. That's why UTC is chosen. + ENV["TZ"] = "Etc/UTC" + let(:mixin_settings) do + { "jdbc_user" => "SYSDBA", "jdbc_driver_class" => "org.firebirdsql.jdbc.FBDriver", "jdbc_driver_library" => "/elastic/tmp/jaybird-full-3.0.4.jar", + "jdbc_connection_string" => "jdbc:firebirdsql://localhost:3050//Library/Frameworks/Firebird.framework/Versions/A/Resources/examples/empbuild/employee.fdb", "jdbc_password" => "masterkey"} + end + let(:settings) { {"statement" => "SELECT FIRST_NAME, LAST_NAME FROM EMPLOYEE WHERE EMP_NO > 144"} } + let(:plugin) { LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) } + let(:queue) { Queue.new } + + context "when passing no parameters" do + before do + plugin.register + end + + after do + plugin.stop + end + + it "should retrieve params correctly from Event" do + plugin.run(queue) + event = queue.pop + expect(event.get('first_name')).to eq("Mark") + expect(event.get('last_name')).to eq("Guckenheimer") + end + end +end + diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 832d45b..b9a1acf 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -1267,4 +1267,54 @@ expect(event.get("ranking").to_f).to eq(95.67) end end + + context "when debug logging and a count query raises a count related error" do + let(:settings) do + { "statement" => "SELECT * from 
types_table" } + end + let(:logger) { double("logger", :debug? => true) } + let(:statement_logger) { LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(logger) } + let(:value_tracker) { double("value tracker", :set_value => nil, :write => nil) } + let(:msg) { 'Java::JavaSql::SQLSyntaxErrorException: Dynamic SQL Error; SQL error code = -104; Token unknown - line 1, column 105; LIMIT [SQLState:42000, ISC error code:335544634]' } + let(:error_args) do + {"exception" => msg} + end + + before do + db << "INSERT INTO types_table (num, string, started_at, custom_time, ranking) VALUES (1, 'A test', '1999-12-31', '1999-12-31 23:59:59', 95.67)" + plugin.register + plugin.set_statement_logger(statement_logger) + plugin.set_value_tracker(value_tracker) + allow(value_tracker).to receive(:value).and_return("bar") + allow(statement_logger).to receive(:execute_count).once.and_raise(StandardError.new(msg)) + end + + after do + plugin.stop + end + + context "if the count query raises an error" do + it "should log a debug line without a count key as its unknown whether a count works at this stage" do + expect(logger).to receive(:warn).once.with("Attempting a count query raised an error, the generated count statement is most likely incorrect but check networking, authentication or your statement syntax", error_args) + expect(logger).to receive(:warn).once.with("Ongoing count statement generation is being prevented") + expect(logger).to receive(:debug).once.with("Executing JDBC query", :statement => settings["statement"], :parameters => {:sql_last_value=>"bar"}) + plugin.run(queue) + queue.pop + end + + it "should create an event normally" do + allow(logger).to receive(:warn) + allow(logger).to receive(:debug) + plugin.run(queue) + event = queue.pop + expect(event.get("num")).to eq(1) + expect(event.get("string")).to eq("A test") + expect(event.get("started_at")).to be_a(LogStash::Timestamp) + expect(event.get("started_at").to_s).to eq("1999-12-31T00:00:00.000Z") + 
expect(event.get("custom_time")).to be_a(LogStash::Timestamp) + expect(event.get("custom_time").to_s).to eq("1999-12-31T23:59:59.000Z") + expect(event.get("ranking").to_f).to eq(95.67) + end + end + end end