Skip to content

Commit

Permalink
Merge pull request #9 from nhuray/develop
Browse files Browse the repository at this point in the history
Fix #8 : Acknowledge method in SQLService is always called
  • Loading branch information
jprante committed Jul 11, 2012
2 parents a75dcce + 9b9c166 commit a10f2c5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 54 deletions.
107 changes: 54 additions & 53 deletions src/main/java/org/elasticsearch/river/jdbc/JDBCRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class JDBCRiver extends AbstractRiverComponent implements River {

@Inject
public JDBCRiver(RiverName riverName, RiverSettings settings,
@RiverIndexName String riverIndexName, Client client) {
@RiverIndexName String riverIndexName, Client client) {
super(riverName, settings);
this.riverIndexName = riverIndexName;
this.client = client;
Expand Down Expand Up @@ -233,6 +233,58 @@ public void run() {
}
}
}

private void housekeeper(long version) throws IOException {
logger.info("housekeeping for version " + version);
client.admin().indices().prepareRefresh(indexName).execute().actionGet();
SearchResponse response = client.prepareSearch().setIndices(indexName).setTypes(typeName).setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueMinutes(10)).setSize(bulkSize).setVersion(true).setQuery(matchAllQuery()).execute().actionGet();
if (response.timedOut()) {
logger.error("housekeeper scan query timeout");
return;
}
if (response.failedShards() > 0) {
logger.error("housekeeper failed shards in scan response: {0}", response.failedShards());
return;
}
String scrollId = response.getScrollId();
if (scrollId == null) {
logger.error("housekeeper failed, no scroll ID");
return;
}
boolean done = false;
// scroll
long deleted = 0L;
long t0 = System.currentTimeMillis();
do {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(TimeValue.timeValueMinutes(10)).execute().actionGet();
if (response.timedOut()) {
logger.error("housekeeper scroll query timeout");
done = true;
} else if (response.failedShards() > 0) {
logger.error("housekeeper failed shards in scroll response: {}", response.failedShards());
done = true;
} else {
// terminate scrolling?
if (response.hits() == null) {
done = true;
} else {
for (SearchHit hit : response.getHits().getHits()) {
// delete all documents with lower version
if (hit.getVersion() < version) {
operation.delete(hit.getIndex(), hit.getType(), hit.getId());
deleted++;
}
}
scrollId = response.getScrollId();
}
}
if (scrollId != null) {
done = true;
}
} while (!done);
long t1 = System.currentTimeMillis();
logger.info("housekeeper ready, {} documents deleted, took {} ms", deleted, t1 - t0);
}
}

private class JDBCRiverTableConnector implements Runnable {
Expand All @@ -241,7 +293,7 @@ private class JDBCRiverTableConnector implements Runnable {

@Override
public void run() {
while (true) {
while (true) {
for (String optype : optypes) {
try {
Connection connection = service.getConnection(driver, url, user, password, false);
Expand Down Expand Up @@ -272,57 +324,6 @@ public void run() {
}
}

private void housekeeper(long version) throws IOException {
logger.info("housekeeping for version " + version);
client.admin().indices().prepareRefresh(indexName).execute().actionGet();
SearchResponse response = client.prepareSearch().setIndices(indexName).setTypes(typeName).setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueMinutes(10)).setSize(bulkSize).setVersion(true).setQuery(matchAllQuery()).execute().actionGet();
if (response.timedOut()) {
logger.error("housekeeper scan query timeout");
return;
}
if (response.failedShards() > 0) {
logger.error("housekeeper failed shards in scan response: {0}", response.failedShards());
return;
}
String scrollId = response.getScrollId();
if (scrollId == null) {
logger.error("housekeeper failed, no scroll ID");
return;
}
boolean done = false;
// scroll
long deleted = 0L;
long t0 = System.currentTimeMillis();
do {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(TimeValue.timeValueMinutes(10)).execute().actionGet();
if (response.timedOut()) {
logger.error("housekeeper scroll query timeout");
done = true;
} else if (response.failedShards() > 0) {
logger.error("housekeeper failed shards in scroll response: {}", response.failedShards());
done = true;
} else {
// terminate scrolling?
if (response.hits() == null) {
done = true;
} else {
for (SearchHit hit : response.getHits().getHits()) {
// delete all documents with lower version
if (hit.getVersion() < version) {
operation.delete(hit.getIndex(), hit.getType(), hit.getId());
deleted++;
}
}
scrollId = response.getScrollId();
}
}
if (scrollId != null) {
done = true;
}
} while (!done);
long t1 = System.currentTimeMillis();
logger.info("housekeeper ready, {} documents deleted, took {} ms", deleted, t1 - t0);
}

private void delay(String reason) {
if (poll.millis() > 0L) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/river/jdbc/SQLService.java
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ private void bind(PreparedStatement pstmt, int i, Object value) throws SQLExcept

/**
* Acknowledge a bulk item response back to the river table. Fill columns
* target_timestamp, taget_operation, target_failed, target_message.
* target_timestamp, target_operation, target_failed, target_message.
*
* @param riverName
* @param response
Expand Down

0 comments on commit a10f2c5

Please sign in to comment.