Skip to content

Commit

Permalink
- Some minor changes to documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Mar 20, 2013
1 parent f43c7b1 commit a86ab91
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,32 @@
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.river.hbase.HBaseRiverModule;

public class HBaseRiverPlugin extends AbstractPlugin {
/**
* Basic plug in information required by ElasticSearch. This class is also referenced under
* /src/main/resources/es-plugin.properties.
*
* @author Ravi Gairola
*/
public class HBaseRiverPlugin extends AbstractPlugin {

/**
 * Default constructor; invoked by the ElasticSearch dependency injection
 * framework (see the {@code @Inject} annotation). No state to initialize.
 */
@Inject
public HBaseRiverPlugin() {}

/**
 * @return the unique identifier under which ElasticSearch knows this plugin
 */
@Override
public String name() {
	final String pluginName = "river-hbase";
	return pluginName;
}

/**
 * @return a short human-readable description of this plugin
 */
@Override
public String description() {
	final String pluginDescription = "River HBase Plugin";
	return pluginDescription;
}

/**
 * Registers the HBaseRiverModule as "hbase" river.
 *
 * @param module the rivers module supplied by ElasticSearch during plugin
 *            initialization, used to register this river type
 */
public void onModule(final RiversModule module) {
	module.registerRiver("hbase", HBaseRiverModule.class);
}
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.IndexShardState;
Expand All @@ -31,14 +32,14 @@
import org.hbase.async.Scanner;

/**
* An HBase import river build similar to the MySQL river, that was modeled after the Solr SQL import functionality.
* An HBase import river built similar to the MySQL river, that was modeled after the Solr SQL import functionality.
*
* @author Ravi Gairola
*/
public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
private static final String CONFIG_SPACE = "hbase";
private final Client esClient;
private volatile Thread parser;
private volatile Runnable parser;

/**
* Comma separated list of Zookeeper hosts to which the HBase client can connect to find the cluster.
Expand Down Expand Up @@ -173,7 +174,6 @@ public synchronized void start() {
return;
}
this.parser = new Parser();
this.parser.setUncaughtExceptionHandler(this);

this.logger.info("Waiting for Index to be ready for interaction");
waitForESReady();
Expand Down Expand Up @@ -216,7 +216,9 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type);
}

this.parser.start();
final Thread t = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(this.parser);
t.setUncaughtExceptionHandler(this);
t.start();
}

private void waitForESReady() {
Expand Down Expand Up @@ -263,7 +265,7 @@ public void uncaughtException(final Thread arg0, final Throwable arg1) {
*
* @author Ravi Gairola
*/
private class Parser extends Thread {
private class Parser implements Runnable {
private static final String TIMESTMAP_STATS = "timestamp_stats";
private int indexCounter;
private HBaseClient client;
Expand Down Expand Up @@ -294,7 +296,7 @@ public void run() {
}
}
try {
sleep(1000);
Thread.sleep(1000);
} catch (InterruptedException e) {
HBaseRiver.this.logger.trace("HBase river parsing thread has been interrupted");
}
Expand Down Expand Up @@ -327,17 +329,16 @@ private void parse() throws InterruptedException, Exception {
}
}

final String timestamp = String.valueOf(setMinTimestamp(this.scanner));
HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be {}", timestamp);

setMinTimestamp(this.scanner);
ArrayList<ArrayList<KeyValue>> rows;
HBaseRiver.this.logger.debug("Starting to fetch rows");

while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).joinUninterruptibly()) != null) {
if (this.stopThread) {
HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
break;
}
parseBulkOfRows(timestamp, rows);
parseBulkOfRows(rows);
}
} finally {
HBaseRiver.this.logger.debug("Closing HBase Scanner and Async Client");
Expand All @@ -361,10 +362,9 @@ private void parse() throws InterruptedException, Exception {
/**
* Run over a bulk of rows and process them.
*
* @param timestamp
* @param rows
*/
private void parseBulkOfRows(final String timestamp, final ArrayList<ArrayList<KeyValue>> rows) {
private void parseBulkOfRows(final ArrayList<ArrayList<KeyValue>> rows) {
HBaseRiver.this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size());
final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
for (final ArrayList<KeyValue> row : rows) {
Expand Down Expand Up @@ -439,10 +439,12 @@ private long setMinTimestamp(final Scanner scanner) {
final StatisticalFacet facet = (StatisticalFacet) response.facets().facet(TIMESTMAP_STATS);
final long timestamp = (long) Math.max(facet.getMax(), 0);
scanner.setMinTimestamp(timestamp);
HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be {}", timestamp);
return timestamp;
}
HBaseRiver.this.logger.debug("No statistical data about data timestamps could be found -> probably no data there yet");
scanner.setMinTimestamp(0);
HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be not present (-> 0)");
return 0L;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;

public class HBaseRiverModule extends AbstractModule {
/**
* Does the initial configuration of the Module, when it is called by ElasticSearch. Binds the HBase river as an eager
* singleton river.
*
* @author Ravi Gairola
*/
public class HBaseRiverModule extends AbstractModule {

@Override
protected void configure() {
Expand Down

0 comments on commit a86ab91

Please sign in to comment.