
Commit

Add Prepared Statement support (#349)
* Move code around so backtrace shows which class is being initialised.

* Add prepared statements, preserve previous behaviour.

* DOH remnant @sql_last_value

* Add prep statement code, tests, docs and bump version.

* Update lib/logstash/plugin_mixins/jdbc/statement_handler.rb

Co-Authored-By: Colin Surprenant <[email protected]>

* Update lib/logstash/plugin_mixins/jdbc/value_tracking.rb

Co-Authored-By: Rob Bavey <[email protected]>

* Updates per PR review: non-count logging, stricter validation

Fixes #166 
Fixes #233
Guy Boertje authored Sep 9, 2019
1 parent b1076ca commit c7c91c4
Showing 9 changed files with 389 additions and 67 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.3.16
- Add support for prepared statements [Issue 233](https://github.com/logstash-plugins/logstash-input-jdbc/issues/233)

## 4.3.15
- Use atomic boolean to load drivers once
- Added CHANGELOG entries
57 changes: 56 additions & 1 deletion docs/index.asciidoc
@@ -140,6 +140,35 @@ input {
}
---------------------------------------------------------------------------------------------------

==== Prepared Statements

Using server-side prepared statements can speed up execution times, as the server optimises the query plan and its execution.

NOTE: Not all JDBC accessible technologies will support prepared statements.

Prepared statement support introduces a different code execution path and some new settings. Most of the existing settings are still useful, but there are several new settings specific to prepared statements to read up on.
Use the boolean setting `use_prepared_statements` to enable this execution mode. Use the `prepared_statement_name` setting to specify a name for the prepared statement; this identifies the statement locally and remotely, and it should be unique in your config and on the database. Use the `prepared_statement_bind_values` array setting to specify the bind values; use the exact string `:sql_last_value` (multiple times if necessary) for the predefined parameter mentioned before. The `statement` (or `statement_path`) setting still holds the SQL statement, but to use bind variables you must use the `?` character as a placeholder, in the exact order found in the `prepared_statement_bind_values` array.

NOTE: Building count queries around a prepared statement is not supported at this time, and because JDBC paging uses count queries under the hood, JDBC paging is not supported with prepared statements either. Therefore, the `jdbc_paging_enabled` and `jdbc_page_size` settings are ignored when prepared statements are in use.

Example:
[source,ruby]
---------------------------------------------------------------------------------------------------
input {
  jdbc {
    statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > ? AND _sequence_key < ? + ? ORDER BY _sequence_key ASC"
    prepared_statement_bind_values => [":sql_last_value", ":sql_last_value", 4]
    prepared_statement_name => "foobar"
    use_prepared_statements => true
    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "_sequence_key"
    last_run_metadata_path => "/elastic/tmp/testing/confs/test-jdbc-int-sql_last_value.yml"
    # ... other configuration bits
  }
}
---------------------------------------------------------------------------------------------------
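
For comparison, here is a sketch of roughly the same query in the default (non-prepared) mode, where `:sql_last_value` is interpolated into the statement as a named parameter rather than bound through `?` placeholders (the upper-bound window from the example above is omitted for brevity):

[source,ruby]
---------------------------------------------------------------------------------------------------
input {
  jdbc {
    statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > :sql_last_value ORDER BY _sequence_key ASC"
    use_column_value => true
    tracking_column_type => "numeric"
    tracking_column => "_sequence_key"
    last_run_metadata_path => "/elastic/tmp/testing/confs/test-jdbc-int-sql_last_value.yml"
    # ... other configuration bits
  }
}
---------------------------------------------------------------------------------------------------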


[id="plugins-{type}s-{plugin}-options"]
==== Jdbc Input Configuration Options
@@ -149,7 +178,6 @@ This plugin supports the following configuration options plus the <<plugins-{typ
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-plugin_timezone>> |<<string,string>>, one of `["local", "utc"]`|No
| <<plugins-{type}s-{plugin}-clean_run>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-columns_charset>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-connection_retry_attempts>> |<<number,number>>|No
@@ -170,6 +198,9 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-last_run_metadata_path>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-lowercase_column_names>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-parameters>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-plugin_timezone>> |<<string,string>>, one of `["local", "utc"]`|No
| <<plugins-{type}s-{plugin}-prepared_statement_bind_values>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-prepared_statement_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-record_last_run>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-sequel_opts>> |<<hash,hash>>|No
@@ -179,6 +210,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-tracking_column>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-tracking_column_type>> |<<string,string>>, one of `["numeric", "timestamp"]`|No
| <<plugins-{type}s-{plugin}-use_column_value>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-use_prepared_statements>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
@@ -409,6 +441,22 @@ Whether to force the lowercasing of identifier fields

Hash of query parameters, for example `{ "target_id" => "321" }`

[id="plugins-{type}s-{plugin}-prepared_statement_bind_values"]
===== `prepared_statement_bind_values`

* Value type is <<array,array>>
* Default value is `[]`

Array of bind values for the prepared statement. `:sql_last_value` is a reserved, predefined string that is replaced with the tracked last value.
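
For illustration only (the table and column names below are invented), a statement with two `?` placeholders needs a two-element bind array, with `:sql_last_value` used verbatim wherever the tracked value should be substituted:

[source,ruby]
---------------------------------------------------------------------------------------------------
  statement => "SELECT * FROM orders WHERE id > ? AND quantity > ?"
  prepared_statement_bind_values => [":sql_last_value", 100]
---------------------------------------------------------------------------------------------------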

[id="plugins-{type}s-{plugin}-prepared_statement_name"]
===== `prepared_statement_name`

* Value type is <<string,string>>
* Default value is `""`

Name given to the prepared statement. It must be unique in your config and in the database.

[id="plugins-{type}s-{plugin}-record_last_run"]
===== `record_last_run`

@@ -506,6 +554,13 @@ When set to `true`, uses the defined
<<plugins-{type}s-{plugin}-tracking_column>> value as the `:sql_last_value`. When set
to `false`, `:sql_last_value` reflects the last time the query was executed.

[id="plugins-{type}s-{plugin}-use_prepared_statements"]
===== `use_prepared_statements`

* Value type is <<boolean,boolean>>
* Default value is `false`

When set to `true`, enables prepared statement usage.

[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]
46 changes: 37 additions & 9 deletions lib/logstash/inputs/jdbc.rb
@@ -201,6 +201,12 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
# this will only convert column0 that has ISO-8859-1 as an original encoding.
config :columns_charset, :validate => :hash, :default => {}

config :use_prepared_statements, :validate => :boolean, :default => false

config :prepared_statement_name, :validate => :string, :default => ""

config :prepared_statement_bind_values, :validate => :array, :default => []

attr_reader :database # for test mocking/stubbing

public
@@ -217,17 +223,25 @@ def register
end
end

set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))

@enable_encoding = !@charset.nil? || !@columns_charset.empty?

unless @statement.nil? ^ @statement_filepath.nil?
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
end

@statement = ::File.read(@statement_filepath) if @statement_filepath

# must validate prepared statement mode after trying to read in from @statement_filepath
if @use_prepared_statements
validation_errors = validate_prepared_statement_mode
unless validation_errors.empty?
raise(LogStash::ConfigurationError, "Prepared Statement Mode validation errors: " + validation_errors.join(", "))
end
end

set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))

@enable_encoding = !@charset.nil? || !@columns_charset.empty?

if (@jdbc_password_filepath and @jdbc_password)
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
end
@@ -248,7 +262,7 @@ def register

# test injection points
def set_statement_logger(instance)
@statement_logger = instance
@statement_handler = LogStash::PluginMixins::Jdbc::StatementHandler.build_statement_handler(self, instance)
end

def set_value_tracker(instance)
@@ -275,10 +289,24 @@ def stop

private

def validate_prepared_statement_mode
error_messages = []
if @prepared_statement_name.empty?
error_messages << "must provide a name for the Prepared Statement, it must be unique for the db session"
end
if @statement.count("?") != @prepared_statement_bind_values.size
# mismatch in number of bind value elements to placeholder characters
error_messages << "there is a mismatch between the number of statement `?` placeholders and :prepared_statement_bind_values array setting elements"
end
if @jdbc_paging_enabled
# Pagination is not supported when using prepared statements
error_messages << "JDBC pagination cannot be used at this time"
end
error_messages
end
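# Illustration only, not part of this file: the placeholder check above relies on Ruby's
# String#count, which counts occurrences of the "?" character in the statement, e.g.
#   "SELECT * FROM t WHERE id > ? AND id < ?".count("?")  # => 2
# so the statement must contain exactly as many "?" placeholders as there are elements
# in the prepared_statement_bind_values array.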

def execute_query(queue)
# update default parameters
@parameters['sql_last_value'] = @value_tracker.value
execute_statement(@statement, @parameters) do |row|
execute_statement do |row|
if enable_encoding?
## do the necessary conversions to string elements
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]
11 changes: 8 additions & 3 deletions lib/logstash/plugin_mixins/jdbc/checked_count_logger.rb
@@ -9,9 +9,14 @@ def initialize(logger)
@in_debug = @logger.debug?
end

def log_statement_parameters(query, statement, parameters)
def disable_count
@needs_check = false
@count_is_supported = false
end

def log_statement_parameters(statement, parameters, query)
return unless @in_debug
check_count_query(query) if @needs_check
check_count_query(query) if @needs_check && query
if @count_is_supported
@logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => execute_count(query))
else
Expand All @@ -35,4 +40,4 @@ def execute_count(query)
query.count
end
end
end end end
end end end
45 changes: 5 additions & 40 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
@@ -6,6 +6,7 @@
require_relative "value_tracking"
require_relative "checked_count_logger"
require_relative "wrapped_driver"
require_relative "statement_handler"

java_import java.util.concurrent.locks.ReentrantLock

@@ -246,18 +247,14 @@ def close_jdbc_connection
end

public
def execute_statement(statement, parameters)
# sql_last_value has been set in params by caller
def execute_statement
success = false
@connection_lock.lock
open_jdbc_connection
begin
params = symbolized_params(parameters)
query = @database[statement, params]
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
@tracking_column_warning_sent = false
@statement_logger.log_statement_parameters(query, statement, params)
perform_query(query) do |row|
@statement_handler.perform_query(@database, @value_tracker.value) do |row|
sql_last_value = get_column_value(row) if @use_column_value
yield extract_values_from(row)
end
Expand All @@ -273,55 +270,23 @@ def execute_statement(statement, parameters)
return success
end

# Performs the query, respecting our pagination settings, yielding once per row of data
# @param query [Sequel::Dataset]
# @yieldparam row [Hash{Symbol=>Object}]
private
def perform_query(query)
if @jdbc_paging_enabled
query.each_page(@jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
yield row
end
end
else
query.each do |row|
yield row
end
end
end

public
def get_column_value(row)
if !row.has_key?(@tracking_column.to_sym)
if !@tracking_column_warning_sent
@logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
@tracking_column_warning_sent = true
end
# If we can't find the tracking column, return the current value_tracker value
# If we can't find the tracking column, return the current value in the ivar
@value_tracker.value
else
# Otherwise send the updated tracking column
row[@tracking_column.to_sym]
end
end

# Symbolize parameters keys to use with Sequel
private
def symbolized_params(parameters)
parameters.inject({}) do |hash,(k,v)|
case v
when LogStash::Timestamp
hash[k.to_sym] = v.time
else
hash[k.to_sym] = v
end
hash
end
end

private
#Stringify row keys and decorate values when necessary
#Stringify row keys and decorate values when necessary
def extract_values_from(row)
Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }]
end
