diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
index 06446a6..7830a0c 100644
--- a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
+++ b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
@@ -10,7 +10,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.inject.Inject;
@@ -30,7 +30,6 @@
 public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
 
     private final Client esClient;
-    private boolean stopThread;
     private volatile Thread thread;
 
     private final String hosts;
@@ -45,9 +44,9 @@
     public HBaseRiver(final RiverName riverName, final RiverSettings settings, final Client esClient) {
         super(riverName, settings);
         this.esClient = esClient;
-        this.logger.info("Creating MySQL Stream River");
+        this.logger.info("Creating HBase Stream River");
 
-        this.hosts = readConfig("hosts", readConfig("zookeeper"));
+        this.hosts = readConfig("hosts", readConfig("zookeeper", null));
         this.table = readConfig("table");
         this.idField = readConfig("idField", null);
         this.index = readConfig("index", riverName.name());
@@ -67,16 +66,20 @@ private String readConfig(final String config) {
 
     @SuppressWarnings({ "unchecked" })
     private String readConfig(final String config, final String defaultValue) {
-        if (this.settings.settings().containsKey("mysql")) {
-            Map mysqlSettings = (Map) this.settings.settings().get("mysql");
+        if (this.settings.settings().containsKey("hbase")) {
+            Map mysqlSettings = (Map) this.settings.settings().get("hbase");
             return XContentMapValues.nodeStringValue(mysqlSettings.get(config), defaultValue);
         }
         return defaultValue;
     }
 
     @Override
-    public void start() {
-        this.logger.info("starting hbase stream");
+    public synchronized void start() {
+        if (this.thread != null) {
+            this.logger.warn("Trying to start HBase stream although it is already running");
+            return;
+        }
+        this.logger.info("Starting HBase Stream");
         String mapping;
         if (this.idField == null) {
             mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true}}}";
@@ -114,28 +117,30 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
             this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type);
         }
 
-        if (this.thread == null) {
-            this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(new Parser());
-            this.thread.setUncaughtExceptionHandler(this);
-            this.thread.start();
-        }
+        this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(new Parser());
+        this.thread.setUncaughtExceptionHandler(this);
+        this.thread.start();
     }
 
     @Override
     public void close() {
         this.logger.info("Closing HBase river");
-        this.stopThread = true;
+        ((Parser) this.thread).stopThread();
        this.thread = null;
     }
 
     @Override
     public void uncaughtException(final Thread arg0, final Throwable arg1) {
-        this.logger.error("An Exception has been thrown in HBase Import Thread", arg1);
+        this.logger.error("An Exception has been thrown in HBase Import Thread", arg1, (Object[]) null);
        close();
     }
 
     private class Parser extends Thread implements ActionListener {
-        private int indexCounter;
+        private static final String TIMESTAMP_STATS = "timestamp_stats";
+        private int indexCounter;
+        private HBaseClient client;
+        private Scanner scanner;
+        private boolean stopThread;
 
        private Parser() {}
 
@@ -143,7 +148,7 @@ private Parser() {}
         public void run() {
             HBaseRiver.this.logger.info("HBase Import Thread has started");
             long lastRun = 0;
-            while (!HBaseRiver.this.stopThread) {
+            while (!this.stopThread) {
                 if (lastRun + HBaseRiver.this.interval < System.currentTimeMillis()) {
                     lastRun = System.currentTimeMillis();
                     try {
@@ -154,7 +159,7 @@ public void run() {
                 if (HBaseRiver.this.interval <= 0) {
                     break;
                 }
-                if (!HBaseRiver.this.stopThread) {
+                if (!this.stopThread) {
                     HBaseRiver.this.logger.info("HBase Import Thread is waiting for {} Seconds until the next run",
                         HBaseRiver.this.interval / 1000);
                 }
@@ -167,24 +172,58 @@ public void run() {
         }
 
         private void parse() throws InterruptedException, Exception {
-            final HBaseClient client = new HBaseClient(HBaseRiver.this.hosts);
-            client.ensureTableExists(HBaseRiver.this.table);
-            final Scanner scanner = client.newScanner(HBaseRiver.this.table);
-            setMinTimestamp(scanner);
-
-            ArrayList<ArrayList<KeyValue>> rows;
-            while ((rows = scanner.nextRows(HBaseRiver.this.batchSize).join()) != null) {
-                final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
-                for (final ArrayList<KeyValue> row : rows) {
-                    final IndexRequest request = new IndexRequest(HBaseRiver.this.index, HBaseRiver.this.type);
-                    for (final KeyValue column : row) {
-                        final String key = String.valueOf(column.key());
-                        final String value = String.valueOf(column.value());
-                        request.source(key, value);
+            HBaseRiver.this.logger.info("Parsing data from HBase");
+            this.client = new HBaseClient(HBaseRiver.this.hosts);
+            try {
+                HBaseRiver.this.logger.debug("Checking if table {} actually exists in HBase DB", HBaseRiver.this.table);
+                this.client.ensureTableExists(HBaseRiver.this.table);
+                HBaseRiver.this.logger.debug("Fetching HBase Scanner");
+                this.scanner = this.client.newScanner(HBaseRiver.this.table);
+                setMinTimestamp(this.scanner);
+                ArrayList<ArrayList<KeyValue>> rows;
+                HBaseRiver.this.logger.debug("Starting to fetch rows");
+                while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).join()) != null) {
+                    if (this.stopThread) {
+                        HBaseRiver.this.logger.info("Stopping HBase import in the middle of it");
+                        break;
+                    }
+                    HBaseRiver.this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size());
+                    final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
+                    for (final ArrayList<KeyValue> row : rows) {
+                        if (this.stopThread) {
+                            HBaseRiver.this.logger.info("Stopping HBase import in the middle of it");
+                            break;
+                        }
+                        final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index,
+                            HBaseRiver.this.type);
+                        for (final KeyValue column : row) {
+                            request.setSource(column.key().toString(), column.value().toString());
+                        }
+                        bulkRequest.add(request);
                     }
-                    bulkRequest.add(request);
+                    bulkRequest.execute().addListener((ActionListener) this);
+                    HBaseRiver.this.logger.debug("Sent Bulk Request with HBase data to ElasticSearch");
+                }
+            } finally {
+                stopThread();
+            }
+        }
+
+        public synchronized void stopThread() {
+            this.stopThread = true;
+            if (this.scanner != null) {
+                try {
+                    this.scanner.close();
+                } catch (Exception e) {
+                    HBaseRiver.this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null);
+                }
+            }
+            if (this.client != null) {
+                try {
+                    this.client.shutdown();
+                } catch (Exception e) {
+                    HBaseRiver.this.logger.error("An Exception has been caught while shutting down the HBase client", e, (Object[]) null);
                 }
-                bulkRequest.execute().addListener((ActionListener) this);
             }
         }
 
@@ -194,16 +233,22 @@ private void parse() throws InterruptedException, Exception {
          * @param scanner
          */
         private void setMinTimestamp(final Scanner scanner) {
+            HBaseRiver.this.logger.debug("Looking into ElasticSearch to determine timestamp of last import");
             final SearchResponse response = HBaseRiver.this.esClient.prepareSearch(HBaseRiver.this.index)
                 .setTypes(HBaseRiver.this.type)
                 .setQuery(QueryBuilders.matchAllQuery())
-                .addFacet(FacetBuilders.statisticalFacet("timestmap_stats").field("_timestamp"))
+                .addFacet(FacetBuilders.statisticalFacet(TIMESTAMP_STATS).field("_timestamp"))
                 .execute()
                 .actionGet();
-            if (!response.facets().facets().isEmpty()) {
-                final StatisticalFacet facet = (StatisticalFacet) response.facets().facet("timestmap_stats");
-                scanner.setMinTimestamp((long) facet.getMax());
+            if (response.facets().facet(TIMESTAMP_STATS) != null) {
+                HBaseRiver.this.logger.debug("Got statistical data from ElasticSearch about data timestamps");
+                final StatisticalFacet facet = (StatisticalFacet) response.facets().facet(TIMESTAMP_STATS);
+                scanner.setMinTimestamp((long) Math.max(facet.getMax(), 0));
+            }
+            else {
+                HBaseRiver.this.logger.debug("No statistical data about data timestamps could be found -> probably no data there yet");
+                scanner.setMinTimestamp(0);
             }
         }