- River with HBase async client that doesn't seem to work, because it wants to connect to localhost when the HDFS is actually somewhere else
mallocator committed Mar 15, 2013
1 parent 6af5389 commit 08aa0df
Showing 1 changed file with 83 additions and 38 deletions.
121 changes: 83 additions & 38 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
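For context on the commit message: asynchbase is given only the ZooKeeper quorum and resolves the actual region server addresses from ZooKeeper itself, so if the remote HBase cluster registered its servers under localhost, the client dials localhost no matter which hosts the river is configured with. A minimal sketch of pointing the client at a remote quorum, assuming placeholder host names and the default base znode (none of this is from the commit itself):

import org.hbase.async.HBaseClient;

public class RemoteQuorumSketch {
    public static void main(final String[] args) throws Exception {
        // Hypothetical quorum spec; "/hbase" is the default base znode.
        final HBaseClient client = new HBaseClient("zk1.example.com,zk2.example.com", "/hbase");
        // Hypothetical table name; join() blocks on the asynchbase Deferred.
        client.ensureTableExists("some_table").join();
        client.shutdown().join();
    }
}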
@@ -10,7 +10,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
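The import swap above reflects the main API change in this commit: index requests are no longer constructed directly but obtained as builders from the client. A minimal sketch of the two forms, with placeholder index and type names (the two-argument source calls mirror the ones visible in the diff below):

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;

public class IndexRequestForms {
    // Old form: a standalone request object, later added to a bulk request.
    static IndexRequest oldForm() {
        final IndexRequest request = new IndexRequest("some_index", "some_type");
        request.source("field", "value");
        return request;
    }

    // New form: a builder obtained from the client, as the new parse() does.
    static IndexRequestBuilder newForm(final Client client) {
        return client.prepareIndex("some_index", "some_type").setSource("field", "value");
    }
}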
@@ -30,7 +30,6 @@

public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
private final Client esClient;
private boolean stopThread;
private volatile Thread thread;

private final String hosts;
@@ -45,9 +44,9 @@ public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
public HBaseRiver(final RiverName riverName, final RiverSettings settings, final Client esClient) {
super(riverName, settings);
this.esClient = esClient;
this.logger.info("Creating MySQL Stream River");
this.logger.info("Creating HBase Stream River");

this.hosts = readConfig("hosts", readConfig("zookeeper"));
this.hosts = readConfig("hosts", readConfig("zookeeper", null));
this.table = readConfig("table");
this.idField = readConfig("idField", null);
this.index = readConfig("index", riverName.name());
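The keys read above come from the river's settings document. A hypothetical registration document matching those keys (only the key names hosts/zookeeper, table, idField, and index are taken from the code; every value and the river type name are illustrative):

{
  "type" : "hbase",
  "hbase" : {
    "hosts" : "zk1.example.com,zk2.example.com",
    "table" : "some_table",
    "idField" : "id",
    "index" : "some_index"
  }
}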
@@ -67,16 +66,20 @@ private String readConfig(final String config) {

@SuppressWarnings({ "unchecked" })
private String readConfig(final String config, final String defaultValue) {
if (this.settings.settings().containsKey("mysql")) {
Map<String, Object> mysqlSettings = (Map<String, Object>) this.settings.settings().get("mysql");
if (this.settings.settings().containsKey("hbase")) {
Map<String, Object> mysqlSettings = (Map<String, Object>) this.settings.settings().get("hbase");
return XContentMapValues.nodeStringValue(mysqlSettings.get(config), defaultValue);
}
return defaultValue;
}

@Override
public void start() {
this.logger.info("starting hbase stream");
public synchronized void start() {
if (this.thread != null) {
this.logger.warn("Trying to start HBase stream although it is already running");
return;
}
this.logger.info("Starting HBase Stream");
String mapping;
if (this.idField == null) {
mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true}}}";
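For readability, the escaped mapping string built above expands to the following JSON, shown here with <type> standing in for the river's type name:

{
  "<type>" : {
    "_timestamp" : { "enabled" : true }
  }
}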
@@ -114,36 +117,38 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type);
}

if (this.thread == null) {
this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(new Parser());
this.thread.setUncaughtExceptionHandler(this);
this.thread.start();
}
this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(new Parser());
this.thread.setUncaughtExceptionHandler(this);
this.thread.start();
}

@Override
public void close() {
this.logger.info("Closing HBase river");
this.stopThread = true;
((Parser) this.thread).stopThread();
this.thread = null;
}

@Override
public void uncaughtException(final Thread arg0, final Throwable arg1) {
this.logger.error("An Exception has been thrown in HBase Import Thread", arg1);
this.logger.error("An Exception has been thrown in HBase Import Thread", arg1, (Object[]) null);
close();
}

private class Parser extends Thread implements ActionListener<BulkResponse> {
private int indexCounter;
private static final String TIMESTAMP_STATS = "timestamp_stats";
private int indexCounter;
private HBaseClient client;
private Scanner scanner;
private boolean stopThread;

private Parser() {}

@Override
public void run() {
HBaseRiver.this.logger.info("HBase Import Thread has started");
long lastRun = 0;
while (!HBaseRiver.this.stopThread) {
while (!this.stopThread) {
if (lastRun + HBaseRiver.this.interval < System.currentTimeMillis()) {
lastRun = System.currentTimeMillis();
try {
@@ -154,7 +159,7 @@ public void run() {
if (HBaseRiver.this.interval <= 0) {
break;
}
if (!HBaseRiver.this.stopThread) {
if (!this.stopThread) {
HBaseRiver.this.logger.info("HBase Import Thread is waiting for {} Seconds until the next run",
HBaseRiver.this.interval / 1000);
}
@@ -167,24 +172,58 @@ }
}

private void parse() throws InterruptedException, Exception {
final HBaseClient client = new HBaseClient(HBaseRiver.this.hosts);
client.ensureTableExists(HBaseRiver.this.table);
final Scanner scanner = client.newScanner(HBaseRiver.this.table);
setMinTimestamp(scanner);

ArrayList<ArrayList<KeyValue>> rows;
while ((rows = scanner.nextRows(HBaseRiver.this.batchSize).join()) != null) {
final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
for (final ArrayList<KeyValue> row : rows) {
final IndexRequest request = new IndexRequest(HBaseRiver.this.index, HBaseRiver.this.type);
for (final KeyValue column : row) {
final String key = String.valueOf(column.key());
final String value = String.valueOf(column.value());
request.source(key, value);
}
bulkRequest.add(request);
}
bulkRequest.execute().addListener((ActionListener<BulkResponse>) this);
}
HBaseRiver.this.logger.info("Parsing data from HBase");
this.client = new HBaseClient(HBaseRiver.this.hosts);
try {
HBaseRiver.this.logger.debug("Checking if table {} actually exists in HBase DB", HBaseRiver.this.table);
this.client.ensureTableExists(HBaseRiver.this.table);
HBaseRiver.this.logger.debug("Fetching HBase Scanner");
this.scanner = this.client.newScanner(HBaseRiver.this.table);
setMinTimestamp(this.scanner);
ArrayList<ArrayList<KeyValue>> rows;
HBaseRiver.this.logger.debug("Starting to fetch rows");
while ((rows = this.scanner.nextRows(HBaseRiver.this.batchSize).join()) != null) {
if (this.stopThread) {
HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
break;
}
HBaseRiver.this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size());
final BulkRequestBuilder bulkRequest = HBaseRiver.this.esClient.prepareBulk();
for (final ArrayList<KeyValue> row : rows) {
if (this.stopThread) {
HBaseRiver.this.logger.info("Stopping HBase import in the midle of it");
break;
}
final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index,
HBaseRiver.this.type);
for (final KeyValue column : row) {
request.setSource(column.key().toString(), column.value().toString());
}
bulkRequest.add(request);
}
bulkRequest.execute().addListener((ActionListener<BulkResponse>) this);
HBaseRiver.this.logger.debug("Sent Bulk Request with HBase data to ElasticSearch");
}
} finally {
stopThread();
}
}

public synchronized void stopThread() {
this.stopThread = true;
if (this.scanner != null) {
try {
this.scanner.close();
} catch (Exception e) {
HBaseRiver.this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null);
}
}
if (this.client != null) {
try {
this.client.shutdown();
} catch (Exception e) {
HBaseRiver.this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null);
}
}
}
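As a standalone reference for the scan pattern used in parse() above, here is a minimal sketch; the quorum and table name are placeholders. Note that asynchbase's KeyValue accessors return raw byte[], which is why the conversion below is explicit:

import java.util.ArrayList;

import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;

public class ScanSketch {
    public static void main(final String[] args) throws Exception {
        final HBaseClient client = new HBaseClient("zk.example.com"); // placeholder quorum
        final Scanner scanner = client.newScanner("some_table"); // placeholder table
        ArrayList<ArrayList<KeyValue>> rows;
        // nextRows() yields null once the scan is exhausted.
        while ((rows = scanner.nextRows(100).join()) != null) {
            for (final ArrayList<KeyValue> row : rows) {
                for (final KeyValue column : row) {
                    // key()/value() return byte[]; calling toString() on them
                    // would print array identity, not contents.
                    System.out.println(new String(column.key()) + " -> " + new String(column.value()));
                }
            }
        }
        scanner.close().join();
        client.shutdown().join();
    }
}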

@@ -194,16 +233,22 @@ private void parse() throws InterruptedException, Exception {
* @param scanner
*/
private void setMinTimestamp(final Scanner scanner) {
HBaseRiver.this.logger.debug("Looking into ElasticSearch to determine timestamp of last import");
final SearchResponse response = HBaseRiver.this.esClient.prepareSearch(HBaseRiver.this.index)
.setTypes(HBaseRiver.this.type)
.setQuery(QueryBuilders.matchAllQuery())
.addFacet(FacetBuilders.statisticalFacet("timestmap_stats").field("_timestamp"))
.addFacet(FacetBuilders.statisticalFacet(TIMESTAMP_STATS).field("_timestamp"))
.execute()
.actionGet();

if (!response.facets().facets().isEmpty()) {
final StatisticalFacet facet = (StatisticalFacet) response.facets().facet("timestmap_stats");
scanner.setMinTimestamp((long) facet.getMax());
if (response.facets().facet(TIMESTAMP_STATS) != null) {
HBaseRiver.this.logger.debug("Got statistical data from ElasticSearch about data timestamps");
final StatisticalFacet facet = (StatisticalFacet) response.facets().facet(TIMESTAMP_STATS);
scanner.setMinTimestamp((long) Math.max(facet.getMax(), 0));
}
else {
HBaseRiver.this.logger.debug("No statistical data about data timestamps could be found -> probably no data there yet");
scanner.setMinTimestamp(0);
}
}

