Commit

Merge pull request jprante#1 from pdegeus/master
Fix JDBC river
msimons committed Apr 19, 2013
2 parents fbf7dab + 4bd92f3 commit dfc599f
Showing 39 changed files with 459 additions and 160 deletions.
45 changes: 22 additions & 23 deletions README.rst
@@ -1,4 +1,4 @@
.. image:: elasticsearch-river-jdbc/raw/master/src/site/origami.png
.. image:: ../../../elasticsearch-river-jdbc/raw/master/src/site/origami.png

Elasticsearch JDBC river
========================
@@ -25,49 +25,48 @@ Creating a JDBC river is easy::
Installation
------------

Current version of the plugin: **2.0.2** (January 23, 2013)
Prerequisites::

Prerequisites::
Elasticsearch 0.20+ / 0.90.0.Beta1+
a JDBC driver jar of your database

Elasticsearch 0.20.x
JDBC driver jar
============= ========= ================= ============================================================
ES version Plugin Release date Command
------------- --------- ----------------- ------------------------------------------------------------
0.20+ **2.0.3** February 12, 2013 ./bin/plugin -url http://bit.ly/Yp2Drj -install river-jdbc
0.90.0.Beta1+ **2.2.0** February 28, 2013 ./bin/plugin -url http://bit.ly/145e9Ly -install river-jdbc
============= ========= ================= ============================================================

Bintray:

https://bintray.com/pkg/show/general/jprante/elasticsearch-plugins/elasticsearch-river-jdbc

`Direct download <http://dl.bintray.com/content/jprante/elasticsearch-plugins/release/org/xbib/elasticsearch/elasticsearch-river-jdbc/2.0.2/elasticsearch-river-jdbc-2.0.2.zip>`_

Command::

./bin/plugin -url http://bit.ly/10FJhEd -install river-jdbc

Documentation
-------------

`Quickstart <elasticsearch-river-jdbc/wiki/Quickstart>`_
`Quickstart <../../../elasticsearch-river-jdbc/wiki/Quickstart>`_

`JDBC river parameters <elasticsearch-river-jdbc/wiki/JDBC-River-parameters>`_
`JDBC river parameters <../../../elasticsearch-river-jdbc/wiki/JDBC-River-parameters>`_

`Strategies <elasticsearch-river-jdbc/wiki/Strategies>`_
`Strategies <../../../elasticsearch-river-jdbc/wiki/Strategies>`_

`Moving a table <elasticsearch-river-jdbc/wiki/Moving-a-table-into-Elasticsearch>`_
`Moving a table <../../../elasticsearch-river-jdbc/wiki/Moving-a-table-into-Elasticsearch>`_

`Labeled columns <elasticsearch-river-jdbc/wiki/Labeled-columns>`_
`Labeled columns <../../../elasticsearch-river-jdbc/wiki/Labeled-columns>`_

`Structured objects <elasticsearch-river-jdbc/wiki/Structured-Objects>`_
`Structured objects <../../../elasticsearch-river-jdbc/wiki/Structured-Objects>`_

`RiverSource, RiverMouth, RiverFlow <elasticsearch-river-jdbc/wiki/RiverSource,-RiverMouth,-and-RiverFlow>`_
`RiverSource, RiverMouth, RiverFlow <../../../elasticsearch-river-jdbc/wiki/RiverSource,-RiverMouth,-and-RiverFlow>`_

`Bulk indexing <elasticsearch-river-jdbc/wiki/Bulk-indexing>`_
`Bulk indexing <../../../elasticsearch-river-jdbc/wiki/Bulk-indexing>`_

`Updates with versioning <elasticsearch-river-jdbc/wiki/Updates-with-versioning>`_
`Updates with versioning <../../../elasticsearch-river-jdbc/wiki/Updates-with-versioning>`_

`Updates with database table <elasticsearch-river-jdbc/wiki/Updates-with-database-table>`_
`Updates with database table <../../../elasticsearch-river-jdbc/wiki/Updates-with-database-table>`_

`Setting up the river with PostgreSQL <elasticsearch-river-jdbc/wiki/Step-by-step-recipe-for-setting-up-the-river-with-PostgreSQL>`_
`Setting up the river with PostgreSQL <../../../elasticsearch-river-jdbc/wiki/Step-by-step-recipe-for-setting-up-the-river-with-PostgreSQL>`_

`Loading from CSV <elasticsearch-river-jdbc/wiki/Loading-CSV>`_
`Loading from CSV <../../../elasticsearch-river-jdbc/wiki/Loading-CSV>`_

License
=======
3 changes: 2 additions & 1 deletion bin/create-mysql-river.sh
@@ -6,6 +6,7 @@ curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "",
"password" : "",
"sql" : "select * from orders"
"sql" : "select * from orders",
"strategy" : "simple"
}
}'
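
The fix above adds the missing comma after the "sql" entry and makes the "simple" strategy explicit, so the river definition is valid JSON again. For reference, the same registration can also be done through the Elasticsearch Java API instead of curl; the sketch below is an illustration under stated assumptions (transport port 9300 and the com.mysql.jdbc.Driver class are not part of this commit), not project code::

    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

    public class CreateJdbcRiver {
        public static void main(String[] args) throws Exception {
            // Transport port 9300 is an assumption; the curl script talks HTTP on 9200.
            Client client = new TransportClient()
                    .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
            XContentBuilder river = jsonBuilder()
                    .startObject()
                        .field("type", "jdbc")
                        .startObject("jdbc")
                            .field("driver", "com.mysql.jdbc.Driver") // assumed driver class, not shown in this hunk
                            .field("url", "jdbc:mysql://localhost:3306/test")
                            .field("user", "")
                            .field("password", "")
                            .field("sql", "select * from orders")
                            .field("strategy", "simple")
                        .endObject()
                    .endObject();
            // Equivalent of the curl PUT to /_river/my_jdbc_river/_meta
            client.prepareIndex("_river", "my_jdbc_river", "_meta")
                    .setSource(river)
                    .execute().actionGet();
            client.close();
        }
    }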
4 changes: 2 additions & 2 deletions bin/create-postgresql-river.sh
@@ -3,9 +3,9 @@ curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "org.postgresql.Driver",
"url" : "jdbc:postgresql://localhost:5432/test",
"url" : "jdbc:postgresql://localhost:5432/test?loglevel=0",
"user" : "test",
"password" : "test",
"sql" : "select * from orders"
"sql" : "select * from large_table"
}
}'
11 changes: 11 additions & 0 deletions bin/create-postgresql-river.sh~
@@ -0,0 +1,11 @@

curl -XPUT 'localhost:9200/_river/my_jdbc_river/_meta' -d '{
"type" : "jdbc",
"jdbc" : {
"driver" : "org.postgresql.Driver",
"url" : "jdbc:postgresql://localhost:5432/test?loglevel=5",
"user" : "test",
"password" : "test",
"sql" : "select * from large_table"
}
}'
26 changes: 19 additions & 7 deletions pom.xml
@@ -6,7 +6,7 @@

<groupId>org.xbib.elasticsearch</groupId>
<artifactId>elasticsearch-river-jdbc</artifactId>
<version>2.0.3-SNAPSHOT</version>
<version>2.0.4-SNAPSHOT</version>
<packaging>jar</packaging>

<name>elasticsearch-river-jdbc</name>
@@ -63,6 +63,16 @@
<artifactId>testng</artifactId>
<version>6.8</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
@@ -132,14 +142,12 @@
<configuration>
<skip>false</skip>
<forkMode>once</forkMode>
<useSystemClassLoader>false</useSystemClassLoader>
<systemPropertyVariables>
<file.encoding>UTF-8</file.encoding>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
<java.net.preferIPv4Stack>true</java.net.preferIPv4Stack>
<derby.system.home>${project.build.directory}/derby</derby.system.home>
<derby.stream.error.file>${project.build.directory}/derby.log</derby.stream.error.file>
<java.util.logging.config.file>${basedir}/src/test/resources/logging.properties</java.util.logging.config.file>
<oracle.jdbc.Trace>true</oracle.jdbc.Trace>
<log4j.configuration>file:${project.build.testOutputDirectory}/log4j.properties</log4j.configuration>
</systemPropertyVariables>
<includes>
<include>**/*Tests.java</include>
@@ -185,7 +193,7 @@
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.8.1</version>
<version>2.9</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
<locale>en</locale>
@@ -203,7 +211,7 @@
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.0</version>
<version>3.2</version>
<configuration>
<locales>en</locales>
<inputEncoding>UTF-8</inputEncoding>
@@ -249,6 +257,10 @@
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<derby.system.home>${project.build.directory}/derby</derby.system.home>
<derby.stream.error.file>${project.build.directory}/derby.log</derby.stream.error.file>
</systemPropertyVariables>
<suiteXmlFiles>
<suiteXmlFile>${basedir}/src/test/resources/testsuite-derby.xml</suiteXmlFile>
</suiteXmlFiles>
8 changes: 4 additions & 4 deletions src/main/java/org/elasticsearch/river/jdbc/JDBCRiver.java
@@ -106,7 +106,7 @@ public JDBCRiver(RiverName riverName, RiverSettings riverSettings,
sql = XContentMapValues.nodeStringValue(sourceSettings.get("sql"), null);
sqlparams = XContentMapValues.extractRawValues("sqlparams", sourceSettings);
rounding = XContentMapValues.nodeStringValue(sourceSettings.get("rounding"), null);
scale = XContentMapValues.nodeIntegerValue(sourceSettings.get("scale"), 0);
scale = XContentMapValues.nodeIntegerValue(sourceSettings.get("scale"), 2);
autocommit = XContentMapValues.nodeBooleanValue(sourceSettings.get("autocommit"), Boolean.FALSE);
fetchsize = url.startsWith("jdbc:mysql") ? Integer.MIN_VALUE :
XContentMapValues.nodeIntegerValue(sourceSettings.get("fetchsize"), 10);
@@ -142,7 +142,7 @@ public JDBCRiver(RiverName riverName, RiverSettings riverSettings,
.precision(scale);

riverMouth = RiverServiceLoader.findRiverMouth(strategy);
logger.debug("found river target {} for strategy {}", riverMouth.getClass().getName(), strategy);
logger.debug("found river mouth {} for strategy {}", riverMouth.getClass().getName(), strategy);
riverMouth.index(indexName)
.type(typeName)
.maxBulkActions(bulkSize)
@@ -158,7 +158,7 @@ public JDBCRiver(RiverName riverName, RiverSettings riverSettings,
.riverIndexName(riverIndexName)
.riverSettings(riverSettings.settings())
.riverSource(riverSource)
.riverTarget(riverMouth)
.riverMouth(riverMouth)
.pollInterval(poll)
.pollStatement(sql)
.pollStatementParams(sqlparams)
@@ -177,7 +177,7 @@ public JDBCRiver(RiverName riverName, RiverSettings riverSettings,
// prepare task for run
riverFlow.riverContext(riverContext);

logger.debug("found river task {} for strategy {}", riverFlow.getClass().getName(), strategy);
logger.debug("found river flow {} for strategy {}", riverFlow.getClass().getName(), strategy);
}

@Override
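
In this file the default for "scale" changes from 0 to 2, and together with "rounding" it is handed to the river source via the .precision(scale) call above. The river's own numeric conversion is not part of this hunk; the following is only a minimal sketch of what a scale of 2 plus a rounding mode conventionally mean for a JDBC numeric column::

    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Illustration only, not the river's conversion code: apply scale and
    // rounding to a numeric column before turning it into a document field.
    final class NumericColumnSketch {
        static BigDecimal normalize(ResultSet rs, int column, int scale, RoundingMode rounding)
                throws SQLException {
            BigDecimal value = rs.getBigDecimal(column);
            if (value == null) {
                return null; // SQL NULL stays null
            }
            // With the new default scale = 2 and HALF_UP: 19.994 -> 19.99, 19.995 -> 20.00
            return value.setScale(scale, rounding);
        }
    }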
8 changes: 8 additions & 0 deletions src/main/java/org/elasticsearch/river/jdbc/RiverSource.java
@@ -177,6 +177,14 @@ public interface RiverSource {
*/
ResultSet executeQuery(PreparedStatement statement) throws SQLException;

/**
* Execute query without binding parameters
* @param sql the SQL statement
* @return the result set
* @throws SQLException
*/
ResultSet executeQuery(String sql) throws SQLException;

/**
* Execute insert/update
* @param statement
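
The new executeQuery(String sql) overload lets a strategy run a statement without binding parameters first. One way an implementation could satisfy it is with a plain java.sql.Statement; the class below is a hedged sketch (the per-query connection handling and the fetch size of 10 are assumptions), not the river's actual source implementation::

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    // Sketch of the unparameterized query path. A real implementation would
    // keep and reuse its connection and close it elsewhere; this sketch opens
    // a new one per query for brevity.
    class PlainQuerySourceSketch {
        private final String url;
        private final String user;
        private final String password;

        PlainQuerySourceSketch(String url, String user, String password) {
            this.url = url;
            this.user = user;
            this.password = password;
        }

        public ResultSet executeQuery(String sql) throws SQLException {
            Connection connection = DriverManager.getConnection(url, user, password);
            Statement statement = connection.createStatement();
            statement.setFetchSize(10); // mirrors the river's default fetchsize
            return statement.executeQuery(sql);
        }
    }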
SimpleRiverFlow.java
@@ -111,7 +111,7 @@ public Date startDate() {
public SimpleRiverFlow delay(String reason) {
TimeValue poll = context.pollingInterval();
if (poll.millis() > 0L) {
logger.info("{}, waiting {}", reason, poll);
logger.info("({}) {}, waiting {}", context.riverName(), reason, poll);
try {
Thread.sleep(poll.millis());
} catch (InterruptedException e) {
@@ -151,8 +151,8 @@ public void run() {
public void move() {
try {
RiverSource source = context.riverSource();
RiverMouth target = context.riverTarget();
Client client = context.riverTarget().client();
RiverMouth target = context.riverMouth();
Client client = context.riverMouth().client();
Number version;
String digest;
GetResponse get = null;
@@ -219,10 +219,10 @@ public void move() {
*/
protected void versionHouseKeeping(long version) throws IOException {
logger.info("housekeeping for version " + version);
Client client = context.riverTarget().client();
String indexName = context.riverTarget().index();
String typeName = context.riverTarget().type();
int bulkSize = context.riverTarget().maxBulkActions();
Client client = context.riverMouth().client();
String indexName = context.riverMouth().index();
String typeName = context.riverMouth().type();
int bulkSize = context.riverMouth().maxBulkActions();
client.admin().indices().prepareRefresh(indexName).execute().actionGet();
SearchResponse response = client.prepareSearch().setIndices(indexName).setTypes(typeName).setSearchType(SearchType.SCAN)
.setScroll(TimeValue.timeValueMinutes(10)).setSize(bulkSize).setVersion(true).setQuery(matchAllQuery()).execute().actionGet();
@@ -260,7 +260,7 @@ protected void versionHouseKeeping(long version) throws IOException {
for (SearchHit hit : response.getHits().getHits()) {
// delete all documents with lower version
if (hit.getVersion() < version) {
context.riverTarget().delete(new StructuredObject()
context.riverMouth().delete(new StructuredObject()
.index(hit.getIndex())
.type(hit.getType())
.id(hit.getId()));
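
The renamed accessors also run through versionHouseKeeping, which scans the index with versions enabled and deletes every document whose _version is lower than the current run's. A self-contained sketch of that scan/scroll pattern follows; it substitutes a plain client delete for riverMouth.delete() and assumes the same 10-minute scroll window::

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.action.search.SearchType;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.search.SearchHit;

    import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

    // Sketch of the scan/scroll housekeeping pattern: scroll over all documents
    // with versions enabled and delete those older than the current run.
    final class VersionHousekeepingSketch {
        static void deleteOlderThan(Client client, String index, String type, long version, int bulkSize) {
            client.admin().indices().prepareRefresh(index).execute().actionGet();
            SearchResponse response = client.prepareSearch(index)
                    .setTypes(type)
                    .setSearchType(SearchType.SCAN)
                    .setScroll(TimeValue.timeValueMinutes(10))
                    .setSize(bulkSize)
                    .setVersion(true)
                    .setQuery(matchAllQuery())
                    .execute().actionGet();
            while (true) {
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(TimeValue.timeValueMinutes(10))
                        .execute().actionGet();
                if (response.getHits().getHits().length == 0) {
                    break;
                }
                for (SearchHit hit : response.getHits().getHits()) {
                    if (hit.getVersion() < version) {
                        // stands in for riverMouth.delete(new StructuredObject()...)
                        client.prepareDelete(hit.getIndex(), hit.getType(), hit.getId())
                                .execute().actionGet();
                    }
                }
            }
        }
    }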
SimpleRiverMouth.java
@@ -81,8 +81,8 @@ public class SimpleRiverMouth implements RiverMouth {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
long l = outstandingBulkRequests.incrementAndGet();
logger.info("new bulk [{}] of [{} items], {} outstanding bulk requests",
executionId, request.numberOfActions(), l);
logger.info("({}) new bulk [{}] of [{} items], {} outstanding bulk requests",
context.riverName(), executionId, request.numberOfActions(), l);
}

@Override
@@ -91,19 +91,19 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
try {
context.riverSource().acknowledge(response);
} catch (IOException e) {
logger.error("bulk ["+executionId+"] acknowledge error", e);
logger.error("("+context.riverName()+") bulk ["+executionId+"] acknowledge error", e);
}
}
outstandingBulkRequests.decrementAndGet();
logger.info("bulk [{}] success [{} items] [{}ms]",
executionId, response.items().length, response.getTook().millis());
logger.info("({}) bulk [{}] success [{} items] [{}ms]",
context.riverName(), executionId, response.items().length, response.getTook().millis());

}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
outstandingBulkRequests.decrementAndGet();
logger.error("bulk [" + executionId + "] error", failure);
logger.error("(" + context.riverName() + ") bulk [" + executionId + "] error", failure);
error = true;
}
};