- Moved deletion of old entries to after the ElasticSearch bulk operation has completed
- Only keys imported without failure are now deleted
- Column separator now works correctly with field normalization
- ID field now finds the nested property in the mapping
mallocator committed Apr 18, 2013
1 parent 2ef8ee6 commit 1bca36c
Showing 5 changed files with 177 additions and 39 deletions.
31 changes: 31 additions & 0 deletions src/main/java/org/elasticsearch/river/hbase/HBaseCallbackLogger.java
@@ -0,0 +1,31 @@
package org.elasticsearch.river.hbase;

import org.elasticsearch.common.logging.ESLogger;

import com.stumbleupon.async.Callback;

/**
* A small helper class that logs any responses coming back from HBase.
*
* @author Ravi Gairola
*/
public class HBaseCallbackLogger implements Callback<Object, Object> {
private final ESLogger logger;
private final String realm;

public HBaseCallbackLogger(final ESLogger logger, final String realm) {
this.logger = logger;
this.realm = realm;
}

@Override
public Object call(final Object arg) throws Exception {
if (arg instanceof Throwable) {
this.logger.error("An async error has been caught from HBase within {}:", (Throwable) arg, this.realm);
}
else {
this.logger.trace("Got response from HBase within {}: {}", this.realm, arg);
}
return arg;
}
}
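As a usage illustration (not part of this commit): the callback is meant to be attached to asynchbase Deferred results, the way HBaseParser does below via addErrback. A minimal sketch, assuming a reachable HBase/ZooKeeper quorum on localhost; the class name, logger name, realm string, and table name are made up for the example:

```java
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.hbase.async.HBaseClient;

public class CallbackLoggerSketch {
	public static void main(final String[] args) throws Exception {
		final ESLogger logger = ESLoggerFactory.getLogger("hbase-river-sketch");
		final HBaseCallbackLogger cbLogger = new HBaseCallbackLogger(logger, "Sketch");

		// The quorum spec "localhost" is an assumption for this sketch.
		final HBaseClient client = new HBaseClient("localhost");
		// Any async failure is routed through the errback chain and logged;
		// call() returns its argument, so the Deferred's outcome is unchanged.
		client.ensureTableExists("some_table").addErrback(cbLogger).joinUninterruptibly();
		client.shutdown().addErrback(cbLogger).joinUninterruptibly();
	}
}
```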
67 changes: 51 additions & 16 deletions src/main/java/org/elasticsearch/river/hbase/HBaseParser.java
@@ -2,8 +2,11 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -23,17 +26,19 @@
* @author Ravi Gairola
*/
class HBaseParser implements Runnable {
private static final String TIMESTMAP_STATS = "timestamp_stats";
private final HBaseRiver river;
private final ESLogger logger;
private final HBaseCallbackLogger cbLogger;
private int indexCounter;
private HBaseClient client;
private Scanner scanner;
private boolean stopThread;

HBaseParser(final HBaseRiver river) {
this.river = river;
this.logger = river.getLogger();
this.cbLogger = new HBaseCallbackLogger(this.logger, "HBase Parser");
}

/**
@@ -79,7 +84,7 @@ protected void parse() throws InterruptedException, Exception {
try {
this.client = new HBaseClient(this.river.getHosts());
this.logger.debug("Checking if table {} actually exists in HBase DB", this.river.getTable());
- this.client.ensureTableExists(this.river.getTable());
+ this.client.ensureTableExists(this.river.getTable()).addErrback(this.cbLogger);
this.logger.debug("Fetching HBase Scanner");
this.scanner = this.client.newScanner(this.river.getTable());
this.scanner.setServerBlockCache(false);
@@ -96,7 +101,7 @@ protected void parse() throws InterruptedException, Exception {
ArrayList<ArrayList<KeyValue>> rows;
this.logger.debug("Starting to fetch rows");

- while ((rows = this.scanner.nextRows(this.river.getBatchSize()).joinUninterruptibly()) != null) {
+ while ((rows = this.scanner.nextRows(this.river.getBatchSize()).addErrback(this.cbLogger).joinUninterruptibly()) != null) {
if (this.stopThread) {
this.logger.info("Stopping HBase import in the midle of it");
break;
@@ -107,14 +112,14 @@
this.logger.debug("Closing HBase Scanner and Async Client");
if (this.scanner != null) {
try {
- this.scanner.close();
+ this.scanner.close().addErrback(this.cbLogger);
} catch (Exception e) {
this.logger.error("An Exception has been caught while closing the HBase Scanner", e, (Object[]) null);
}
}
if (this.client != null) {
try {
- this.client.shutdown();
+ this.client.shutdown().addErrback(this.cbLogger);
} catch (Exception e) {
this.logger.error("An Exception has been caught while shuting down the HBase client", e, (Object[]) null);
}
@@ -130,6 +135,7 @@ protected void parse() throws InterruptedException, Exception {
protected void parseBulkOfRows(final ArrayList<ArrayList<KeyValue>> rows) {
this.logger.debug("Processing the next {} entries in HBase parsing process", rows.size());
final BulkRequestBuilder bulkRequest = this.river.getEsClient().prepareBulk();
final Map<String, byte[]> keyMapForDeletion = new HashMap<String, byte[]>();
for (final ArrayList<KeyValue> row : rows) {
if (this.stopThread) {
this.logger.info("Stopping HBase import in the midle of it");
@@ -138,24 +144,53 @@ protected void parseBulkOfRows(final ArrayList<ArrayList<KeyValue>> rows) {
if (row.size() > 0) {
final IndexRequestBuilder request = this.river.getEsClient().prepareIndex(this.river.getIndex(), this.river.getType());
final byte[] key = row.get(0).key();
- request.setSource(readDataTree(row));
+ final Map<String, Object> dataTree = readDataTree(row);
+ request.setSource(dataTree);
request.setTimestamp(String.valueOf(row.get(0).timestamp()));
if (this.river.getIdField() == null) {
- request.setId(new String(key, this.river.getCharset()));
+ final String keyString = new String(key, this.river.getCharset());
+ request.setId(keyString);
+ keyMapForDeletion.put(keyString, key);
}
- bulkRequest.add(request);
- if (this.river.getDeleteOld()) {
- this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), key));
- }
+ else {
+ final String keyString = findKeyInDataTree(dataTree, this.river.getIdField());
+ keyMapForDeletion.put(keyString, key);
+ }
+ bulkRequest.add(request);
}
}
final BulkResponse response = bulkRequest.execute().actionGet();

this.indexCounter += response.items().length;
this.logger.info("HBase river has indexed {} entries so far", this.indexCounter);
final List<byte[]> failedKeys = new ArrayList<byte[]>();
if (response.hasFailures()) {
for (BulkItemResponse r : response.items()) {
if (r.failed()) {
failedKeys.add(keyMapForDeletion.remove(r.getId()));
}
}
this.logger.error("Errors have occured while trying to index new data from HBase");
this.logger.debug("Failed keys are {}", failedKeys);
}
if (this.river.getDeleteOld()) {
for (Entry<String, byte[]> keyEntry : keyMapForDeletion.entrySet()) {
this.client.delete(new DeleteRequest(this.river.getTable().getBytes(), keyEntry.getValue())).addErrback(this.cbLogger);
}
}
}

@SuppressWarnings("unchecked")
protected String findKeyInDataTree(final Map<String, Object> dataTree, final String keyPath) {
if (!keyPath.contains(this.river.getColumnSeparator())) {
return (String) dataTree.get(keyPath);
}
final String key = keyPath.substring(0, keyPath.indexOf(this.river.getColumnSeparator()));
if (dataTree.get(key) instanceof Map) {
final int subKeyIndex = keyPath.indexOf(this.river.getColumnSeparator()) + this.river.getColumnSeparator().length();
return findKeyInDataTree((Map<String, Object>) dataTree.get(key), keyPath.substring(subKeyIndex));
}
return null;
}
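For illustration (not part of the commit): how findKeyInDataTree walks a nested data tree. A sketch assuming `parser` is an HBaseParser whose river is configured with column separator "::"; the map contents and names are made up (compare FindKeyInDataTreeTest below):

```java
import java.util.HashMap;
import java.util.Map;

// Assumes river.getColumnSeparator() returns "::" for this parser.
final Map<String, Object> dataTree = new HashMap<String, Object>();
final Map<String, Object> family = new HashMap<String, Object>();
family.put("theId", "row-42");
dataTree.put("aFamily", family);

// "aFamily::theId" is split at the first separator: "aFamily" selects the
// nested map, then "theId" is resolved by the recursive call.
parser.findKeyInDataTree(dataTree, "aFamily::theId"); // returns "row-42"
// If the head of the path does not lead to a nested map, the lookup yields null:
parser.findKeyInDataTree(dataTree, "noFamily::theId"); // returns null
```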

17 changes: 15 additions & 2 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
@@ -109,7 +109,9 @@ public class HBaseRiver extends AbstractRiverComponent implements River, Uncaugh
* }
* </pre>
*
- * If no separator is defined, or the separator is empty, no operation is performed.
+ * If no separator is defined, or the separator is empty, no operation is performed. Prefer single-character
+ * separators: with a multi-character separator, partial matches of the separator can remain in the data (e.g. a
+ * separator defined as "()" will leave stray "(" and ")" characters in the parsed fields).
*/
public final String columnSeparator;

@@ -220,6 +222,10 @@ public synchronized void start() {
if (this.idField == null) {
mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true}}}";
}
else if (this.columnSeparator != null) {
mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true},\"_id\":{\"path\":\""
+ this.idField.replace(this.columnSeparator, ".") + "\"}}}";
}
else {
mapping = "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true},\"_id\":{\"path\":\"" + this.idField + "\"}}}";
}
@@ -309,7 +315,14 @@ public String normalizeField(final String fieldName) {
if (!isNormalizeFields() || fieldName == null) {
return fieldName;
}
return fieldName.toLowerCase().replaceAll("[^a-z0-9\\-_]*", "");
if (getColumnSeparator() != null) {
String regex = "a-z0-9\\-_";
for (int i = 0; i < getColumnSeparator().length(); i++) {
regex += "\\" + getColumnSeparator().charAt(i);
}
return fieldName.toLowerCase().replaceAll("[^" + regex + "]", "");
}
return fieldName.toLowerCase().replaceAll("[^a-z0-9\\-_]", "");
}

public boolean isNormalizeFields() {
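To make the separator-aware normalization and the _id mapping concrete, here is a sketch of expected results, assuming a river with normalization enabled and a column separator of "::" (the first three values mirror the updated test expectations below; the idField value is made up):

```java
// With isNormalizeFields() == true and getColumnSeparator() == "::":
river.normalizeField("90aS");            // -> "90as" (lower-cased, allowed chars kept)
river.normalizeField("a_b");             // -> "a_b"
river.normalizeField("bl%^&*ah::blubb"); // -> "blah::blubb" (separator chars now survive)

// And with idField == "aFamily::theId", start() now builds a mapping whose
// _id path uses dots in place of the separator, e.g.:
// {"<type>":{"_timestamp":{"enabled":true},"_id":{"path":"aFamily.theId"}}}
```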
93 changes: 73 additions & 20 deletions src/test/java/org/elasticsearch/river/hbase/HBaseParserTest.java
@@ -1,5 +1,8 @@
package org.elasticsearch.river.hbase;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
@@ -20,6 +23,11 @@
import org.testng.annotations.Test;

public class HBaseParserTest {
+ @AfterClass
+ public void tearDown() {
+ Mockit.tearDownMocks();
+ }

public class ReadQualifierStructureTest {
public String separator;
public boolean normalize;
@@ -46,11 +54,6 @@ boolean isNormalizeFields() {
};
}

- @AfterClass
- public void tearDown() {
- Mockit.tearDownMocks();
- }

@SuppressWarnings("unchecked")
@Test
public void testBase() throws Exception {
@@ -184,11 +187,6 @@ boolean isNormalizeFields() {
};
}

- @AfterClass
- public void tearDown() {
- Mockit.tearDownMocks();
- }

@Test
@SuppressWarnings("unchecked")
public void testBase() {
@@ -206,19 +204,19 @@ public void testBase() {

final Map<String, Object> output = parser.readDataTree(input);

Assert.assertNotNull(output.get("family1"));
assertNotNull(output.get("family1"));
final Map<String, Object> family1 = (Map<String, Object>) output.get("family1");
- Assert.assertEquals(family1.get("category1"), "value1");
- Assert.assertEquals(family1.get("category2"), "value2");
- Assert.assertEquals(family1.get("category3"), "value3");
- Assert.assertNotNull(output.get("family2"));
+ assertEquals(family1.get("category1"), "value1");
+ assertEquals(family1.get("category2"), "value2");
+ assertEquals(family1.get("category3"), "value3");
+ assertNotNull(output.get("family2"));
final Map<String, Object> family2 = (Map<String, Object>) output.get("family2");
- Assert.assertEquals(family2.get("category1"), "value4");
- Assert.assertEquals(family2.get("category4"), "value5");
- Assert.assertEquals(family2.get("category6"), "value7");
- Assert.assertNotNull(output.get("family3"));
+ assertEquals(family2.get("category1"), "value4");
+ assertEquals(family2.get("category4"), "value5");
+ assertEquals(family2.get("category6"), "value7");
+ assertNotNull(output.get("family3"));
final Map<String, Object> family3 = (Map<String, Object>) output.get("family3");
- Assert.assertEquals(family3.get("category5"), "value6");
+ assertEquals(family3.get("category5"), "value6");
}

private KeyValue getKeyValue(final String family, final String qualifier, final String value) {
@@ -228,4 +226,59 @@ private KeyValue getKeyValue(final String family, final String qualifier, final
value.getBytes(this.charset));
}
}

public class FindKeyInDataTreeTest {
protected String separator;
protected boolean normalize;

@BeforeClass
public void setUp() {
new MockUp<AbstractRiverComponent>() {
@Mock
void $init(final RiverName riverName, final RiverSettings settings) {}
};

new MockUp<HBaseRiver>() {

@Mock
void $init(final RiverName riverName, final RiverSettings settings, final Client esClient) {}

@Mock
String getColumnSeparator() {
return FindKeyInDataTreeTest.this.separator;
}

@Mock
boolean isNormalizeFields() {
return FindKeyInDataTreeTest.this.normalize;
}
};
}

@Test
public void testBase() {
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
this.separator = "::";

final Map<String, Object> dataTree = new HashMap<String, Object>();
final Map<String, Object> dataBranch = new HashMap<String, Object>();
dataBranch.put("theId", "TheValue");
dataTree.put("aBranch", dataBranch);

assertEquals(parser.findKeyInDataTree(dataTree, "aBranch::theId"), "TheValue");
}

@Test
public void testDotSeparator() {
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
this.separator = ".";

final Map<String, Object> dataTree = new HashMap<String, Object>();
final Map<String, Object> dataBranch = new HashMap<String, Object>();
dataBranch.put("theId", "TheValue");
dataTree.put("aBranch", dataBranch);

assertEquals(parser.findKeyInDataTree(dataTree, "aBranch.theId"), "TheValue");
}
}
}
8 changes: 7 additions & 1 deletion src/test/java/org/elasticsearch/river/hbase/HBaseRiverTest.java
@@ -25,6 +25,11 @@ public void testNormalizeField() {
boolean isNormalizeFields() {
return true;
}

@Mock
String getColumnSeparator() {
return "::";
}
};

final HBaseRiver river = new HBaseRiver(null, null, null);
@@ -37,7 +42,8 @@ boolean isNormalizeFields() {
Assert.assertEquals(river.normalizeField("a-b"), "a-b");
Assert.assertEquals(river.normalizeField("a_b"), "a_b");
Assert.assertEquals(river.normalizeField("90aS"), "90as");
Assert.assertEquals(river.normalizeField("&*($@#!ui^&$(#:\"8ui"), "ui8ui");
Assert.assertEquals(river.normalizeField("&*($@#!ui^&$(#\"8ui"), "ui8ui");
Assert.assertEquals(river.normalizeField("bl%^&*ah::blubb"), "blah::blubb");
Assert.assertEquals(river.normalizeField(null), null);
}
}
