Commit fbf7dab
- Minor changes to make jdbc-river work on the 0.21 branch instead of 0.90.
- Changed the acknowledgement of bulk requests to use inserts instead of updates (no updates on the river source table, to avoid deadlocks).
msimons committed Apr 16, 2013
1 parent dcdcff9 commit fbf7dab
Showing 8 changed files with 92 additions and 104 deletions.
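For context on the new acknowledgement path: each bulk item response is now recorded as a fresh row in a per-river ack table instead of flipping a source_operation flag on the river source table, so acknowledgements never take row locks that compete with river reads. The commit ships no DDL for that table, so the following is a hypothetical sketch: the column names are taken from the insert statement in acknowledge() below, while the column types, the in-memory hsqldb URL, and the table name "myriver_ack" are assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateAckTable {
    public static void main(String[] args) throws Exception {
        // Hypothetical DDL for the per-river ack table; types are assumptions.
        try (Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:river", "sa", "");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("create table \"myriver_ack\" ("
                    + "\"_index\" varchar(255), "          // target index of the bulk item
                    + "\"_type\" varchar(255), "           // target mapping type
                    + "\"_id\" varchar(255), "             // target document id
                    + "\"target_timestamp\" timestamp, "   // when the item was acknowledged
                    + "\"target_operation\" varchar(16), " // create / index / delete
                    + "\"target_failed\" boolean, "        // whether the bulk item failed
                    + "\"target_message\" varchar(4000)"   // failure message, if any
                    + ")");
        }
    }
}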
@@ -18,8 +18,12 @@
*/
package org.elasticsearch.rest.action;

+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Map;
+
import org.elasticsearch.client.Client;
-import org.elasticsearch.common.collect.*;
+import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
@@ -37,11 +41,6 @@
import org.elasticsearch.river.RiversService;
import org.elasticsearch.river.jdbc.JDBCRiver;

-import com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Map;

/**
* The JDBC River REST fire move. The river can be fired once to run
12 changes: 6 additions & 6 deletions src/main/java/org/elasticsearch/river/jdbc/JDBCRiver.java
@@ -18,6 +18,12 @@
*/
package org.elasticsearch.river.jdbc;

+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -35,12 +41,6 @@
import org.elasticsearch.river.jdbc.support.RiverContext;
import org.elasticsearch.river.jdbc.support.RiverServiceLoader;

-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
/**
* The JDBC river
*
@@ -18,6 +18,14 @@
*/
package org.elasticsearch.river.jdbc.strategy.simple;

+import static org.elasticsearch.client.Requests.indexRequest;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
@@ -34,14 +42,6 @@
import org.elasticsearch.river.jdbc.support.StructuredObject;
import org.elasticsearch.search.SearchHit;

-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-
-import static org.elasticsearch.client.Requests.indexRequest;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
-
/**
* A river flow implementation for the 'simple' strategy.
* <p/>
@@ -195,9 +195,9 @@ public void move() {
logger.debug(builder.string());
}
client.prepareBulk().add(indexRequest(context.riverIndexName())
-.setType(context.riverName())
-.setId(ID_INFO_RIVER_INDEX)
-.setSource(builder))
+.type(context.riverName())
+.id(ID_INFO_RIVER_INDEX)
+.source(builder))
.execute().actionGet();
// house keeping if data has changed
if (digest != null && mergeDigest != null && !mergeDigest.equals(digest)) {
@@ -23,14 +23,12 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.update.PartialDocumentUpdateRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
@@ -98,7 +96,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
}
outstandingBulkRequests.decrementAndGet();
logger.info("bulk [{}] success [{} items] [{}ms]",
-executionId, response.getItems().length, response.getTook().millis());
+executionId, response.items().length, response.getTook().millis());

}

@@ -250,30 +248,30 @@ private IndexRequest createIndexRequest(StructuredObject object, boolean create)
id(object.id());
}
IndexRequest request = Requests.indexRequest(index())
-.setType(type())
-.setId(id())
-.setSource(object.build());
+.type(type())
+.id(id())
+.source(object.build());
if (create) {
-request.setCreate(create);
+request.create(create);
}
if (object.meta(StructuredObject.VERSION) != null && versioning) {
-request.setVersionType(VersionType.EXTERNAL)
-.setVersion(Long.parseLong(object.meta(StructuredObject.VERSION)));
+request.versionType(VersionType.EXTERNAL)
+.version(Long.parseLong(object.meta(StructuredObject.VERSION)));
}
if (object.meta(StructuredObject.ROUTING) != null) {
-request.setRouting(object.meta(StructuredObject.ROUTING));
+request.routing(object.meta(StructuredObject.ROUTING));
}
if (object.meta(StructuredObject.PERCOLATE) != null) {
-request.setPercolate(object.meta(StructuredObject.PERCOLATE));
+request.percolate(object.meta(StructuredObject.PERCOLATE));
}
if (object.meta(StructuredObject.PARENT) != null) {
-request.setParent(object.meta(StructuredObject.PARENT));
+request.parent(object.meta(StructuredObject.PARENT));
}
if (object.meta(StructuredObject.TIMESTAMP) != null) {
-request.setTimestamp(object.meta(StructuredObject.TIMESTAMP));
+request.timestamp(object.meta(StructuredObject.TIMESTAMP));
}
if (object.meta(StructuredObject.TTL) != null) {
-request.setTtl(Long.parseLong(object.meta(StructuredObject.TTL)));
+request.ttl(Long.parseLong(object.meta(StructuredObject.TTL)));
}

return request;
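A side note on the rename running through this file: on the 0.21 branch the plain IndexRequest (as opposed to the request builders) uses bare fluent setters, so every set* call is dropped to its unprefixed form. A condensed sketch of the resulting call chain — every method below appears in the diff, but all literal values are placeholders:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.index.VersionType;

public class IndexRequestSketch {
    static IndexRequest build() {
        // Placeholder index/type/id/source values throughout.
        return Requests.indexRequest("myindex")
                .type("mytype")
                .id("1")
                .source("{\"field\":\"value\"}")
                .create(true)                      // fail if the document already exists
                .versionType(VersionType.EXTERNAL) // honor an externally supplied version
                .version(42L)
                .routing("routing-key")
                .parent("parent-id")
                .timestamp("1366070400000")
                .ttl(86400000L);
    }
}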
@@ -311,17 +309,17 @@ public void delete(StructuredObject object) {
if (id == null) {
return; // skip if no doc is specified to delete
}
-DeleteRequest request = Requests.deleteRequest(index()).setType(type()).setId(id());
+DeleteRequest request = Requests.deleteRequest(index()).type(type()).id(id());

if (object.meta(StructuredObject.ROUTING) != null) {
-request.setRouting(object.meta(StructuredObject.ROUTING));
+request.routing(object.meta(StructuredObject.ROUTING));
}
if (object.meta(StructuredObject.PARENT) != null) {
-request.setParent(object.meta(StructuredObject.PARENT));
+request.parent(object.meta(StructuredObject.PARENT));
}
if (object.meta(StructuredObject.VERSION) != null && versioning) {
-request.setVersionType(VersionType.EXTERNAL)
-.setVersion(Long.parseLong(object.meta(StructuredObject.VERSION)));
+request.versionType(VersionType.EXTERNAL)
+.version(Long.parseLong(object.meta(StructuredObject.VERSION)));
}
bulk.add(request);
}
@@ -18,21 +18,6 @@
*/
package org.elasticsearch.river.jdbc.strategy.simple;

-import oracle.jdbc.OracleResultSet;
-import oracle.sql.OPAQUE;
-
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.common.Base64;
-import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.logging.ESLoggerFactory;
-import org.elasticsearch.river.jdbc.RiverSource;
-import org.elasticsearch.river.jdbc.support.RiverContext;
-import org.elasticsearch.river.jdbc.support.ValueListener;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -63,6 +48,21 @@
import java.util.Locale;
import java.util.regex.Pattern;

+import oracle.jdbc.OracleResultSet;
+import oracle.sql.OPAQUE;
+
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.common.Base64;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.joda.time.DateTime;
+import org.elasticsearch.common.joda.time.format.DateTimeFormat;
+import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.river.jdbc.RiverSource;
+import org.elasticsearch.river.jdbc.support.RiverContext;
+import org.elasticsearch.river.jdbc.support.ValueListener;
+
/**
* A river source implementation for the 'simple' strategy.
* <p/>
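The import moves above reflect that the 0.21-era Elasticsearch jar repackages third-party libraries (Guava, Joda-Time) under org.elasticsearch.common, so the river no longer pulls them in as separate dependencies. A minimal sketch, assuming the repackaged classes match the upstream Guava/Joda APIs (the pattern string and map contents are illustrative):

import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;

public class RepackagedLibs {
    public static void main(String[] args) {
        // Same Joda-Time / Guava calls, just under the org.elasticsearch.common
        // package that the server jar ships them in.
        DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
        ImmutableMap<String, String> meta = ImmutableMap.of("fetched_at", fmt.print(new DateTime()));
        System.out.println(meta);
    }
}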
@@ -323,7 +323,7 @@ public PreparedStatement prepareQuery(String sql) throws SQLException {
throw new SQLException("file not found: " + sql);
}
}
-Connection connection = connectionForWriting();
+Connection connection = connectionForReading();
if (connection == null) {
throw new SQLException("can't connect to source " + url);
}
@@ -389,16 +389,13 @@ public ResultSet executeQuery(PreparedStatement statement) throws SQLException {
statement.setFetchSize(context.fetchSize());
ResultSet set = statement.executeQuery();

-if(writeConnection == null) {
-writeConnection = connectionForWriting();
-if (writeConnection == null) {
+if(readConnection == null) {
+readConnection = connectionForReading();
+if (readConnection == null) {
throw new SQLException("can't connect to source " + url);
}
}

-if (!writeConnection.getAutoCommit()) {
-writeConnection.commit();
-}
return set;
}
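The executeQuery() change means result sets are now served from a lazily opened, dedicated read connection, and the stray commit on the write connection is gone. A minimal sketch of that lazy-open pattern; the field and method names stand in for the members of the surrounding class, and the hsqldb URL and credentials are assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

class LazyReadConnection {
    private final String url = "jdbc:hsqldb:mem:river"; // assumed source URL
    private Connection readConnection;

    // Open the read connection on first use and cache it across fetches,
    // mirroring the readConnection handling in executeQuery() above.
    Connection connectionForReading() throws SQLException {
        if (readConnection == null) {
            readConnection = DriverManager.getConnection(url, "sa", "");
        }
        return readConnection;
    }
}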

@@ -51,7 +51,7 @@ public String strategy() {

@Override
public String fetch() throws SQLException, IOException {
-Connection connection = connectionForWriting();
+Connection connection = connectionForReading();
String[] optypes = new String[]{Operations.OP_CREATE, Operations.OP_INDEX, Operations.OP_DELETE, Operations.OP_UPDATE};

long now = System.currentTimeMillis();
@@ -126,48 +126,41 @@ public SimpleRiverSource acknowledge(BulkResponse response) throws IOException {
if (response == null) {
logger.warn("can't acknowledge null bulk response");
}

+// if acknowledge is disabled return current.
+if(!acknowledge()){
+return this;
+}
+
try {
Connection connection = connectionForWriting();
String riverName = context.riverName();
-for (BulkItemResponse resp : response.getItems()) {
-PreparedStatement pstmt;
+for (BulkItemResponse resp : response.items()) {
+PreparedStatement pstmt = null;
+List<Object> params = new ArrayList();

try {
pstmt = prepareUpdate("update \"" + riverName + "\" set \"source_operation\" = 'ack' where \"_index\" = ? and \"_type\" = ? and \"_id\" = ?");
pstmt = prepareUpdate("insert into \""+riverName+"_ack\" (\"_index\",\"_type\",\"_id\","+
"\"target_timestamp\",\"target_operation\",\"target_failed\",\"target_message\") values (?,?,?,?,?,?,?)");

} catch (SQLException e) {
try {
// hsqldb
pstmt = prepareUpdate("update " + riverName + " set \"source_operation\" = 'ack' where \"_index\" = ? and \"_type\" = ? and \"_id\" = ?");
pstmt = prepareUpdate("insert into " + riverName + "_ack (\"_index\",\"_type\",\"_id\","+
"\"target_timestamp\",\"target_operation\",\"target_failed\",\"target_message\") values (?,?,?,?,?,?,?)");
} catch (SQLException e1) {
// mysql
pstmt = prepareUpdate("update " + riverName + " set source_operation = 'ack' where _index = ? and _type = ? and _id = ?");
pstmt = prepareUpdate("insert into " + riverName + "_ack (_index,_type,_id,"+
"target_timestamp,target_operation,target_failed,target_message) values (?,?,?,?,?,?,?)");
}
}
-List<Object> params = new ArrayList();
+params = new ArrayList();
params.add(resp.getIndex());
params.add(resp.getType());
params.add(resp.getId());
-bind(pstmt, params);
-executeUpdate(pstmt);
-close(pstmt);
-try {
-pstmt = prepareUpdate("update \"" + riverName + "_ack\" set \"target_timestamp\" = ?, \"target_operation\" = ?, \"target_failed\" = ?, \"target_message\" = ? where \"_index\" = ? and \"_type\" = ? and \"_id\" = ?");
-} catch (SQLException e) {
-try {
-// hsqldb
-pstmt = prepareUpdate("update " + riverName + "_ack set \"target_timestamp\" = ?, \"target_operation\" = ?, \"target_failed\" = ?, \"target_message\" = ? where \"_index\" = ? and \"_type\" = ? and \"_id\" = ?");
-} catch (SQLException e1) {
-// mysql
-pstmt = prepareUpdate("update " + riverName + "_ack set target_timestamp = ?, target_operation = ?, target_failed = ?, target_message = ? where _index = ? and _type = ? and _id = ?");
-}
-}
-params = new ArrayList();
params.add(new Timestamp(new java.util.Date().getTime()));
-params.add(resp.getOpType());
+params.add(resp.opType());
params.add(resp.isFailed());
params.add(resp.getFailureMessage());
-params.add(resp.getIndex());
-params.add(resp.getType());
-params.add(resp.getId());
bind(pstmt, params);
executeUpdate(pstmt);
close(pstmt);
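Put together, acknowledging a bulk response now costs one parameterized insert per bulk item and never touches the river source table. A self-contained sketch of that flow using only calls confirmed by the diff; the table name "myriver_ack" and the pre-opened connection are assumptions, and the original's prepareUpdate/bind helpers are replaced by plain JDBC here:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;

class AckWriter {
    void ack(Connection connection, BulkResponse response) throws SQLException {
        for (BulkItemResponse resp : response.items()) {
            // One insert per bulk item; no update ever touches the source table.
            PreparedStatement pstmt = connection.prepareStatement(
                    "insert into \"myriver_ack\" (\"_index\",\"_type\",\"_id\","
                    + "\"target_timestamp\",\"target_operation\",\"target_failed\","
                    + "\"target_message\") values (?,?,?,?,?,?,?)");
            pstmt.setString(1, resp.getIndex());
            pstmt.setString(2, resp.getType());
            pstmt.setString(3, resp.getId());
            pstmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
            pstmt.setString(5, resp.opType());
            pstmt.setBoolean(6, resp.isFailed());
            pstmt.setString(7, resp.getFailureMessage());
            pstmt.executeUpdate();
            pstmt.close();
        }
    }
}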
@@ -18,16 +18,16 @@
*/
package org.elasticsearch.river.jdbc.support;

-import org.elasticsearch.common.xcontent.XContentBuilder;

-import com.google.common.base.Objects;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import org.elasticsearch.common.base.Objects;
+import org.elasticsearch.common.xcontent.XContentBuilder;


/**
* A structured object is composed by an object data source together with
@@ -62,6 +62,7 @@ public StructuredObject index(String index) {
public String index() {
return meta.get(INDEX);
}


public StructuredObject type(String type) {
meta.put(TYPE, type);
