Commit 7b27db4

Somewhat working version that stops as soon as HBase throws an error

mallocator committed Mar 19, 2013
1 parent 452579a commit 7b27db4
Showing 1 changed file with 71 additions and 71 deletions.

src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java (142 changes: 71 additions & 71 deletions)
@@ -9,14 +9,12 @@

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
@@ -36,47 +34,51 @@
* @author Ravi Gairola
*/
public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
private static final Charset charset = Charset.forName("UTF-8");
private final Client esClient;
private volatile Thread parser;

/**
* Comma-separated list of Zookeeper hosts the HBase client connects to in order to find the cluster.
*/
private final String hosts;

/**
* The HBase table name to be imported from.
*/
private final String table;

/**
* The ElasticSearch index name to be imported to. (default is the river name)
*/
private final String index;

/**
* The ElasticSearch type name to be imported to. (Default is the source table name)
*/
private final String type;

/**
* The interval in ms at which the river runs (60000 = every minute). (Default is every 10 minutes)
*/
private final long interval;

/**
* The size of the bulk indexing requests sent to ElasticSearch. Tweaking this might improve performance. (Default is
* 100 operations)
*/
private final int batchSize;

/**
* Name of the field from HBase to be used as an idField in ElasticSearch. The mapping will be set up accordingly, so that
* the _id field is routed to this field name (you can access it then under both the field name and "_id"). If no id
* field is given, then ElasticSearch will automatically generate an id.
*/
private final String idField;

/**
* The char set which is used to parse data from HBase. (Default is UTF-8)
*/
private final Charset charset;

/**
* Loads and verifies all the configuration needed to run this river.
@@ -98,6 +100,7 @@ public HBaseRiver(final RiverName riverName, final RiverSettings settings, final
this.type = readConfig("type", this.table);
this.interval = Long.parseLong(readConfig("interval", "600000"));
this.batchSize = Integer.parseInt(readConfig("batchSize", "100"));
this.charset = Charset.forName(readConfig("charset", "UTF-8"));

if (this.interval <= 0) {
throw new IllegalArgumentException("The interval between runs must be at least 1 ms. The current config is set to "
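
For context, a river configured with the options read above would typically be registered by indexing a _meta document into the _river index. A minimal sketch, assuming the plugin registers the river type as "hbase" and reads its settings from an "hbase" block (the usual river convention; neither name is shown in this diff):

import java.io.IOException;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;

public class HBaseRiverRegistration {
    // Sketch only: the river type name "hbase", the settings block name and
    // all values below are assumptions for illustration.
    public static void register(final Client client) throws IOException {
        client.prepareIndex("_river", "my_hbase_river", "_meta")
            .setSource(XContentFactory.jsonBuilder()
                .startObject()
                    .field("type", "hbase")
                    .startObject("hbase")
                        .field("hosts", "zk1.example.com,zk2.example.com") // Zookeeper quorum
                        .field("table", "my_table")
                        .field("index", "my_index")   // defaults to the river name
                        .field("type", "my_type")     // defaults to the table name
                        .field("interval", 600000)    // ms between runs
                        .field("batchSize", 100)      // operations per bulk request
                        .field("charset", "UTF-8")    // configurable as of this commit
                    .endObject()
                .endObject())
            .execute().actionGet();
    }
}
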
@@ -151,6 +154,11 @@ public synchronized void start() {
this.logger.warn("Trying to start HBase stream although it is already running");
return;
}
this.parser = new Parser();
this.parser.setUncaughtExceptionHandler(this);

waitForESReady();

this.logger.info("Starting HBase Stream");
String mapping;
if (this.idField == null) {
@@ -189,18 +197,22 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type);
}

this.parser = new Parser();
final Thread t = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(this.parser);
t.setUncaughtExceptionHandler(this);
t.start();
this.parser.start();
}

private void waitForESReady() {
this.logger.info("Waiting for Index to be ready for interaction");
// TODO doesn't actually work, just times out after a while.
this.esClient.admin().cluster().prepareHealth(this.index).setWaitForGreenStatus().execute().actionGet();
this.logger.debug("ElastichSearch Index has reported it is ready for interaction");
}
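
The TODO above notes that the green-status wait only ever times out. One likely cause: on a single-node cluster an index with replicas can never reach green, so waiting for yellow with an explicit per-request timeout and a retry loop is a bounded alternative. A sketch against the 0.x client; setTimeout(...), timedOut() and status() are assumptions about that API:

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class BoundedIndexWait {
    // Sketch only: retries a bounded health check instead of blocking on a
    // green status that a single-node cluster may never reach.
    public static boolean waitForIndex(final Client client, final String index, final int attempts) {
        for (int i = 0; i < attempts; i++) {
            final ClusterHealthResponse health = client.admin()
                .cluster()
                .prepareHealth(index)
                .setWaitForYellowStatus()
                .setTimeout(TimeValue.timeValueSeconds(10))
                .execute()
                .actionGet();
            if (!health.timedOut() && health.status() != ClusterHealthStatus.RED) {
                return true;
            }
        }
        return false;
    }
}
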

/**
* This method is called by ElasticSearch when shutting down the river. The method will stop the thread and close all
* connections to HBase.
*/
@Override
public void close() {
public synchronized void close() {
this.logger.info("Closing HBase river");
if (this.parser instanceof Parser) {
((Parser) this.parser).stopThread();
@@ -209,21 +221,19 @@ public void close() {
}

/**
* Some of the asynchronous methods of the HBase client will throw Exceptions that are not caught anywhere else. If this
* happens, the entire river is shut down.
* Some of the asynchronous methods of the HBase client will throw Exceptions that are not caught anywhere else.
*/
@Override
public void uncaughtException(final Thread arg0, final Throwable arg1) {
this.logger.error("An Exception has been thrown in HBase Import Thread", arg1, (Object[]) null);
close();
}

/**
* A separate Thread that does the actual fetching and storing of data from an HBase cluster.
*
* @author Ravi Gairola
*/
private class Parser extends Thread implements ActionListener<BulkResponse> {
private class Parser extends Thread {
private static final String TIMESTMAP_STATS = "timestamp_stats";
private int indexCounter;
private HBaseClient client;
@@ -244,8 +254,8 @@ public void run() {
lastRun = System.currentTimeMillis();
try {
parse();
} catch (Exception e) {
HBaseRiver.this.logger.error("An exception has been caught while parsing data from HBase", e);
} catch (Throwable t) {
HBaseRiver.this.logger.error("An exception has been caught while parsing data from HBase", t);
}
if (!this.stopThread) {
HBaseRiver.this.logger.info("HBase Import Thread is waiting for {} Seconds until the next run",
@@ -270,25 +280,42 @@ public void run() {
*/
private void parse() throws InterruptedException, Exception {
HBaseRiver.this.logger.info("Parsing data from HBase");
this.client = new HBaseClient(HBaseRiver.this.hosts);
try {
this.client = new HBaseClient(HBaseRiver.this.hosts);
HBaseRiver.this.logger.debug("Checking if table {} actually exists in HBase DB", HBaseRiver.this.table);
this.client.ensureTableExists(HBaseRiver.this.table);
HBaseRiver.this.logger.debug("Fetching HBase Scanner");
this.scanner = this.client.newScanner(HBaseRiver.this.table);
this.scanner.setServerBlockCache(false);

final String timestamp = String.valueOf(setMinTimestamp(this.scanner));
HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be {}", timestamp);

ArrayList<ArrayList<KeyValue>> rows;
HBaseRiver.this.logger.debug("Starting to fetch rows");
while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).join()) != null) {
while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).joinUninterruptibly()) != null) {
if (this.stopThread) {
HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
break;
}
parseBulkOfRows(timestamp, rows);
}
} finally {
stopThread();
HBaseRiver.this.logger.debug("Closing HBase Scanner and Async Client");
if (this.scanner != null) {
try {
this.scanner.close();
} catch (Exception e) {
HBaseRiver.this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null);
}
}
if (this.client != null) {
try {
this.client.shutdown();
} catch (Exception e) {
HBaseRiver.this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null);
}
}
}
}

Expand All @@ -306,16 +333,23 @@ private void parseBulkOfRows(final String timestamp, final ArrayList<ArrayList<K
HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
break;
}
final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index, HBaseRiver.this.type);
request.setTimestamp(timestamp);
request.setSource(readDataTree(row));
if (HBaseRiver.this.idField == null && row.size() > 0) {
request.setId(new String(row.get(0).key(), charset));
if (row.size() > 0) {
final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index, HBaseRiver.this.type);
request.setSource(readDataTree(row));
request.setTimestamp(String.valueOf(row.get(0).timestamp()));
if (HBaseRiver.this.idField == null) {
request.setId(new String(row.get(0).key(), HBaseRiver.this.charset));
}
bulkRequest.add(request);
}
bulkRequest.add(request);
}
bulkRequest.execute().addListener((ActionListener<BulkResponse>) this);
HBaseRiver.this.logger.debug("Sent Bulk Request with HBase data to ElasticSearch");
final BulkResponse response = bulkRequest.execute().actionGet();

this.indexCounter += response.items().length;
HBaseRiver.this.logger.info("HBase imported has indexed {} entries so far", this.indexCounter);
if (response.hasFailures()) {
HBaseRiver.this.logger.error("Errors have occured while trying to index new data from HBase");
}
}
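
Since the bulk call is now synchronous, the response can be inspected in place. When hasFailures() reports errors, the per-item responses say which documents were rejected and why. A sketch of more detailed failure logging; failed(), id() and failureMessage() are assumptions based on the 0.x BulkItemResponse API:

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.logging.ESLogger;

public class BulkFailureLogging {
    // Sketch only: logs every failed bulk item instead of one summary line.
    public static void logFailures(final BulkResponse response, final ESLogger logger) {
        if (!response.hasFailures()) {
            return;
        }
        for (final BulkItemResponse item : response.items()) {
            if (item.failed()) {
                logger.error("Indexing failed for id {}: {}", item.id(), item.failureMessage());
            }
        }
    }
}
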

/**
@@ -329,9 +363,9 @@ private void parseBulkOfRows(final String timestamp, final ArrayList<ArrayList<K
private Map<String, Object> readDataTree(final ArrayList<KeyValue> row) {
final Map<String, Object> dataTree = new HashMap<String, Object>();
for (final KeyValue column : row) {
final String family = new String(column.family(), charset);
final String qualifier = new String(column.qualifier(), charset);
final String value = new String(column.value(), charset);
final String family = new String(column.family(), HBaseRiver.this.charset);
final String qualifier = new String(column.qualifier(), HBaseRiver.this.charset);
final String value = new String(column.value(), HBaseRiver.this.charset);
if (!dataTree.containsKey(family)) {
dataTree.put(family, new HashMap<String, Object>());
}
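
readDataTree flattens each HBase row into a two-level map: column family first, then qualifier, with every value decoded as a string using the configured charset. As a hypothetical illustration, a row with the columns user:name = Ann and user:age = 42 would produce a document source of the form:

{
  "user": {
    "name": "Ann",
    "age": "42"
  }
}

Note that 42 arrives as the string "42": HBase stores plain bytes, so any stronger typing has to come from an explicit ElasticSearch mapping.
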
@@ -345,20 +379,6 @@ private Map<String, Object> readDataTree(final ArrayList<KeyValue> row) {
*/
public synchronized void stopThread() {
this.stopThread = true;
if (this.scanner != null) {
try {
this.scanner.close();
} catch (Exception e) {
HBaseRiver.this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null);
}
}
if (this.client != null) {
try {
this.client.shutdown();
} catch (Exception e) {
HBaseRiver.this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null);
}
}
}

/**
@@ -386,25 +406,5 @@ private long setMinTimestamp(final Scanner scanner) {
scanner.setMinTimestamp(0);
return 0L;
}

/**
* Elasticsearch Response handler.
*/
@Override
public void onResponse(final BulkResponse response) {
this.indexCounter += response.items().length;
HBaseRiver.this.logger.info("HBase imported has indexed {} entries so far", this.indexCounter);
if (response.hasFailures()) {
HBaseRiver.this.logger.error("Errors have occured while trying to index new data from HBase");
}
}

/**
* Elasticsearch Failure handler.
*/
@Override
public void onFailure(final Throwable e) {
HBaseRiver.this.logger.error("An error has been caught while trying to index new data from HBase", e, new Object[] {});
}
}
}
