Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add another flexible way for JDBC paging (manual mode) #95

Merged
merged 10 commits into from
Dec 22, 2021
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 5.2.0
- Added `jdbc_paging_mode` option to choose if use `explicit` pagination in statements and avoid the initial count
query or use `auto` to delegate to the underlying library [#95](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/95)

## 5.1.10
- Refactor: to explicit Java (driver) class name loading [#96](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/96),
the change is expected to provide a more robust fix for the driver loading issue [#83](https://github.com/logstash-plugins/logstash-integration-jdbc/issues/83).
Expand Down
52 changes: 51 additions & 1 deletion docs/input-jdbc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ Here is the list:
|sql_last_value | The value used to calculate which rows to query. Before any query is run,
this is set to Thursday, 1 January 1970, or 0 if `use_column_value` is true and
`tracking_column` is set. It is updated accordingly after subsequent queries are run.
|offset, size| Values used with manual paging mode to explicitly implement the paging.
Supported only if <<plugins-{type}s-{plugin}-jdbc_paging_enabled>> is enabled and
<<plugins-{type}s-{plugin}-jdbc_paging_mode>> has the `explicit` value.
|==========================================================

Example:
Expand All @@ -153,7 +156,7 @@ NOTE: Not all JDBC accessible technologies will support prepared statements.
With the introduction of Prepared Statement support comes a different code execution path and some new settings. Most of the existing settings are still useful but there are several new settings for Prepared Statements to read up on.
Use the boolean setting `use_prepared_statements` to enable this execution mode. Use the `prepared_statement_name` setting to specify a name for the Prepared Statement, this identifies the prepared statement locally and remotely and it should be unique in your config and on the database. Use the `prepared_statement_bind_values` array setting to specify the bind values, use the exact string `:sql_last_value` (multiple times if necessary) for the predefined parameter mentioned before. The `statement` (or `statement_path`) setting still holds the SQL statement but to use bind variables you must use the `?` character as a placeholder in the exact order found in the `prepared_statement_bind_values` array.

NOTE: Building count queries around a prepared statement is not supported at this time and because jdbc paging uses count queries under the hood, jdbc paging is not supported with prepared statements at this time either. Therefore, `jdbc_paging_enabled`, `jdbc_page_size` settings are ignored when using prepared statements.
NOTE: Building count queries around a prepared statement is not supported at this time. Because jdbc paging uses count queries when `jdbc_paging_mode` has value `auto`,jdbc paging is not supported with prepared statements at this time either. Therefore, `jdbc_paging_enabled`, `jdbc_page_size` settings are ignored when using prepared statements.

Example:
[source,ruby]
Expand Down Expand Up @@ -193,6 +196,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-jdbc_fetch_size>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-jdbc_page_size>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-jdbc_paging_enabled>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-jdbc_paging_mode>> |<<string,string>>, one of `["auto", "explicit"]`|No
| <<plugins-{type}s-{plugin}-jdbc_password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-jdbc_password_filepath>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-jdbc_pool_timeout>> |<<number,number>>|No
Expand Down Expand Up @@ -373,6 +377,52 @@ result-set. The limit size is set with `jdbc_page_size`.

Be aware that ordering is not guaranteed between queries.

[id="plugins-{type}s-{plugin}-jdbc_paging_mode"]
===== `jdbc_paging_mode`

* Value can be any of: `auto`, `explicit`
* Default value is `"auto"`

Whether to use `explicit` or `auto` mode during the JDBC paging

If `auto`, your statement will be automatically surrounded by a count query and subsequent multiple paged queries (with `LIMIT` statement, etc.).

If `explicit`, multiple queries (without a count query ahead) will be performed with your statement, until no more rows are retrieved.
You have to write your own paging conditions in your statement configuration.
The `offset` and `size` parameters can be used in your statement (`size` equal to `jdbc_page_size`, and `offset` incremented by `size` for each query).
When the number of rows returned by the query is not equal to `size`, SQL paging will be ended.
Example:

[source, ruby]
------------------------------------------------------
input {
jdbc {
statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value LIMIT :size OFFSET :offset",
jdbc_paging_enabled => true,
jdbc_paging_mode => "explicit",
jdbc_page_size => 100000
}
}
------------------------------------------------------

[source, ruby]
------------------------------------------------------
input {
jdbc {
statement => "CALL fetch_my_data(:sql_last_value, :offset, :size)",
jdbc_paging_enabled => true,
jdbc_paging_mode => "explicit",
jdbc_page_size => 100000
}
}
------------------------------------------------------

This mode can be considered in the following situations:

. Performance issues encountered in default paging mode.
. Your SQL statement is complex, so simply surrounding it with paging statements is not what you want.
. Your statement is a stored procedure, and the actual paging statement is inside it.

[id="plugins-{type}s-{plugin}-jdbc_password"]
===== `jdbc_password`

Expand Down
3 changes: 3 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def setup_jdbc_config
# Be aware that ordering is not guaranteed between queries.
config :jdbc_paging_enabled, :validate => :boolean, :default => false

# Which pagination mode to use, automatic pagination or explicitly defined in the query.
config :jdbc_paging_mode, :validate => [ "auto", "explicit" ], :default => "auto"

# JDBC page size
config :jdbc_page_size, :validate => :number, :default => 100000

Expand Down
36 changes: 34 additions & 2 deletions lib/logstash/plugin_mixins/jdbc/statement_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
module LogStash module PluginMixins module Jdbc
class StatementHandler
def self.build_statement_handler(plugin, logger)
klass = plugin.use_prepared_statements ? PreparedStatementHandler : NormalStatementHandler
if plugin.use_prepared_statements
klass = PreparedStatementHandler
else
if plugin.jdbc_paging_enabled && plugin.jdbc_paging_mode == "explicit"
klass = ExplicitPagingModeStatementHandler
else
klass = NormalStatementHandler
end
end
klass.new(plugin, logger)
end

Expand All @@ -27,7 +35,9 @@ def post_init(plugin)
class NormalStatementHandler < StatementHandler
# Performs the query, respecting our pagination settings, yielding once per row of data
# @param db [Sequel::Database]
# @param sql_last_value [Integet|DateTime|Time]
# @param sql_last_value [Integer|DateTime|Time]
# @param jdbc_paging_enabled [Boolean]
# @param jdbc_page_size [Integer]
# @yieldparam row [Hash{Symbol=>Object}]
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
query = build_query(db, sql_last_value)
Expand Down Expand Up @@ -67,6 +77,28 @@ def post_init(plugin)
end
end

class ExplicitPagingModeStatementHandler < NormalStatementHandler
# Performs the query, respecting our pagination settings, yielding once per row of data
# @param db [Sequel::Database]
# @param sql_last_value [Integer|DateTime|Time]
# @param jdbc_paging_enabled [Boolean]
# @param jdbc_page_size [Integer]
# @yieldparam row [Hash{Symbol=>Object}]
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
query = build_query(db, sql_last_value)
offset = 0
loop do
rows_in_page = 0
query.with_sql(query.sql, offset: offset, size: jdbc_page_size).each do |row|
yield row
rows_in_page += 1
end
break unless rows_in_page == jdbc_page_size
offset += jdbc_page_size
end
end
end

class PreparedStatementHandler < StatementHandler
attr_reader :name, :bind_values_array, :statement_prepared, :prepared

Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.1.10'
s.version = '5.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
33 changes: 33 additions & 0 deletions spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,39 @@

end

context "when iterating result-set via explicit paging mode" do

let(:settings) do
{
"statement" => "SELECT * from test_table OFFSET :offset ROWS FETCH NEXT :size ROWS ONLY",
"jdbc_paging_enabled" => true,
"jdbc_paging_mode" => "explicit",
"jdbc_page_size" => 10
}
end

let(:num_rows) { 15 }

before do
plugin.register
end

after do
plugin.stop
end

it "should fetch all rows" do
num_rows.times do
db[:test_table].insert(:num => 1, :custom_time => Time.now.utc, :created_at => Time.now.utc)
end

plugin.run(queue)

expect(queue.size).to eq(num_rows)
end

end

context "when using target option" do
let(:settings) do
{
Expand Down