Skip to content

Commit

Permalink
- Now waits properly for the index to be ready (if there is one)
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Mar 20, 2013
1 parent 7b27db4 commit 0ad1dbf
Showing 1 changed file with 32 additions and 17 deletions.
49 changes: 32 additions & 17 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.status.ShardStatus;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
Expand All @@ -34,51 +36,52 @@
* @author Ravi Gairola
*/
public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
private final Client esClient;
private volatile Thread parser;
private static final String CONFIG_SPACE = "hbase";
private final Client esClient;
private volatile Thread parser;

/**
* Comma separated list of Zookeeper hosts to which the HBase client can connect to find the cluster.
*/
private final String hosts;
private final String hosts;

/**
* The HBase table name to be imported from.
*/
private final String table;
private final String table;

/**
* The ElasticSearch index name to be imported to. (default is the river name)
*/
private final String index;
private final String index;

/**
* The ElasticSearch type name to be imported to. (Default is the source table name)
*/
private final String type;
private final String type;

/**
* The interval in ms with which the river is supposed to run (60000 = every minute). (Default is every 10 minutes)
*/
private final long interval;
private final long interval;

/**
* How big are the ElasticSearch bulk indexing sizes supposed to be. Tweaking this might improve performance. (Default is
* 100 operations)
*/
private final int batchSize;
private final int batchSize;

/**
* Name of the field from HBase to be used as an idField in ElasticSearch. The mapping will set up accordingly, so that
* the _id field is routed to this field name (you can access it then under both the field name and "_id"). If no id
* field is given, then ElasticSearch will automatically generate an id.
*/
private final String idField;
private final String idField;

/**
* The char set which is used to parse data from HBase. (Default is UTF-8)
*/
private final Charset charset;
private final Charset charset;

/**
* Loads and verifies all the configuration needed to run this river.
Expand Down Expand Up @@ -137,8 +140,8 @@ private String readConfig(final String config) {
*/
@SuppressWarnings({ "unchecked" })
private String readConfig(final String config, final String defaultValue) {
if (this.settings.settings().containsKey("hbase")) {
Map<String, Object> mysqlSettings = (Map<String, Object>) this.settings.settings().get("hbase");
if (this.settings.settings().containsKey(CONFIG_SPACE)) {
Map<String, Object> mysqlSettings = (Map<String, Object>) this.settings.settings().get(CONFIG_SPACE);
return XContentMapValues.nodeStringValue(mysqlSettings.get(config), defaultValue);
}
return defaultValue;
Expand All @@ -157,6 +160,7 @@ public synchronized void start() {
this.parser = new Parser();
this.parser.setUncaughtExceptionHandler(this);

this.logger.info("Waiting for Index to be ready for interaction");
waitForESReady();

this.logger.info("Starting HBase Stream");
Expand Down Expand Up @@ -201,10 +205,21 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
}

private void waitForESReady() {
this.logger.info("Waiting for Index to be ready for interaction");
// TODO doesn't actually work, just times out after a while.
this.esClient.admin().cluster().prepareHealth(this.index).setWaitForGreenStatus().execute().actionGet();
this.logger.debug("ElastichSearch Index has reported it is ready for interaction");
if (!this.esClient.admin().indices().prepareExists(this.index).execute().actionGet().exists()) {
return;
}
for (final ShardStatus status : this.esClient.admin().indices().prepareStatus(this.index).execute().actionGet().getShards()) {
if (status.getState() != IndexShardState.STARTED) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
this.logger.trace("HBase thread has been interrupted while waiting for the database to be reachable");
}
this.logger.trace("Waiting...");
waitForESReady();
break;
}
}
}

/**
Expand Down Expand Up @@ -346,7 +361,7 @@ private void parseBulkOfRows(final String timestamp, final ArrayList<ArrayList<K
final BulkResponse response = bulkRequest.execute().actionGet();

this.indexCounter += response.items().length;
HBaseRiver.this.logger.info("HBase imported has indexed {} entries so far", this.indexCounter);
HBaseRiver.this.logger.info("HBase river has indexed {} entries so far", this.indexCounter);
if (response.hasFailures()) {
HBaseRiver.this.logger.error("Errors have occured while trying to index new data from HBase");
}
Expand Down

0 comments on commit 0ad1dbf

Please sign in to comment.