From a86ab91509f3c07a3fc77efa77b0cb7020ac03ed Mon Sep 17 00:00:00 2001 From: mallocator Date: Wed, 20 Mar 2013 14:55:06 -0700 Subject: [PATCH] - Some minor changes to documentation --- .../plugin/river/hbase/HBaseRiverPlugin.java | 15 ++++++++++- .../elasticsearch/river/hbase/HBaseRiver.java | 26 ++++++++++--------- .../river/hbase/HBaseRiverModule.java | 8 +++++- 3 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/elasticsearch/plugin/river/hbase/HBaseRiverPlugin.java b/src/main/java/org/elasticsearch/plugin/river/hbase/HBaseRiverPlugin.java index a4aaabc..15286bd 100644 --- a/src/main/java/org/elasticsearch/plugin/river/hbase/HBaseRiverPlugin.java +++ b/src/main/java/org/elasticsearch/plugin/river/hbase/HBaseRiverPlugin.java @@ -5,19 +5,32 @@ import org.elasticsearch.river.RiversModule; import org.elasticsearch.river.hbase.HBaseRiverModule; -public class HBaseRiverPlugin extends AbstractPlugin { +/** + * Basic plug in information required by ElasticSearch. This class is also referenced under + * /src/main/resources/es-plugin.properties. + * + * @author Ravi Gairola + */ +public class HBaseRiverPlugin extends AbstractPlugin { @Inject public HBaseRiverPlugin() {} + @Override public String name() { return "river-hbase"; } + @Override public String description() { return "River HBase Plugin"; } + /** + * Registers the HBaseRiverModule as "hbase" river. + * + * @param module + */ public void onModule(final RiversModule module) { module.registerRiver("hbase", HBaseRiverModule.class); } diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java index 37809e3..8f3e872 100644 --- a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.IndexShardState; @@ -31,14 +32,14 @@ import org.hbase.async.Scanner; /** - * An HBase import river build similar to the MySQL river, that was modeled after the Solr SQL import functionality. + * An HBase import river built similar to the MySQL river, that was modeled after the Solr SQL import functionality. * * @author Ravi Gairola */ public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler { private static final String CONFIG_SPACE = "hbase"; private final Client esClient; - private volatile Thread parser; + private volatile Runnable parser; /** * Comma separated list of Zookeeper hosts to which the HBase client can connect to find the cluster. @@ -173,7 +174,6 @@ public synchronized void start() { return; } this.parser = new Parser(); - this.parser.setUncaughtExceptionHandler(this); this.logger.info("Waiting for Index to be ready for interaction"); waitForESReady(); @@ -216,7 +216,9 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) { this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type); } - this.parser.start(); + final Thread t = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(this.parser); + t.setUncaughtExceptionHandler(this); + t.start(); } private void waitForESReady() { @@ -263,7 +265,7 @@ public void uncaughtException(final Thread arg0, final Throwable arg1) { * * @author Ravi Gairola */ - private class Parser extends Thread { + private class Parser implements Runnable { private static final String TIMESTMAP_STATS = "timestamp_stats"; private int indexCounter; private HBaseClient client; @@ -294,7 +296,7 @@ public void run() { } } try { - sleep(1000); + Thread.sleep(1000); } catch (InterruptedException e) { HBaseRiver.this.logger.trace("HBase river parsing thread has been interrupted"); } @@ -327,17 +329,16 @@ private void parse() throws InterruptedException, Exception { } } - final String timestamp = String.valueOf(setMinTimestamp(this.scanner)); - HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be {}", timestamp); - + setMinTimestamp(this.scanner); ArrayList> rows; HBaseRiver.this.logger.debug("Starting to fetch rows"); + while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).joinUninterruptibly()) != null) { if (this.stopThread) { HBaseRiver.this.logger.info("Stopping HBase import in the midle of it"); break; } - parseBulkOfRows(timestamp, rows); + parseBulkOfRows(rows); } } finally { HBaseRiver.this.logger.debug("Closing HBase Scanner and Async Client"); @@ -361,10 +362,9 @@ private void parse() throws InterruptedException, Exception { /** * Run over a bulk of rows and process them. * - * @param timestamp * @param rows */ - private void parseBulkOfRows(final String timestamp, final ArrayList> rows) { + private void parseBulkOfRows(final ArrayList> rows) { HBaseRiver.this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size()); final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk(); for (final ArrayList row : rows) { @@ -439,10 +439,12 @@ private long setMinTimestamp(final Scanner scanner) { final StatisticalFacet facet = (StatisticalFacet) response.facets().facet(TIMESTMAP_STATS); final long timestamp = (long) Math.max(facet.getMax(), 0); scanner.setMinTimestamp(timestamp); + HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be {}", timestamp); return timestamp; } HBaseRiver.this.logger.debug("No statistical data about data timestamps could be found -> probably no data there yet"); scanner.setMinTimestamp(0); + HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be not present (-> 0)"); return 0L; } } diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseRiverModule.java b/src/main/java/org/elasticsearch/river/hbase/HBaseRiverModule.java index 0e37f6d..78c4b3f 100644 --- a/src/main/java/org/elasticsearch/river/hbase/HBaseRiverModule.java +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseRiverModule.java @@ -3,7 +3,13 @@ import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.river.River; -public class HBaseRiverModule extends AbstractModule { +/** + * Does the initial configuration of the Module, when it is called by ElasticSearch. Binds the HBase river as an eager + * singleton river. + * + * @author Ravi Gairola + */ +public class HBaseRiverModule extends AbstractModule { @Override protected void configure() {