Skip to content

Commit

Permalink
- Added option for normalizing field names
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Apr 1, 2013
1 parent 68de78e commit 3564c5b
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 120 deletions.
8 changes: 4 additions & 4 deletions src/main/java/org/elasticsearch/river/hbase/HBaseParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected void parseBulkOfRows(final ArrayList<ArrayList<KeyValue>> rows) {
protected Map<String, Object> readDataTree(final ArrayList<KeyValue> row) {
final Map<String, Object> dataTree = new HashMap<String, Object>();
for (final KeyValue column : row) {
final String family = new String(column.family(), this.river.getCharset());
final String family = this.river.normalizeField(new String(column.family(), this.river.getCharset()));
final String qualifier = new String(column.qualifier(), this.river.getCharset());
final String value = new String(column.value(), this.river.getCharset());
if (!dataTree.containsKey(family)) {
Expand All @@ -187,7 +187,7 @@ protected void readQualifierStructure(final Map<String, Object> parent, final St
if (this.river.getColumnSeparator() != null && !this.river.getColumnSeparator().isEmpty()) {
final int separatorPos = qualifier.indexOf(this.river.getColumnSeparator());
if (separatorPos != -1) {
final String parentQualifier = qualifier.substring(0, separatorPos);
final String parentQualifier = this.river.normalizeField(qualifier.substring(0, separatorPos));
final String childQualifier = qualifier.substring(separatorPos + this.river.getColumnSeparator().length());
if (!childQualifier.isEmpty()) {
if (!(parent.get(parentQualifier) instanceof Map)) {
Expand All @@ -196,11 +196,11 @@ protected void readQualifierStructure(final Map<String, Object> parent, final St
readQualifierStructure((Map<String, Object>) parent.get(parentQualifier), childQualifier, value);
return;
}
parent.put(qualifier.replace(this.river.getColumnSeparator(), ""), value);
parent.put(this.river.normalizeField(qualifier.replace(this.river.getColumnSeparator(), "")), value);
return;
}
}
parent.put(qualifier, value);
parent.put(this.river.normalizeField(qualifier), value);
}

/**
Expand Down
38 changes: 33 additions & 5 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,20 @@ public class HBaseRiver extends AbstractRiverComponent implements River, Uncaugh
/**
* Limit the scanning of the HBase table to a certain family.
*/
public final byte[] family;
private final byte[] family;

/**
* Limit the scanning of the HBase table to a number of qualifiers. A family must be set for this to take effect.
* Multiple qualifiers can be set via comma separated list.
*/
public final String qualifiers;
private final String qualifiers;

/**
* Some names must be given in a lower case format (the index name for example), others are more flexible. This flag will
* normalize all fields to lower case and remove special characters that Elasticsearch can't handle. (The filter is
* probably stricter than needed in most cases.)
*/
private final boolean normalizeFields;

/**
* Splits up the column into further sub columns if a separator is defined. For example:
Expand All @@ -101,6 +108,8 @@ public class HBaseRiver extends AbstractRiverComponent implements River, Uncaugh
* }
* }
* </pre>
*
* If no separator is defined, or the separator is empty, no operation is performed.
*/
public final String columnSeparator;

Expand All @@ -117,12 +126,13 @@ public HBaseRiver(final RiverName riverName, final RiverSettings settings, final
this.esClient = esClient;
this.logger.info("Creating HBase Stream River");

this.normalizeFields = Boolean.parseBoolean(readConfig("normalizeFields", "true"));
this.hosts = readConfig("hosts");
this.table = readConfig("table");
this.idField = readConfig("idField", null);
this.columnSeparator = readConfig("columnSeparator", null);
this.index = readConfig("index", riverName.name());
this.type = readConfig("type", this.table);
this.idField = normalizeField(readConfig("idField", null));
this.index = normalizeField(readConfig("index", riverName.name()));
this.type = normalizeField(readConfig("type", this.table));
this.interval = Long.parseLong(readConfig("interval", "600000"));
this.batchSize = Integer.parseInt(readConfig("batchSize", "100"));
this.charset = Charset.forName(readConfig("charset", "UTF-8"));
Expand Down Expand Up @@ -270,6 +280,24 @@ public void uncaughtException(final Thread arg0, final Throwable arg1) {
this.logger.error("An Exception has been thrown in HBase Import Thread", arg1, (Object[]) null);
}

/**
 * Normalizes a field, index or type name.
 * <p>
 * If the normalizeFields flag is set (see {@link #isNormalizeFields()}), the name is converted to lower case and
 * every character other than "a"-"z", "0"-"9", "-" and "_" is removed. If the flag is not set, the name is returned
 * unchanged.
 *
 * @param fieldName the raw name; may be {@code null}, e.g. for an optional setting whose default is {@code null}
 * @return the normalized name, the unchanged name if normalization is disabled, or {@code null} if
 *         {@code fieldName} is {@code null}
 */
public String normalizeField(final String fieldName) {
    // Optional settings (such as the id field) are passed in with a null default; hand null back instead of
    // failing with a NullPointerException.
    if (fieldName == null || !isNormalizeFields()) {
        return fieldName;
    }
    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale. With a Turkish default locale
    // "I" would become a dotless "ı", which the character filter below would then strip.
    return fieldName.toLowerCase(java.util.Locale.ROOT).replaceAll("[^a-z0-9\\-_]+", "");
}

/**
 * @return the value of the normalizeFields flag; {@code normalizeField} only alters names when this returns
 *         {@code true}
 */
public boolean isNormalizeFields() {
return this.normalizeFields;
}

/**
 * @return the value of the interval field, which the constructor parses from the "interval" setting
 */
public long getInterval() {
return this.interval;
}
Expand Down
241 changes: 130 additions & 111 deletions src/test/java/org/elasticsearch/river/hbase/HBaseParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,119 +15,138 @@
import org.testng.annotations.Test;

public class HBaseParserTest {
public String separator;

/**
 * Installs JMockit mock-ups so that an HBaseRiver can be created with null arguments: the {@code $init} mocks
 * replace the constructors of AbstractRiverComponent and HBaseRiver with empty bodies, and getColumnSeparator()
 * returns whatever the test has stored in {@code separator}.
 */
@BeforeClass
public void setUp() {
new MockUp<AbstractRiverComponent>() {
@Mock
void $init(final RiverName riverName, final RiverSettings settings) {}
};
new MockUp<HBaseRiver>() {
@Mock
void $init(final RiverName riverName, final RiverSettings settings, final Client esClient) {}

@Mock
String getColumnSeparator() {
return HBaseParserTest.this.separator;
}
};
}

/**
 * With "::" as the column separator, qualifiers of the form data::setN::categoryM are expected to end up as nested
 * maps: data -> setN -> categoryM -> value.
 */
@SuppressWarnings("unchecked")
@Test
public void testReadQualifierStructure() throws Exception {
this.separator = "::";
final Map<String, Object> result = new HashMap<String, Object>();
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
parser.readQualifierStructure(result, "data::set1::category1", "test1");
parser.readQualifierStructure(result, "data::set1::category2", "test2");
parser.readQualifierStructure(result, "data::set1::category3", "test3");
parser.readQualifierStructure(result, "data::set2::category1", "test4");
parser.readQualifierStructure(result, "data::set2::category2", "test5");

// Every value must be reachable through two levels of nested maps.
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set1")).get("category1"), "test1");
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set1")).get("category2"), "test2");
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set1")).get("category3"), "test3");
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set2")).get("category1"), "test4");
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set2")).get("category2"), "test5");
}

/**
 * With a null column separator, qualifiers are expected to be stored flat under their full, unsplit name.
 */
@Test
public void testReadQualifierStructureNullSeperator() throws Exception {
this.separator = null;

final Map<String, Object> result = new HashMap<String, Object>();
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
parser.readQualifierStructure(result, "data::set1::category1", "test1");
parser.readQualifierStructure(result, "data::set1::category2", "test2");
parser.readQualifierStructure(result, "data::set1::category3", "test3");
parser.readQualifierStructure(result, "data::set2::category1", "test4");
parser.readQualifierStructure(result, "data::set2::category2", "test5");

// The complete qualifier is the key; no nesting takes place.
Assert.assertEquals(result.get("data::set1::category1"), "test1");
Assert.assertEquals(result.get("data::set1::category2"), "test2");
Assert.assertEquals(result.get("data::set1::category3"), "test3");
Assert.assertEquals(result.get("data::set2::category1"), "test4");
Assert.assertEquals(result.get("data::set2::category2"), "test5");
}

/**
 * With an empty column separator, qualifiers are expected to be stored flat under their full, unsplit name.
 */
@Test
public void testReadQualifierStructureEmptySeperator() throws Exception {
this.separator = "";

final Map<String, Object> result = new HashMap<String, Object>();
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
parser.readQualifierStructure(result, "data::set1::category1", "test1");
parser.readQualifierStructure(result, "data::set1::category2", "test2");
parser.readQualifierStructure(result, "data::set1::category3", "test3");
parser.readQualifierStructure(result, "data::set2::category1", "test4");
parser.readQualifierStructure(result, "data::set2::category2", "test5");

// The complete qualifier is the key; no nesting takes place.
Assert.assertEquals(result.get("data::set1::category1"), "test1");
Assert.assertEquals(result.get("data::set1::category2"), "test2");
Assert.assertEquals(result.get("data::set1::category3"), "test3");
Assert.assertEquals(result.get("data::set2::category1"), "test4");
Assert.assertEquals(result.get("data::set2::category2"), "test5");
}
public class ReadDataTreeTest {

/**
 * A qualifier that ends in the separator ("data::set2::") has an empty sub-qualifier. Its value is expected to be
 * stored directly under data -> set2, taking the place of the nested map that the earlier
 * "data::set2::category1" call targeted.
 */
@SuppressWarnings("unchecked")
@Test
public void testReadQualifierStructureEmptySubQualifier() throws Exception {
this.separator = "::";

final Map<String, Object> result = new HashMap<String, Object>();
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
parser.readQualifierStructure(result, "data::set1::category1", "test1");
parser.readQualifierStructure(result, "data::set1::category2", "test2");
parser.readQualifierStructure(result, "data::set1::category3", "test3");
parser.readQualifierStructure(result, "data::set2::category1", "test4");
parser.readQualifierStructure(result, "data::set2::", "test5");

// Debug output of the resulting tree.
System.out.println(result);

Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set1")).get("category1"), "test1");
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set1")).get("category2"), "test2");
Assert.assertEquals(((Map<String, Object>) ((Map<String, Object>) result.get("data")).get("set1")).get("category3"), "test3");
// "set2" now holds the plain value rather than a map.
Assert.assertEquals(((Map<String, Object>) result.get("data")).get("set2"), "test5");
}

@Test
public void testReadQualifierStructureWrongSeperator() throws Exception {
this.separator = "--";

final Map<String, Object> result = new HashMap<String, Object>();
final HBaseParser parser = new HBaseParser(new HBaseRiver(null, null, null));
parser.readQualifierStructure(result, "data::set1::category1", "test1");
parser.readQualifierStructure(result, "data::set1::category2", "test2");
parser.readQualifierStructure(result, "data::set1::category3", "test3");
parser.readQualifierStructure(result, "data::set2::category1", "test4");
parser.readQualifierStructure(result, "data::set2::category2", "test5");

Assert.assertEquals(result.get("data::set1::category1"), "test1");
Assert.assertEquals(result.get("data::set1::category2"), "test2");
Assert.assertEquals(result.get("data::set1::category3"), "test3");
Assert.assertEquals(result.get("data::set2::category1"), "test4");
Assert.assertEquals(result.get("data::set2::category2"), "test5");
/**
 * Tests for {@code HBaseParser.readQualifierStructure}. The parser is run against a mocked {@code HBaseRiver} whose
 * column separator and normalization flag are driven by the {@link #separator} and {@link #normalize} fields.
 */
public class ReadQualifierStructureTest {
    /** Value returned by the mocked {@code HBaseRiver.getColumnSeparator()}. */
    public String separator;

    /** Value returned by the mocked {@code HBaseRiver.isNormalizeFields()}. */
    public boolean normalize;

    /**
     * Installs JMockit mock-ups so that an HBaseRiver can be created with null arguments: the {@code $init} mocks
     * replace the constructors with empty bodies, and the two getters read the fields of this test instance.
     */
    @BeforeClass
    public void setUp() {
        new MockUp<AbstractRiverComponent>() {
            @Mock
            void $init(final RiverName riverName, final RiverSettings settings) {}
        };
        new MockUp<HBaseRiver>() {
            @Mock
            void $init(final RiverName riverName, final RiverSettings settings, final Client esClient) {}

            @Mock
            String getColumnSeparator() {
                return ReadQualifierStructureTest.this.separator;
            }

            @Mock
            boolean isNormalizeFields() {
                return ReadQualifierStructureTest.this.normalize;
            }
        };
    }

    /** Creates a parser backed by a (mocked) river built from null arguments. */
    private HBaseParser newParser() {
        return new HBaseParser(new HBaseRiver(null, null, null));
    }

    /**
     * Walks {@code tree} along {@code path}, treating every intermediate node as a nested map.
     *
     * @param tree the root map
     * @param path the keys to follow, outermost first
     * @return the object found at the end of the path
     */
    @SuppressWarnings("unchecked")
    private Object lookup(final Map<String, Object> tree, final String... path) {
        Object node = tree;
        for (final String key : path) {
            // Unchecked by design: a non-map intermediate node must fail the test with a ClassCastException.
            node = ((Map<String, Object>) node).get(key);
        }
        return node;
    }

    /** With "::" as separator, qualifiers are split into nested maps: data -> setN -> categoryM. */
    @Test
    public void testReadQualifierStructure() throws Exception {
        this.separator = "::";
        this.normalize = false;

        final Map<String, Object> result = new HashMap<String, Object>();
        final HBaseParser parser = newParser();
        parser.readQualifierStructure(result, "data::set1::category1", "test1");
        parser.readQualifierStructure(result, "data::set1::category2", "test2");
        parser.readQualifierStructure(result, "data::set1::category3", "test3");
        parser.readQualifierStructure(result, "data::set2::category1", "test4");
        parser.readQualifierStructure(result, "data::set2::category2", "test5");

        Assert.assertEquals(lookup(result, "data", "set1", "category1"), "test1");
        Assert.assertEquals(lookup(result, "data", "set1", "category2"), "test2");
        Assert.assertEquals(lookup(result, "data", "set1", "category3"), "test3");
        Assert.assertEquals(lookup(result, "data", "set2", "category1"), "test4");
        Assert.assertEquals(lookup(result, "data", "set2", "category2"), "test5");
    }

    /** With a null separator, qualifiers are stored flat under their full name. */
    @Test
    public void testReadQualifierStructureNullSeperator() throws Exception {
        this.separator = null;
        this.normalize = false;

        final Map<String, Object> result = new HashMap<String, Object>();
        final HBaseParser parser = newParser();
        parser.readQualifierStructure(result, "data::set1::category1", "test1");
        parser.readQualifierStructure(result, "data::set1::category2", "test2");
        parser.readQualifierStructure(result, "data::set1::category3", "test3");
        parser.readQualifierStructure(result, "data::set2::category1", "test4");
        parser.readQualifierStructure(result, "data::set2::category2", "test5");

        Assert.assertEquals(result.get("data::set1::category1"), "test1");
        Assert.assertEquals(result.get("data::set1::category2"), "test2");
        Assert.assertEquals(result.get("data::set1::category3"), "test3");
        Assert.assertEquals(result.get("data::set2::category1"), "test4");
        Assert.assertEquals(result.get("data::set2::category2"), "test5");
    }

    /** With an empty separator, qualifiers are stored flat under their full name. */
    @Test
    public void testReadQualifierStructureEmptySeperator() throws Exception {
        this.separator = "";
        this.normalize = false;

        final Map<String, Object> result = new HashMap<String, Object>();
        final HBaseParser parser = newParser();
        parser.readQualifierStructure(result, "data::set1::category1", "test1");
        parser.readQualifierStructure(result, "data::set1::category2", "test2");
        parser.readQualifierStructure(result, "data::set1::category3", "test3");
        parser.readQualifierStructure(result, "data::set2::category1", "test4");
        parser.readQualifierStructure(result, "data::set2::category2", "test5");

        Assert.assertEquals(result.get("data::set1::category1"), "test1");
        Assert.assertEquals(result.get("data::set1::category2"), "test2");
        Assert.assertEquals(result.get("data::set1::category3"), "test3");
        Assert.assertEquals(result.get("data::set2::category1"), "test4");
        Assert.assertEquals(result.get("data::set2::category2"), "test5");
    }

    /**
     * A qualifier ending in the separator ("data::set2::") has an empty sub-qualifier; its value is stored directly
     * under data -> set2, taking the place of the map created by the earlier "data::set2::category1" call.
     * Normalization is switched on for this test.
     */
    @Test
    public void testReadQualifierStructureEmptySubQualifier() throws Exception {
        this.separator = "::";
        this.normalize = true;

        final Map<String, Object> result = new HashMap<String, Object>();
        final HBaseParser parser = newParser();
        parser.readQualifierStructure(result, "data::set1::category1", "test1");
        parser.readQualifierStructure(result, "data::set1::category2", "test2");
        parser.readQualifierStructure(result, "data::set1::category3", "test3");
        parser.readQualifierStructure(result, "data::set2::category1", "test4");
        parser.readQualifierStructure(result, "data::set2::", "test5");

        Assert.assertEquals(lookup(result, "data", "set1", "category1"), "test1");
        Assert.assertEquals(lookup(result, "data", "set1", "category2"), "test2");
        Assert.assertEquals(lookup(result, "data", "set1", "category3"), "test3");
        // "set2" now holds the plain value rather than a map.
        Assert.assertEquals(lookup(result, "data", "set2"), "test5");
    }

    /**
     * With a separator ("--") that does not occur in the qualifiers, nothing is split. The first three qualifiers
     * are read without normalization and keep their "::"; the last two are read with normalization switched on,
     * which strips the colons from the key.
     */
    @Test
    public void testReadQualifierStructureWrongSeperator() throws Exception {
        this.separator = "--";
        this.normalize = false;

        final Map<String, Object> result = new HashMap<String, Object>();
        final HBaseParser parser = newParser();
        parser.readQualifierStructure(result, "data::set1::category1", "test1");
        parser.readQualifierStructure(result, "data::set1::category2", "test2");
        parser.readQualifierStructure(result, "data::set1::category3", "test3");
        // Switch normalization on mid-test; the mocked getter picks the new value up immediately.
        this.normalize = true;
        parser.readQualifierStructure(result, "data::set2::category1", "test4");
        parser.readQualifierStructure(result, "data::set2::category2", "test5");

        Assert.assertEquals(result.get("data::set1::category1"), "test1");
        Assert.assertEquals(result.get("data::set1::category2"), "test2");
        Assert.assertEquals(result.get("data::set1::category3"), "test3");
        Assert.assertEquals(result.get("dataset2category1"), "test4");
        Assert.assertEquals(result.get("dataset2category2"), "test5");
    }
}
}
42 changes: 42 additions & 0 deletions src/test/java/org/elasticsearch/river/hbase/HBaseRiverTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.elasticsearch.river.hbase;

import mockit.Mock;
import mockit.MockUp;

import org.elasticsearch.client.Client;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit test for the field-name normalization offered by {@code HBaseRiver}.
 */
public class HBaseRiverTest {
    /**
     * Checks {@code normalizeField} with normalization switched on: input is lower-cased and every character other
     * than letters, digits, "-" and "_" is dropped.
     */
    @Test
    public void testNormalizeField() {
        // Replace both constructors with empty bodies so the river can be built from null arguments.
        new MockUp<AbstractRiverComponent>() {
            @Mock
            void $init(final RiverName riverName, final RiverSettings settings) {}
        };
        // Force the normalization flag on, independent of any configuration.
        new MockUp<HBaseRiver>() {
            @Mock
            void $init(final RiverName riverName, final RiverSettings settings, final Client esClient) {}

            @Mock
            boolean isNormalizeFields() {
                return true;
            }
        };

        final HBaseRiver river = new HBaseRiver(null, null, null);

        // Each row: { raw input, expected normalized output }.
        final String[][] cases = {
            { "", "" },
            { " ", "" },
            { "a", "a" },
            { "A", "a" },
            { "Aa", "aa" },
            { "a-b", "a-b" },
            { "a_b", "a_b" },
            { "90aS", "90as" },
            { "&*($@#!ui^&$(#:\"8ui", "ui8ui" },
        };
        for (final String[] testCase : cases) {
            final String input = testCase[0];
            final String expected = testCase[1];
            Assert.assertEquals(river.normalizeField(input), expected);
        }
    }
}

0 comments on commit 3564c5b

Please sign in to comment.