Skip to content

Commit

Permalink
Merge pull request jprante#1 from alexanderkjall/master
Browse files Browse the repository at this point in the history
if you want to test the removal of the driver configuration option
  • Loading branch information
cortex committed Nov 27, 2012
2 parents dbe6f07 + 19224b6 commit f8f6102
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 104 deletions.
2 changes: 1 addition & 1 deletion mysql-demo.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ drop table if exists employees;
create table employees (
name varchar(32),
department varchar(32),
salary decimal(5,2)
salary varchar(32)
);

drop table if exists departments;
Expand Down
16 changes: 15 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,21 @@
<artifactId>mysql-connector-java</artifactId>
<version>5.1.15</version>
<scope>test</scope>
</dependency>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-mxj</artifactId>
<version>5.0.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
</dependencies>

<reporting>
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/elasticsearch/river/jdbc/BulkOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.elasticsearch.river.jdbc;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
Expand All @@ -30,6 +28,9 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.VersionType;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Send bulk data to Elasticsearch
*
Expand All @@ -49,7 +50,7 @@ public class BulkOperation implements Action {
private static final int MAX_TOTAL_TIMEOUTS = 10;
private static final AtomicInteger onGoingBulks = new AtomicInteger(0);
private static final AtomicInteger counter = new AtomicInteger(0);
private ThreadLocal<BulkRequestBuilder> currentBulk = new ThreadLocal();
private ThreadLocal<BulkRequestBuilder> currentBulk = new ThreadLocal<BulkRequestBuilder>();
private String riverName;
private BulkAcknowledge ack;
private boolean versioning;
Expand Down Expand Up @@ -228,7 +229,7 @@ private void processBulk() {
}
int currentOnGoingBulks = onGoingBulks.incrementAndGet();
final int numberOfActions = currentBulk.get().numberOfActions();
logger.info("submitting new bulk request ({} docs, {} requests currently active)", new Object[]{numberOfActions, currentOnGoingBulks});
logger.info("submitting new bulk request ({} docs, {} requests currently active)", numberOfActions, currentOnGoingBulks );
try {
currentBulk.get().execute(new ActionListener<BulkResponse>() {

Expand All @@ -243,7 +244,7 @@ public void onResponse(BulkResponse bulkResponse) {
logger.error("bulk request has failures: {}", bulkResponse.buildFailureMessage());
} else {
final int totalActions = counter.addAndGet(numberOfActions);
logger.info("bulk request success ({} millis, {} docs, total of {} docs)", new Object[]{bulkResponse.tookInMillis(), numberOfActions, totalActions});
logger.info("bulk request success ({} millis, {} docs, total of {} docs)", bulkResponse.tookInMillis(), numberOfActions, totalActions );
}
onGoingBulks.decrementAndGet();
synchronized (onGoingBulks) {
Expand Down
51 changes: 23 additions & 28 deletions src/main/java/org/elasticsearch/river/jdbc/JDBCRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,33 @@
*/
package org.elasticsearch.river.jdbc;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import static org.elasticsearch.client.Requests.indexRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.river.*;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Date;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.client.Requests.indexRequest;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;

public class JDBCRiver extends AbstractRiverComponent implements River {

private final Client client;
Expand All @@ -61,7 +59,6 @@ public class JDBCRiver extends AbstractRiverComponent implements River {
private final TimeValue poll;
private final TimeValue interval;
private final String url;
private final String driver;
private final String user;
private final String password;
private final String sql;
Expand All @@ -85,7 +82,6 @@ public JDBCRiver(RiverName riverName, RiverSettings settings,
Map<String, Object> jdbcSettings = (Map<String, Object>) settings.settings().get("jdbc");
poll = XContentMapValues.nodeTimeValue(jdbcSettings.get("poll"), TimeValue.timeValueMinutes(60));
url = XContentMapValues.nodeStringValue(jdbcSettings.get("url"), null);
driver = XContentMapValues.nodeStringValue(jdbcSettings.get("driver"), null);
user = XContentMapValues.nodeStringValue(jdbcSettings.get("user"), null);
password = XContentMapValues.nodeStringValue(jdbcSettings.get("password"), null);
sql = XContentMapValues.nodeStringValue(jdbcSettings.get("sql"), null);
Expand All @@ -99,7 +95,6 @@ public JDBCRiver(RiverName riverName, RiverSettings settings,
} else {
poll = TimeValue.timeValueMinutes(60);
url = null;
driver = null;
user = null;
password = null;
sql = null;
Expand Down Expand Up @@ -137,8 +132,8 @@ public JDBCRiver(RiverName riverName, RiverSettings settings,

@Override
public void start() {
logger.info("starting JDBC connector: URL [{}], driver [{}], sql [{}], river table [{}], indexing to [{}]/[{}], poll [{}]",
url, driver, sql, rivertable, indexName, typeName, poll);
logger.info("starting JDBC connector: URL [{}], sql [{}], river table [{}], indexing to [{}]/[{}], poll [{}]",
url, sql, rivertable, indexName, typeName, poll);
try {
client.admin().indices().prepareCreate(indexName).execute().actionGet();
creationDate = new Date();
Expand Down Expand Up @@ -172,8 +167,8 @@ private class JDBCConnector implements Runnable {
@Override
public void run() {

Number version = null;
String digest = null;
Number version;
String digest;

while (true) {
try {
Expand All @@ -193,7 +188,7 @@ public void run() {
throw new IOException("can't retrieve previously persisted state from " + riverIndexName + "/" + riverName().name());
}
}
Connection connection = service.getConnection(driver, url, user, password, true);
Connection connection = service.getConnection(url, user, password, true);
PreparedStatement statement = service.prepareStatement(connection, sql);


Expand All @@ -206,7 +201,7 @@ public void run() {

ResultSet results = service.execute(statement, fetchsize);
Merger merger = new Merger(operation, version.longValue());
saveStatus(version.longValue(), merger.getDigest(), "running", 0, startTime.toString());
saveStatus(version.longValue(), merger.getDigest(), "running", 0, startTime );

long rows = 0L;
while (service.nextRow(results, merger)) {
Expand Down Expand Up @@ -325,7 +320,7 @@ public void run() {
while (true) {
for (String optype : optypes) {
try {
Connection connection = service.getConnection(driver, url, user, password, false);
Connection connection = service.getConnection(url, user, password, false);
PreparedStatement statement = service.prepareRiverTableStatement(connection, riverName.getName(), optype, interval.millis());
ResultSet results = service.execute(statement, fetchsize);
Merger merger = new Merger(operation);
Expand Down Expand Up @@ -356,11 +351,11 @@ public void run() {

private void delay(String reason) {
if (poll.millis() > 0L) {
logger.info("{}, waiting {}, URL [{}] driver [{}] sql [{}] river table [{}]",
reason, poll, url, driver, sql, rivertable);
logger.info("{}, waiting {}, URL [{}] sql [{}] river table [{}]",
reason, poll, url, sql, rivertable);
try {
Thread.sleep(poll.millis());
} catch (InterruptedException e1) {
} catch (InterruptedException ignored) {
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/org/elasticsearch/river/jdbc/Merger.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package org.elasticsearch.river.jdbc;

import org.elasticsearch.common.Base64;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
Expand Down Expand Up @@ -95,7 +97,7 @@ public Merger(char delimiter, Action action, long version) throws IOException, N
this.delimiter = delimiter;
this.builder = jsonBuilder();
this.listener = action;
this.map = new HashMap();
this.map = new HashMap<String, Object>();
this.version = version;
this.digest = MessageDigest.getInstance(DIGEST_ALGORITHM);
this.closed = false;
Expand Down Expand Up @@ -228,7 +230,7 @@ public void flush(String optype, String index, String type, String id) throws IO
}
builder.close();
builder = jsonBuilder();
map = new HashMap();
map = new HashMap<String, Object>();
}
}

Expand Down Expand Up @@ -284,7 +286,7 @@ protected void merge(Map<String, Object> map, String key, Object value) {
throw new IllegalArgumentException("illegal prefix: " + p);
}
} else {
Map<String, Object> m = new HashMap();
Map<String, Object> m = new HashMap<String, Object>();
map.put(p, m);
merge(m, q, value);
}
Expand Down
43 changes: 13 additions & 30 deletions src/main/java/org/elasticsearch/river/jdbc/SQLService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,15 @@
*/
package org.elasticsearch.river.jdbc;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.common.logging.ESLogger;

import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.sql.*;
import java.util.LinkedList;
import java.util.List;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.common.logging.ESLogger;

/**
* The SQL service class manages the SQL access to the JDBC connection.
Expand Down Expand Up @@ -88,18 +75,14 @@ public SQLService setPrecision(int scale) {
/**
* Get JDBC connection
*
* @param driverClassName
* @param jdbcURL
* @param user
* @param password
* @return the connection
* @throws ClassNotFoundException
* @throws SQLException
*/
public Connection getConnection(final String driverClassName,
final String jdbcURL, final String user, final String password, boolean readOnly)
throws ClassNotFoundException, SQLException {
Class.forName(driverClassName);
public Connection getConnection(final String jdbcURL, final String user, final String password, boolean readOnly)
throws SQLException {
this.connection = DriverManager.getConnection(jdbcURL, user, password);
connection.setReadOnly(readOnly);
connection.setAutoCommit(false);
Expand Down Expand Up @@ -248,8 +231,8 @@ public boolean nextRiverTableRow(ResultSet result, RowListener listener)

private void processRow(ResultSet result, RowListener listener, String operation, String index, String type, String id)
throws SQLException, IOException {
LinkedList<String> keys = new LinkedList();
LinkedList<Object> values = new LinkedList();
LinkedList<String> keys = new LinkedList<String>();
LinkedList<Object> values = new LinkedList<Object>();
ResultSetMetaData metadata = result.getMetaData();
int columns = metadata.getColumnCount();
for (int i = 1; i <= columns; i++) {
Expand Down Expand Up @@ -813,19 +796,19 @@ private void bind(PreparedStatement pstmt, int i, Object value) throws SQLExcept
pstmt.setString(i, (String) value);
}
} else if (value instanceof Integer) {
pstmt.setInt(i, ((Integer) value).intValue());
pstmt.setInt(i, (Integer) value );
} else if (value instanceof Long) {
pstmt.setLong(i, ((Long) value).longValue());
pstmt.setLong(i, (Long) value );
} else if (value instanceof BigDecimal) {
pstmt.setBigDecimal(i, (BigDecimal) value);
} else if (value instanceof Date) {
pstmt.setDate(i, (Date) value);
} else if (value instanceof Timestamp) {
pstmt.setTimestamp(i, (Timestamp) value);
} else if (value instanceof Float) {
pstmt.setFloat(i, ((Float) value).floatValue());
pstmt.setFloat(i, (Float) value );
} else if (value instanceof Double) {
pstmt.setDouble(i, ((Double) value).doubleValue());
pstmt.setDouble(i, (Double) value );
} else {
pstmt.setObject(i, value);
}
Expand Down Expand Up @@ -864,4 +847,4 @@ public void acknowledge(String riverName, BulkItemResponse[] response) throws IO
throw new IOException(ex);
}
}
}
}
17 changes: 9 additions & 8 deletions src/main/java/org/elasticsearch/river/jdbc/ValueSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
*/
package org.elasticsearch.river.jdbc;

import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.security.MessageDigest;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
* A ValueSet represents one or many values.
Expand All @@ -47,9 +48,9 @@ public ValueSet(Object valueset, Object v) {
Object[] values = t.getValues();
int l = values.length;
boolean found = false;
for (int i = 0; i < l; i++) {
found = v != null && v.equals(values[i]);
if (found) {
for ( Object value : values ) {
found = v != null && v.equals( value );
if ( found ) {
break;
}
}
Expand Down Expand Up @@ -83,10 +84,10 @@ public void build(XContentBuilder builder, MessageDigest digest, String encoding
if (value.length > 1) {
builder.startArray();
}
for (int i = 0; i < value.length; i++) {
builder.value(value[i]);
if (value[i] != null) {
digest.update(value[i].toString().getBytes(encoding));
for ( Object aValue : value ) {
builder.value( aValue );
if ( aValue != null ) {
digest.update( aValue.toString().getBytes( encoding ) );
}
}
if (value.length > 1) {
Expand Down
Loading

0 comments on commit f8f6102

Please sign in to comment.