diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
index eb56bca..4fd3d67 100644
--- a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
+++ b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
@@ -1,8 +1,10 @@
 package org.elasticsearch.river.hbase;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.nio.charset.Charset;
 import java.security.InvalidParameterException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.elasticsearch.ElasticSearchException;
@@ -34,46 +36,47 @@
  * @author Ravi Gairola
  */
 public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
-	private final Client	esClient;
-	private volatile Thread	thread;
+	private static final Charset	charset	= Charset.forName("UTF-8");
+	private final Client			esClient;
+	private volatile Thread			parser;
 
 	/**
 	 * Comma separated list of Zookeeper hosts to which the HBase client can connect to find the cluster.
 	 */
-	private final String	hosts;
+	private final String			hosts;
 
 	/**
 	 * The HBase table name to be imported from.
 	 */
-	private final String	table;
+	private final String			table;
 
 	/**
 	 * The ElasticSearch index name to be imported to. (default is the river name)
 	 */
-	private final String	index;
+	private final String			index;
 
 	/**
 	 * The ElasticSearch type name to be imported to. (Default is the source table name)
 	 */
-	private final String	type;
+	private final String			type;
 
 	/**
	 * The interval in ms with which the river is supposed to run (60000 = every minute). (Default is every 10 minutes)
 	 */
-	private final long	interval;
+	private final long				interval;
 
 	/**
 	 * How big are the ElasticSearch bulk indexing sizes supposed to be. Tweaking this might improve performance. (Default is
 	 * 100 operations)
 	 */
-	private final int	batchSize;
+	private final int				batchSize;
 
 	/**
 	 * Name of the field from HBase to be used as an idField in ElasticSearch. The mapping will set up accordingly, so that
 	 * the _id field is routed to this field name (you can access it then under both the field name and "_id"). If no id
 	 * field is given, then ElasticSearch will automatically generate an id.
 	 */
-	private final String	idField;
+	private final String			idField;
 
 	/**
 	 * Loads and verifies all the configuration needed to run this river.
@@ -144,7 +147,7 @@ private String readConfig(final String config, final String defaultValue) {
 	 */
 	@Override
 	public synchronized void start() {
-		if (this.thread != null) {
+		if (this.parser != null) {
 			this.logger.warn("Trying to start HBase stream although it is already running");
 			return;
 		}
@@ -186,9 +189,10 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
 			this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type);
 		}
 
-		this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(new Parser());
-		this.thread.setUncaughtExceptionHandler(this);
-		this.thread.start();
+		this.parser = new Parser();
+		final Thread t = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(this.parser);
+		t.setUncaughtExceptionHandler(this);
+		t.start();
 	}
 
 	/**
@@ -198,8 +202,10 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
 	@Override
 	public void close() {
 		this.logger.info("Closing HBase river");
-		((Parser) this.thread).stopThread();
-		this.thread = null;
+		if (this.parser instanceof Parser) {
+			((Parser) this.parser).stopThread();
+		}
+		this.parser = null;
 	}
 
 	/**
@@ -270,7 +276,8 @@ private void parse() throws InterruptedException, Exception {
 				this.client.ensureTableExists(HBaseRiver.this.table);
 				HBaseRiver.this.logger.debug("Fetching HBase Scanner");
 				this.scanner = this.client.newScanner(HBaseRiver.this.table);
-				setMinTimestamp(this.scanner);
+				final String timestamp = String.valueOf(setMinTimestamp(this.scanner));
+
 				ArrayList<ArrayList<KeyValue>> rows;
 				HBaseRiver.this.logger.debug("Starting to fetch rows");
 				while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).join()) != null) {
@@ -278,28 +285,58 @@
 						HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
 						break;
 					}
-					HBaseRiver.this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size());
-					final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
-					for (final ArrayList<KeyValue> row : rows) {
-						if (this.stopThread) {
-							HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
-							break;
-						}
-						final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index,
-							HBaseRiver.this.type);
-						for (final KeyValue column : row) {
-							request.setSource(new String(column.key()), new String(column.value()));
-						}
-						bulkRequest.add(request);
-					}
-					bulkRequest.execute().addListener((ActionListener) this);
-					HBaseRiver.this.logger.debug("Sent Bulk Request with HBase data to ElasticSearch");
+					parseBulkOfRows(timestamp, rows);
 				}
 			}
 			finally {
 				stopThread();
 			}
 		}
+		/**
+		 * Run over a bulk of rows and process them.
+		 * 
+		 * @param timestamp
+		 * @param rows
+		 */
+		private void parseBulkOfRows(final String timestamp, final ArrayList<ArrayList<KeyValue>> rows) {
+			HBaseRiver.this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size());
+			final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
+			for (final ArrayList<KeyValue> row : rows) {
+				if (this.stopThread) {
+					HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
+					break;
+				}
+				final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index, HBaseRiver.this.type);
+				request.setTimestamp(timestamp);
+				request.setSource(readDataTree(row));
+				bulkRequest.add(request);
+			}
+			bulkRequest.execute().addListener((ActionListener) this);
+			HBaseRiver.this.logger.debug("Sent Bulk Request with HBase data to ElasticSearch");
+		}
+
+		/**
+		 * Generate a tree structure that ElasticSearch can read and index from one of the rows that has been returned from
+		 * HBase.
+		 * 
+		 * @param row
+		 * @return
+		 */
+		@SuppressWarnings("unchecked")
+		private Map<String, Object> readDataTree(final ArrayList<KeyValue> row) {
+			final Map<String, Object> dataTree = new HashMap<String, Object>();
+			for (final KeyValue column : row) {
+				final String family = new String(column.family(), charset);
+				final String qualifier = new String(column.qualifier(), charset);
+				final String value = new String(column.value(), charset);
+				if (!dataTree.containsKey(family)) {
+					dataTree.put(family, new HashMap<String, String>());
+				}
+				((Map<String, String>) dataTree.get(family)).put(qualifier, value);
+			}
+			return dataTree;
+		}
+
 		/**
 		 * Checks if there is an open Scanner or Client and closes them.
 		 */
@@ -326,7 +363,7 @@ public synchronized void stopThread() {
 		 * 
 		 * @param scanner
 		 */
-		private void setMinTimestamp(final Scanner scanner) {
+		private long setMinTimestamp(final Scanner scanner) {
 			HBaseRiver.this.logger.debug("Looking into ElasticSearch to determine timestamp of last import");
 			final SearchResponse response = HBaseRiver.this.esClient.prepareSearch(HBaseRiver.this.index)
 				.setTypes(HBaseRiver.this.type)
@@ -338,12 +375,13 @@ private void setMinTimestamp(final Scanner scanner) {
 			if (response.facets().facet(TIMESTMAP_STATS) != null) {
 				HBaseRiver.this.logger.debug("Got statistical data from ElasticSearch about data timestamps");
 				final StatisticalFacet facet = (StatisticalFacet) response.facets().facet(TIMESTMAP_STATS);
-				scanner.setMinTimestamp((long) Math.max(facet.getMax(), 0));
-			}
-			else {
-				HBaseRiver.this.logger.debug("No statistical data about data timestamps could be found -> probably no data there yet");
-				scanner.setMinTimestamp(0);
+				final long timestamp = (long) Math.max(facet.getMax(), 0);
+				scanner.setMinTimestamp(timestamp);
+				return timestamp;
 			}
+			HBaseRiver.this.logger.debug("No statistical data about data timestamps could be found -> probably no data there yet");
+			scanner.setMinTimestamp(0);
+			return 0L;
 		}
 
 		/**
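
For reference, the nested document that the new readDataTree method hands to setSource can be illustrated without any HBase or ElasticSearch dependencies. The following is a minimal standalone sketch and not part of the patch: the FakeKeyValue class, the column family "user", the qualifiers "name" and "age", and the sample values are all made up for illustration; only the map-building loop mirrors the family -> qualifier -> value nesting that readDataTree uses above.

    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    public class ReadDataTreeSketch {
        private static final Charset charset = Charset.forName("UTF-8");

        // Hypothetical stand-in for the asynchbase KeyValue, reduced to the parts used here.
        static class FakeKeyValue {
            final byte[] family;
            final byte[] qualifier;
            final byte[] value;

            FakeKeyValue(final String family, final String qualifier, final String value) {
                this.family = family.getBytes(charset);
                this.qualifier = qualifier.getBytes(charset);
                this.value = value.getBytes(charset);
            }
        }

        // Same family -> (qualifier -> value) nesting as readDataTree in the patch above.
        @SuppressWarnings("unchecked")
        static Map<String, Object> readDataTree(final ArrayList<FakeKeyValue> row) {
            final Map<String, Object> dataTree = new HashMap<String, Object>();
            for (final FakeKeyValue column : row) {
                final String family = new String(column.family, charset);
                final String qualifier = new String(column.qualifier, charset);
                final String value = new String(column.value, charset);
                if (!dataTree.containsKey(family)) {
                    dataTree.put(family, new HashMap<String, String>());
                }
                ((Map<String, String>) dataTree.get(family)).put(qualifier, value);
            }
            return dataTree;
        }

        public static void main(final String[] args) {
            final ArrayList<FakeKeyValue> row = new ArrayList<FakeKeyValue>();
            row.add(new FakeKeyValue("user", "name", "Jane"));
            row.add(new FakeKeyValue("user", "age", "32"));
            row.add(new FakeKeyValue("meta", "source", "import"));
            // Prints something like {meta={source=import}, user={name=Jane, age=32}} (HashMap order varies).
            System.out.println(readDataTree(row));
        }
    }

Because each column family becomes a nested map in the request source, ElasticSearch indexes the qualifiers as object fields, so a cell written to family "user", qualifier "name" becomes searchable as user.name instead of a single flat string per row.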
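The other behavioural change is the incremental import: setMinTimestamp now returns the newest _timestamp already present in the index (taken from the statistical facet and clamped to at least 0), the HBase scanner is told to skip older cells, and the same value is passed to setTimestamp on every index request. A tiny sketch of just the clamp-and-fallback logic, with a hypothetical facetMax argument standing in for facet.getMax():

    public class MinTimestampSketch {

        // Mirrors the return values of the patched setMinTimestamp: a usable facet
        // maximum becomes the scanner's lower bound, anything missing or negative
        // falls back to 0, which means a full import.
        static long resolveMinTimestamp(final Double facetMax) {
            if (facetMax == null) {
                return 0L; // no facet in the response -> index holds no data yet
            }
            return (long) Math.max(facetMax, 0);
        }

        public static void main(final String[] args) {
            System.out.println(resolveMinTimestamp(1355313600000.0)); // 1355313600000
            System.out.println(resolveMinTimestamp(null));            // 0
            System.out.println(resolveMinTimestamp(-42.0));           // 0
        }
    }

Returning the value, instead of only configuring the scanner as the old void method did, is what lets parseBulkOfRows reuse it for the requests it builds.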