Skip to content

Commit

Permalink
- Added options for adding family and qualifier restrictions
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Mar 20, 2013
1 parent 0ad1dbf commit f43c7b1
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ public class HBaseRiver extends AbstractRiverComponent implements River, Uncaugh
*/
private final Charset charset;

/**
 * Limit the scanning of the HBase table to a certain family.
 * <p>
 * Populated in the constructor from the "family" configuration value, encoded to bytes with the configured
 * charset. It is {@code null} when that configuration lookup yields {@code null}; in that case no family
 * restriction is set on the scanner.
 */
public final byte[] family;

// NOTE(review): parse() hands every qualifier to the scanner through its own setQualifier(...) call. Confirm that
// the scanner accumulates qualifiers rather than keeping only the last one that was set.
/**
 * Limit the scanning of the HBase table to a number of qualifiers. A family must be set for this to take effect.
 * Multiple qualifiers can be set via comma separated list.
 * <p>
 * Holds the raw "qualifiers" configuration value as read in the constructor. When scanning, the value is split on
 * commas and each entry is trimmed and encoded with the configured charset before being passed to the scanner.
 * It is {@code null} when that configuration lookup yields {@code null}; in that case no qualifier restriction is
 * set on the scanner.
 */
public final String qualifiers;

/**
* Loads and verifies all the configuration needed to run this river.
*
Expand All @@ -105,6 +116,10 @@ public HBaseRiver(final RiverName riverName, final RiverSettings settings, final
this.batchSize = Integer.parseInt(readConfig("batchSize", "100"));
this.charset = Charset.forName(readConfig("charset", "UTF-8"));

final String family = readConfig("family", null);
this.family = family != null ? family.getBytes(this.charset) : null;
this.qualifiers = readConfig("qualifiers", null);

if (this.interval <= 0) {
throw new IllegalArgumentException("The interval between runs must be at least 1 ms. The current config is set to "
+ this.interval);
Expand Down Expand Up @@ -268,6 +283,7 @@ public void run() {
if (lastRun + HBaseRiver.this.interval < System.currentTimeMillis()) {
lastRun = System.currentTimeMillis();
try {
this.indexCounter = 0;
parse();
} catch (Throwable t) {
HBaseRiver.this.logger.error("An exception has been caught while parsing data from HBase", t);
Expand Down Expand Up @@ -302,6 +318,14 @@ private void parse() throws InterruptedException, Exception {
HBaseRiver.this.logger.debug("Fetching HBase Scanner");
this.scanner = this.client.newScanner(HBaseRiver.this.table);
this.scanner.setServerBlockCache(false);
if (HBaseRiver.this.family != null) {
this.scanner.setFamily(HBaseRiver.this.family);
}
if (HBaseRiver.this.qualifiers != null) {
for (final String qualifier : HBaseRiver.this.qualifiers.split(",")) {
this.scanner.setQualifier(qualifier.trim().getBytes(HBaseRiver.this.charset));
}
}

final String timestamp = String.valueOf(setMinTimestamp(this.scanner));
HBaseRiver.this.logger.debug("Found latest timestamp in ElasticSearch to be {}", timestamp);
Expand Down

0 comments on commit f43c7b1

Please sign in to comment.