Skip to content

Commit

Permalink
- Added a logger via error callback to all HBase operations
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Apr 15, 2013
1 parent 2ef8ee6 commit f330b74
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.elasticsearch.river.hbase;

import org.elasticsearch.common.logging.ESLogger;

import com.stumbleupon.async.Callback;

/**
* A small helper class that will log any responses from HBase, in case there are any.
*
* @author Ravi Gairola
*/
public class HBaseCallbackLogger implements Callback<Object, Object> {
private final ESLogger logger;
private final String realm;

public HBaseCallbackLogger(final ESLogger logger, final String realm) {
this.logger = logger;
this.realm = realm;
}

@Override
public Object call(final Object arg) throws Exception {
if (arg instanceof Throwable) {
this.logger.error("An async error has been caught within {}:", (Throwable) arg, this.realm);
}
else {
this.logger.trace("Got response from HBase in realm {}: {}", this.realm, arg);
}
return arg;
}
}
26 changes: 14 additions & 12 deletions src/main/java/org/elasticsearch/river/hbase/HBaseParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
* @author Ravi Gairola
*/
class HBaseParser implements Runnable {
private static final String TIMESTMAP_STATS = "timestamp_stats";
private final HBaseRiver river;
private final ESLogger logger;
private int indexCounter;
private HBaseClient client;
private Scanner scanner;
private boolean stopThread;
private static final String TIMESTMAP_STATS = "timestamp_stats";
private final HBaseRiver river;
private final ESLogger logger;
private final HBaseCallbackLogger cbLogger;
private int indexCounter;
private HBaseClient client;
private Scanner scanner;
private boolean stopThread;

HBaseParser(final HBaseRiver river) {
this.river = river;
this.logger = river.getLogger();
this.cbLogger = new HBaseCallbackLogger(this.logger, "HBase Parser");
}

/**
Expand Down Expand Up @@ -79,7 +81,7 @@ protected void parse() throws InterruptedException, Exception {
try {
this.client = new HBaseClient(this.river.getHosts());
this.logger.debug("Checking if table {} actually exists in HBase DB", this.river.getTable());
this.client.ensureTableExists(this.river.getTable());
this.client.ensureTableExists(this.river.getTable()).addErrback(this.cbLogger);
this.logger.debug("Fetching HBase Scanner");
this.scanner = this.client.newScanner(this.river.getTable());
this.scanner.setServerBlockCache(false);
Expand All @@ -96,7 +98,7 @@ protected void parse() throws InterruptedException, Exception {
ArrayList<ArrayList<KeyValue>> rows;
this.logger.debug("Starting to fetch rows");

while ((rows = this.scanner.nextRows(this.river.getBatchSize()).joinUninterruptibly()) != null) {
while ((rows = this.scanner.nextRows(this.river.getBatchSize()).addErrback(this.cbLogger).joinUninterruptibly()) != null) {
if (this.stopThread) {
this.logger.info("Stopping HBase import in the midle of it");
break;
Expand All @@ -107,14 +109,14 @@ protected void parse() throws InterruptedException, Exception {
this.logger.debug("Closing HBase Scanner and Async Client");
if (this.scanner != null) {
try {
this.scanner.close();
this.scanner.close().addErrback(this.cbLogger);
} catch (Exception e) {
this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null);
}
}
if (this.client != null) {
try {
this.client.shutdown();
this.client.shutdown().addErrback(this.cbLogger);
} catch (Exception e) {
this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null);
}
Expand Down Expand Up @@ -145,7 +147,7 @@ protected void parseBulkOfRows(final ArrayList<ArrayList<KeyValue>> rows) {
}
bulkRequest.add(request);
if (this.river.getDeleteOld()) {
this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), key));
this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), key)).addErrback(this.cbLogger);
}
}
}
Expand Down

0 comments on commit f330b74

Please sign in to comment.