From f330b74b4b2dbd6af79f17bbc6670874e2236d2b Mon Sep 17 00:00:00 2001 From: mallocator Date: Mon, 15 Apr 2013 09:39:41 -0700 Subject: [PATCH] - Added a logger via error callback to all HBase operations --- .../river/hbase/HBaseCallbackLogger.java | 31 +++++++++++++++++++ .../river/hbase/HBaseParser.java | 26 +++++++++------- 2 files changed, 45 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java b/src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java new file mode 100644 index 0000000..fa77269 --- /dev/null +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java @@ -0,0 +1,31 @@ +package org.elasticsearch.river.hbase; + +import org.elasticsearch.common.logging.ESLogger; + +import com.stumbleupon.async.Callback; + +/** + * A small helper class that will log any responses from HBase, in case there are any. + * + * @author Ravi Gairola + */ +public class HBaseCallbackLogger implements Callback { + private final ESLogger logger; + private final String realm; + + public HBaseCallbackLogger(final ESLogger logger, final String realm) { + this.logger = logger; + this.realm = realm; + } + + @Override + public Object call(final Object arg) throws Exception { + if (arg instanceof Throwable) { + this.logger.error("An async error has been caught within {}:", (Throwable) arg, this.realm); + } + else { + this.logger.trace("Got response from HBase in realm {}: {}", this.realm, arg); + } + return arg; + } +} diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java b/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java index aab88dc..ac3b6a1 100644 --- a/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java @@ -23,17 +23,19 @@ * @author Ravi Gairola */ class HBaseParser implements Runnable { - private static final String TIMESTMAP_STATS = "timestamp_stats"; - private final HBaseRiver river; - private final ESLogger logger; - private int indexCounter; - private HBaseClient client; - private Scanner scanner; - private boolean stopThread; + private static final String TIMESTMAP_STATS = "timestamp_stats"; + private final HBaseRiver river; + private final ESLogger logger; + private final HBaseCallbackLogger cbLogger; + private int indexCounter; + private HBaseClient client; + private Scanner scanner; + private boolean stopThread; HBaseParser(final HBaseRiver river) { this.river = river; this.logger = river.getLogger(); + this.cbLogger = new HBaseCallbackLogger(this.logger, "HBase Parser"); } /** @@ -79,7 +81,7 @@ protected void parse() throws InterruptedException, Exception { try { this.client = new HBaseClient(this.river.getHosts()); this.logger.debug("Checking if table {} actually exists in HBase DB", this.river.getTable()); - this.client.ensureTableExists(this.river.getTable()); + this.client.ensureTableExists(this.river.getTable()).addErrback(this.cbLogger); this.logger.debug("Fetching HBase Scanner"); this.scanner = this.client.newScanner(this.river.getTable()); this.scanner.setServerBlockCache(false); @@ -96,7 +98,7 @@ protected void parse() throws InterruptedException, Exception { ArrayList> rows; this.logger.debug("Starting to fetch rows"); - while ((rows = this.scanner.nextRows(this.river.getBatchSize()).joinUninterruptibly()) != null) { + while ((rows = this.scanner.nextRows(this.river.getBatchSize()).addErrback(this.cbLogger).joinUninterruptibly()) != null) { if (this.stopThread) { this.logger.info("Stopping HBase import in the midle of it"); break; @@ -107,14 +109,14 @@ protected void parse() throws InterruptedException, Exception { this.logger.debug("Closing HBase Scanner and Async Client"); if (this.scanner != null) { try { - this.scanner.close(); + this.scanner.close().addErrback(this.cbLogger); } catch (Exception e) { this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null); } } if (this.client != null) { try { - this.client.shutdown(); + this.client.shutdown().addErrback(this.cbLogger); } catch (Exception e) { this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null); } @@ -145,7 +147,7 @@ protected void parseBulkOfRows(final ArrayList> rows) { } bulkRequest.add(request); if (this.river.getDeleteOld()) { - this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), key)); + this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), key)).addErrback(this.cbLogger); } } }