diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java b/src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java new file mode 100644 index 0000000..2a4087d --- /dev/null +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java @@ -0,0 +1,31 @@ +package org.elasticsearch.river.hbase; + +import org.elasticsearch.common.logging.ESLogger; + +import com.stumbleupon.async.Callback; + +/** + * A small helper class that will log any responses from HBase, in case there are any. + * + * @author Ravi Gairola + */ +public class HBaseCallbackLogger implements Callback { + private final ESLogger logger; + private final String realm; + + public HBaseCallbackLogger(final ESLogger logger, final String realm) { + this.logger = logger; + this.realm = realm; + } + + @Override + public Object call(final Object arg) throws Exception { + if (arg instanceof Throwable) { + this.logger.error("An async error has been caught from HBase within {}:", (Throwable) arg, this.realm); + } + else { + this.logger.trace("Got response from HBase within {}: {}", this.realm, arg); + } + return arg; + } +} diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java b/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java index aab88dc..c16febe 100644 --- a/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseParser.java @@ -2,8 +2,11 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -23,17 +26,19 @@ * @author Ravi Gairola */ class HBaseParser implements Runnable { - private static final String TIMESTMAP_STATS = "timestamp_stats"; - private final HBaseRiver river; - 
private final ESLogger logger; - private int indexCounter; - private HBaseClient client; - private Scanner scanner; - private boolean stopThread; + private static final String TIMESTMAP_STATS = "timestamp_stats"; + private final HBaseRiver river; + private final ESLogger logger; + private final HBaseCallbackLogger cbLogger; + private int indexCounter; + private HBaseClient client; + private Scanner scanner; + private boolean stopThread; HBaseParser(final HBaseRiver river) { this.river = river; this.logger = river.getLogger(); + this.cbLogger = new HBaseCallbackLogger(this.logger, "HBase Parser"); } /** @@ -79,7 +84,7 @@ protected void parse() throws InterruptedException, Exception { try { this.client = new HBaseClient(this.river.getHosts()); this.logger.debug("Checking if table {} actually exists in HBase DB", this.river.getTable()); - this.client.ensureTableExists(this.river.getTable()); + this.client.ensureTableExists(this.river.getTable()).addErrback(this.cbLogger); this.logger.debug("Fetching HBase Scanner"); this.scanner = this.client.newScanner(this.river.getTable()); this.scanner.setServerBlockCache(false); @@ -96,7 +101,7 @@ protected void parse() throws InterruptedException, Exception { ArrayList> rows; this.logger.debug("Starting to fetch rows"); - while ((rows = this.scanner.nextRows(this.river.getBatchSize()).joinUninterruptibly()) != null) { + while ((rows = this.scanner.nextRows(this.river.getBatchSize()).addErrback(this.cbLogger).joinUninterruptibly()) != null) { if (this.stopThread) { this.logger.info("Stopping HBase import in the midle of it"); break; @@ -107,14 +112,14 @@ protected void parse() throws InterruptedException, Exception { this.logger.debug("Closing HBase Scanner and Async Client"); if (this.scanner != null) { try { - this.scanner.close(); + this.scanner.close().addErrback(this.cbLogger); } catch (Exception e) { this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null); } } if (this.client != 
null) { try { - this.client.shutdown(); + this.client.shutdown().addErrback(this.cbLogger); } catch (Exception e) { this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null); } @@ -130,6 +135,7 @@ protected void parse() throws InterruptedException, Exception { protected void parseBulkOfRows(final ArrayList> rows) { this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size()); final BulkRequestBuilder bulkRequest = this.river.getEsClient().prepareBulk(); + final Map keyMapForDeletion = new HashMap(); for (final ArrayList row : rows) { if (this.stopThread) { this.logger.info("Stopping HBase import in the midle of it"); @@ -138,24 +144,53 @@ protected void parseBulkOfRows(final ArrayList> rows) { if (row.size() > 0) { final IndexRequestBuilder request = this.river.getEsClient().prepareIndex(this.river.getIndex(), this.river.getType()); final byte[] key = row.get(0).key(); - request.setSource(readDataTree(row)); + final Map dataTree = readDataTree(row); + request.setSource(dataTree); request.setTimestamp(String.valueOf(row.get(0).timestamp())); if (this.river.getIdField() == null) { - request.setId(new String(key, this.river.getCharset())); + final String keyString = new String(key, this.river.getCharset()); + request.setId(keyString); + keyMapForDeletion.put(keyString, key); } - bulkRequest.add(request); - if (this.river.getDeleteOld()) { - this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), key)); + else { + final String keyString = findKeyInDataTree(dataTree, this.river.getIdField()); + keyMapForDeletion.put(keyString, key); } + bulkRequest.add(request); } } final BulkResponse response = bulkRequest.execute().actionGet(); this.indexCounter += response.items().length; this.logger.info("HBase river has indexed {} entries so far", this.indexCounter); + final List failedKeys = new ArrayList(); if (response.hasFailures()) { + for (BulkItemResponse r : response.items()) { + if 
(r.failed()) { + failedKeys.add(keyMapForDeletion.remove(r.getId())); + } + } this.logger.error("Errors have occured while trying to index new data from HBase"); + this.logger.debug("Failed keys are {}", failedKeys); + } + if (this.river.getDeleteOld()) { + for (Entry keyEntry : keyMapForDeletion.entrySet()) { + this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), keyEntry.getValue())).addErrback(this.cbLogger); + } + } + } + + @SuppressWarnings("unchecked") + protected String findKeyInDataTree(final Map dataTree, final String keyPath) { + if (!keyPath.contains(this.river.getColumnSeparator())) { + return (String) dataTree.get(keyPath); + } + final String key = keyPath.substring(0, keyPath.indexOf(this.river.getColumnSeparator())); + if (dataTree.get(key) instanceof Map) { + final int subKeyIndex = keyPath.indexOf(this.river.getColumnSeparator()) + this.river.getColumnSeparator().length(); + return findKeyInDataTree((Map) dataTree.get(key), keyPath.substring(subKeyIndex)); } + return null; } /** diff --git a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java index 97240a5..60fef80 100644 --- a/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java +++ b/src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java @@ -109,7 +109,9 @@ public class HBaseRiver extends AbstractRiverComponent implements River, Uncaugh * } * * - * If no separator is defined, or the separator is empty, no operation is performed. + * If no separator is defined, or the separator is empty, no operation is performed. Try to use single-character + * separators, as multi-character separators will allow partial hits of a separator to be part of the data. (e.g. A + * separator defined as "()" will leave all "(" and ")" in the parsed data.) 
*/ public final String columnSeparator; @@ -220,6 +222,10 @@ public synchronized void start() { if (this.idField == null) { mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true}}}"; } + if (this.columnSeparator != null) { + mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true},\"_id\":{\"path\":\"" + + this.idField.replace(this.columnSeparator, ".") + "\"}}}"; + } else { mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true},\"_id\":{\"path\":\"" + this.idField + "\"}}}"; } @@ -309,7 +315,14 @@ public String normalizeField(final String fieldName) { if (!isNormalizeFields() || fieldName == null) { return fieldName; } - return fieldName.toLowerCase().replaceAll("[^a-z0-9\\-_]*", ""); + if (getColumnSeparator() != null) { + String regex = "a-z0-9\\-_"; + for (int i = 0; i < getColumnSeparator().length(); i++) { + regex += "\\" + getColumnSeparator().charAt(i); + } + return fieldName.toLowerCase().replaceAll("[^" + regex + "]", ""); + } + return fieldName.toLowerCase().replaceAll("[^a-z0-9\\-_]", ""); } public boolean isNormalizeFields() { diff --git a/src/test/java/org/elasticsearch/river/hbase/HBaseParserTest.java b/src/test/java/org/elasticsearch/river/hbase/HBaseParserTest.java index 603b02a..e0b620e 100644 --- a/src/test/java/org/elasticsearch/river/hbase/HBaseParserTest.java +++ b/src/test/java/org/elasticsearch/river/hbase/HBaseParserTest.java @@ -1,5 +1,8 @@ package org.elasticsearch.river.hbase; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -20,6 +23,11 @@ import org.testng.annotations.Test; public class HBaseParserTest { + @AfterClass + public void tearDown() { + Mockit.tearDownMocks(); + } + public class ReadQualifierStructureTest { public String separator; public boolean normalize; @@ -46,11 +54,6 @@ boolean isNormalizeFields() { }; } - @AfterClass - public void tearDown() { 
- Mockit.tearDownMocks(); - } - @SuppressWarnings("unchecked") @Test public void testBase() throws Exception { @@ -184,11 +187,6 @@ boolean isNormalizeFields() { }; } - @AfterClass - public void tearDown() { - Mockit.tearDownMocks(); - } - @Test @SuppressWarnings("unchecked") public void testBase() { @@ -206,19 +204,19 @@ public void testBase() { final Map output = parser.readDataTree(input); - Assert.assertNotNull(output.get("family1")); + assertNotNull(output.get("family1")); final Map family1 = (Map) output.get("family1"); - Assert.assertEquals(family1.get("category1"), "value1"); - Assert.assertEquals(family1.get("category2"), "value2"); - Assert.assertEquals(family1.get("category3"), "value3"); - Assert.assertNotNull(output.get("family2")); + assertEquals(family1.get("category1"), "value1"); + assertEquals(family1.get("category2"), "value2"); + assertEquals(family1.get("category3"), "value3"); + assertNotNull(output.get("family2")); final Map family2 = (Map) output.get("family2"); - Assert.assertEquals(family2.get("category1"), "value4"); - Assert.assertEquals(family2.get("category4"), "value5"); - Assert.assertEquals(family2.get("category6"), "value7"); - Assert.assertNotNull(output.get("family3")); + assertEquals(family2.get("category1"), "value4"); + assertEquals(family2.get("category4"), "value5"); + assertEquals(family2.get("category6"), "value7"); + assertNotNull(output.get("family3")); final Map family3 = (Map) output.get("family3"); - Assert.assertEquals(family3.get("category5"), "value6"); + assertEquals(family3.get("category5"), "value6"); } private KeyValue getKeyValue(final String family, final String qualifier, final String value) { @@ -228,4 +226,59 @@ private KeyValue getKeyValue(final String family, final String qualifier, final value.getBytes(this.charset)); } } + + public class FindKeyInDataTreeTest { + protected String separator; + protected boolean normalize; + + @BeforeClass + public void setUp() { + new MockUp() { + @Mock + void 
$init(final RiverName riverName, final RiverSettings settings) {} + }; + + new MockUp() { + + @Mock + void $init(final RiverName riverName, final RiverSettings settings, final Client esClient) {} + + @Mock + String getColumnSeparator() { + return FindKeyInDataTreeTest.this.separator; + } + + @Mock + boolean isNormalizeFields() { + return FindKeyInDataTreeTest.this.normalize; + } + }; + } + + @Test + public void testBase() { + final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null)); + this.separator = "::"; + + final Map dataTree = new HashMap(); + final Map dataBranch = new HashMap(); + dataBranch.put("theId", "TheValue"); + dataTree.put("aBranch", dataBranch); + + assertEquals(parser.findKeyInDataTree(dataTree, "aBranch::theId"), "TheValue"); + } + + @Test + public void testDotSeparator() { + final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null)); + this.separator = "."; + + final Map dataTree = new HashMap(); + final Map dataBranch = new HashMap(); + dataBranch.put("theId", "TheValue"); + dataTree.put("aBranch", dataBranch); + + assertEquals(parser.findKeyInDataTree(dataTree, "aBranch.theId"), "TheValue"); + } + } } diff --git a/src/test/java/org/elasticsearch/river/hbase/HBaseRiverTest.java b/src/test/java/org/elasticsearch/river/hbase/HBaseRiverTest.java index 94f951d..dc93212 100644 --- a/src/test/java/org/elasticsearch/river/hbase/HBaseRiverTest.java +++ b/src/test/java/org/elasticsearch/river/hbase/HBaseRiverTest.java @@ -25,6 +25,11 @@ public void testNormalizeField() { boolean isNormalizeFields() { return true; } + + @Mock + String getColumnSeparator() { + return "::"; + } }; final HBaseRiver river = new HBaseRiver(null, null, null); @@ -37,7 +42,8 @@ boolean isNormalizeFields() { Assert.assertEquals(river.normalizeField("a-b"), "a-b"); Assert.assertEquals(river.normalizeField("a_b"), "a_b"); Assert.assertEquals(river.normalizeField("90aS"), "90as"); - 
Assert.assertEquals(river.normalizeField("&*($@#!ui^&$(#:\"8ui"), "ui8ui"); + Assert.assertEquals(river.normalizeField("&*($@#!ui^&$(#\"8ui"), "ui8ui"); + Assert.assertEquals(river.normalizeField("bl%^&*ah::blubb"), "blah::blubb"); Assert.assertEquals(river.normalizeField(null), null); } }