Skip to content

Commit

Permalink
- Added Checkstyle configuration
Browse files Browse the repository at this point in the history
- Added eclipse .checkstyle file to ignore list
- HBase implementation that only looks for hdfs on localhost
- Added some comments and documentation
  • Loading branch information
mallocator committed Mar 18, 2013
1 parent 08aa0df commit aa98430
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
.DS_Store
.project
.classpath
/.checkstyle
164 changes: 164 additions & 0 deletions checkstyle_include.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.3//EN" "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">

<!-- This configuration file was written by the eclipse-cs plugin configuration
editor -->
<!-- Checkstyle-Configuration: default Description: none -->
<module name="Checker">
<!-- Root module. Checks that do not set their own severity report at "warning". -->
<property name="severity" value="warning" />
<module name="TreeWalker">
<!-- Checks nested in TreeWalker run per Java source file; the cache file lives under target/. -->
<property name="cacheFile" value="target/checkstyle.cache" />
<!-- NOTE(review): presumably present to support the SuppressionCommentFilter configured on the Checker below;
confirm against the Checkstyle version in use. -->
<module name="FileContentsHolder" />
<module name="MissingOverride" />
<!-- Javadoc checks: informational only, limited to public scope, and lenient about missing tags. -->
<module name="JavadocMethod">
<property name="severity" value="info" />
<property name="scope" value="public" />
<property name="allowThrowsTagsForSubclasses" value="true" />
<property name="allowMissingParamTags" value="true" />
<property name="allowMissingThrowsTags" value="true" />
<property name="allowMissingReturnTag" value="true" />
<property name="allowMissingPropertyJavadoc" value="true" />
<property name="suppressLoadErrors" value="true" />
<property name="tokens" value="METHOD_DEF" />
</module>
<module name="JavadocStyle">
<property name="severity" value="info" />
<property name="checkEmptyJavadoc" value="true" />
</module>
<!-- Naming conventions: all informational, except PackageName which is reported as a warning.
ConstantName and StaticVariableName are not applied to private members. -->
<module name="AbstractClassName">
<property name="severity" value="info" />
</module>
<module name="ClassTypeParameterName">
<property name="severity" value="info" />
</module>
<module name="ConstantName">
<property name="severity" value="info" />
<property name="applyToPrivate" value="false" />
</module>
<module name="LocalFinalVariableName">
<property name="severity" value="info" />
</module>
<module name="LocalVariableName">
<property name="severity" value="info" />
</module>
<module name="MemberName">
<property name="severity" value="info" />
</module>
<module name="MethodName">
<property name="severity" value="info" />
</module>
<module name="MethodTypeParameterName">
<property name="severity" value="info" />
</module>
<module name="PackageName">
<property name="severity" value="warning" />
</module>
<module name="ParameterName">
<property name="severity" value="info" />
</module>
<module name="StaticVariableName">
<property name="severity" value="info" />
<property name="applyToPrivate" value="false" />
</module>
<module name="TypeName">
<property name="severity" value="info" />
</module>
<module name="RedundantImport">
<property name="severity" value="info" />
</module>
<!-- Size checks. -->
<module name="AnonInnerLength">
<property name="severity" value="info" />
<property name="max" value="50" />
</module>
<!-- The same check is configured three times on purpose to get graduated severities:
info at the check's built-in default limit (no max set), warning when 200 is exceeded, error when 250 is exceeded. -->
<module name="MethodLength">
<property name="severity" value="info" />
</module>
<module name="MethodLength">
<property name="severity" value="warning" />
<property name="max" value="200" />
</module>
<module name="MethodLength">
<property name="severity" value="error" />
<property name="max" value="250" />
</module>
<!-- Graduated severities again: info at the built-in default limit, warning above 9 parameters, error above 10. -->
<module name="ParameterNumber">
<property name="severity" value="info" />
</module>
<module name="ParameterNumber">
<property name="severity" value="warning" />
<property name="max" value="9" />
</module>
<module name="ParameterNumber">
<property name="severity" value="error" />
<property name="max" value="10" />
</module>
<!-- Graduated severities: info at the built-in default limit, warning above 3 outer types per file, error above 5. -->
<module name="OuterTypeNumber">
<property name="severity" value="info" />
</module>
<module name="OuterTypeNumber">
<property name="severity" value="warning" />
<property name="max" value="3" />
</module>
<module name="OuterTypeNumber">
<property name="severity" value="error" />
<property name="max" value="5" />
</module>
<!-- Modifier, block, class-design and miscellaneous coding checks: informational, except NeedBraces (warning). -->
<module name="ModifierOrder">
<property name="severity" value="info" />
</module>
<module name="RedundantModifier">
<property name="severity" value="info" />
</module>
<module name="EmptyBlock">
<property name="severity" value="info" />
</module>
<module name="AvoidNestedBlocks">
<property name="severity" value="info" />
</module>
<module name="LeftCurly">
<property name="severity" value="info" />
</module>
<module name="NeedBraces">
<property name="severity" value="warning" />
</module>
<module name="FinalClass">
<property name="severity" value="info" />
</module>
<module name="HideUtilityClassConstructor">
<property name="severity" value="info" />
</module>
<module name="BooleanExpressionComplexity">
<property name="severity" value="info" />
<property name="max" value="5" />
</module>
<module name="ArrayTypeStyle">
<property name="severity" value="info" />
</module>
<module name="FinalParameters">
<property name="severity" value="info" />
</module>
<module name="UpperEll">
<property name="severity" value="info" />
</module>
<module name="JavadocType">
<property name="severity" value="info" />
<property name="scope" value="public" />
</module>
</module>
<!-- The modules below are configured directly on the Checker rather than inside the TreeWalker. -->
<module name="SuppressionCommentFilter" />
<!-- Graduated file length limits: info above 1000 lines, warning above 1500 lines.
NOTE(review): the error tier sets no max and therefore uses the check's built-in default, unlike the
explicit error limits used for the other tiered checks; confirm that default is the intended threshold. -->
<module name="FileLength">
<property name="severity" value="info" />
<property name="max" value="1000" />
</module>
<module name="FileLength">
<property name="severity" value="warning" />
<property name="max" value="1500" />
</module>
<module name="FileLength">
<property name="severity" value="error" />
</module>
<!-- Duplicate code detection, reported as a warning; "min" is presumably the minimum number of
duplicated lines that triggers a report (TODO confirm for the Checkstyle version in use). -->
<module name="StrictDuplicateCode">
<property name="severity" value="warning" />
<property name="min" value="50" />
</module>
</module>
38 changes: 38 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<elasticsearch.version>0.20.5</elasticsearch.version>
<hbase.async.version>1.4.1</hbase.async.version>

<version.findbugs>2.5.2</version.findbugs>
<version.checkstyle>2.9.1</version.checkstyle>

<mavenAssemblyPlugin>2.4</mavenAssemblyPlugin>
<mavenJarPluginVersion>2.4</mavenJarPluginVersion>
Expand Down Expand Up @@ -89,6 +92,41 @@
</execution>
</executions>
</plugin>
<plugin>
<!-- Static analysis with FindBugs. The plugin version comes from the version.findbugs property. -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<version>${version.findbugs}</version>
<configuration>
<!-- failOnError is disabled, so presumably analysis problems are reported without breaking the build;
the remaining options request XML reports. -->
<failOnError>false</failOnError>
<findbugsXmlOutput>true</findbugsXmlOutput>
<findbugsXmlWithMessages>true</findbugsXmlWithMessages>
<xmlOutput>true</xmlOutput>
</configuration>
<executions>
<execution>
<!-- The findbugs goal is bound to the install phase, so it only runs for builds that reach "install". -->
<phase>install</phase>
<goals>
<goal>findbugs</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<!-- Checkstyle report. Note that the version.checkstyle property is used here as the version of the Maven
plugin, not of the Checkstyle library itself. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${version.checkstyle}</version>
<configuration>
<!-- Rules are read from checkstyle_include.xml in the project base directory. -->
<configLocation>${basedir}/checkstyle_include.xml</configLocation>
</configuration>
<executions>
<execution>
<!-- The checkstyle goal is bound to the install phase, so it only runs for builds that reach "install". -->
<phase>install</phase>
<goals>
<goal>checkstyle</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
64 changes: 53 additions & 11 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;

/**
* An HBase import river built similar to the MySQL river, which was modeled after the Solr SQL import functionality.
*
* @author Ravi Gairola
*/
public class HBaseRiver extends AbstractRiverComponent implements River, UncaughtExceptionHandler {
private final Client esClient;
private volatile Thread thread;
Expand All @@ -46,24 +51,47 @@ public HBaseRiver(final RiverName riverName, final RiverSettings settings, final
this.esClient = esClient;
this.logger.info("Creating HBase Stream River");

this.hosts = readConfig("hosts", readConfig("zookeeper", null));
this.hosts = readConfig("hosts");
this.table = readConfig("table");
this.idField = readConfig("idField", null);
this.index = readConfig("index", riverName.name());
this.type = readConfig("type", "data");
this.interval = Long.parseLong(readConfig("interval", "600000"));
this.batchSize = Integer.parseInt(readConfig("batchSize", "1000"));

if (this.interval <= 0) {
throw new IllegalArgumentException("The interval between runs must be at least 1 ms. The current config is set to "
+ this.interval);
}
if (this.batchSize <= 0) {
throw new IllegalArgumentException("The batch size must be set to at least 1. The current config is set to " + this.batchSize);
}
}

/**
* Fetch the value of a configuration that has no default value and is therefore mandatory. Empty (trimmed) strings are
* as invalid as no value at all (null).
*
* @param config Key of the configuration to fetch
* @throws InvalidParameterException if a configuration is missing (null or empty)
* @return The configured value, which is never null or blank
*/
private String readConfig(final String config) {
final String result = readConfig(config, null);
if (result == null) {
if (result == null || result.trim().isEmpty()) {
this.logger.error("Unable to read required config {}. Aborting!", config);
throw new InvalidParameterException("Unable to read required config " + config);
}
return result;
}

/**
* Fetch the value of a configuration that has a default value and is therefore optional.
*
* @param config Key of the configuration to fetch
* @param defaultValue The value to set if no value could be found
* @return The configured value, or the given default value if no value could be found
*/
@SuppressWarnings({ "unchecked" })
private String readConfig(final String config, final String defaultValue) {
if (this.settings.settings().containsKey("hbase")) {
Expand All @@ -73,6 +101,10 @@ private String readConfig(final String config, final String defaultValue) {
return defaultValue;
}

/**
* This method is launched by ElasticSearch and starts the HBase River. The method will try to create a mapping with
* timestamps enabled. If a mapping already exists, the user should make sure that timestamps are enabled for this type.
*/
@Override
public synchronized void start() {
if (this.thread != null) {
Expand Down Expand Up @@ -122,28 +154,39 @@ else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
this.thread.start();
}

/**
* This method is called by ElasticSearch when shutting down the river. It asks the import thread to stop and drops the
* reference to it. Calling it while no thread is registered (the river was never started, or it has already been
* closed) does nothing besides logging.
*/
@Override
public void close() {
this.logger.info("Closing HBase river");
// Read the volatile field once. close() is also invoked from uncaughtException(), so it can run more than
// once and the field may already be null; the former unconditional dereference threw a NullPointerException then.
final Thread running = this.thread;
if (running != null) {
((Parser) running).stopThread();
}
this.thread = null;
}

/**
* Handles exceptions that escape the HBase import thread, for example from asynchronous HBase client calls that are
* not caught anywhere else. The exception is logged and the entire river is shut down.
*/
@Override
public void uncaughtException(final Thread arg0, final Throwable arg1) {
// No message parameters: hand the logger an explicit null parameter array.
final Object[] noLogParams = null;
this.logger.error("An Exception has been thrown in HBase Import Thread", arg1, noLogParams);
close();
}

/**
* A separate Thread that does the actual fetching and storing of data from an HBase cluster.
*
* @author Ravi Gairola
*/
private class Parser extends Thread implements ActionListener<BulkResponse> {
private static final String TIMESTMAP_STATS = "timestamp_stats";
private int indexCounter;
private HBaseClient client;
private Scanner scanner;
private boolean stopThread;

private Parser() {}

@Override
public void run() {
HBaseRiver.this.logger.info("HBase Import Thread has started");
Expand All @@ -156,17 +199,16 @@ public void run() {
} catch (Exception e) {
HBaseRiver.this.logger.error("An exception has been caught while parsing data from HBase", e);
}
if (HBaseRiver.this.interval <= 0) {
break;
}
if (!this.stopThread) {
HBaseRiver.this.logger.info("HBase Import Thread is waiting for {} Seconds until the next run",
HBaseRiver.this.interval / 1000);
}
}
try {
sleep(1000);
} catch (InterruptedException e) {}
} catch (InterruptedException e) {
HBaseRiver.this.logger.trace("HBase river parsing thread has been interrupted");
}
}
HBaseRiver.this.logger.info("HBase Import Thread has finished");
}
Expand Down Expand Up @@ -197,7 +239,7 @@ private void parse() throws InterruptedException, Exception {
final IndexRequestBuilder request = HBaseRiver.this.esClient.prepareIndex(HBaseRiver.this.index,
HBaseRiver.this.type);
for (final KeyValue column : row) {
request.setSource(column.key().toString(), column.value().toString());
request.setSource(new String(column.key()), new String(column.value()));
}
bulkRequest.add(request);
}
Expand Down Expand Up @@ -253,7 +295,7 @@ private void setMinTimestamp(final Scanner scanner) {
}

/**
* Elasticsearch Response handler
* Elasticsearch Response handler.
*/
@Override
public void onResponse(final BulkResponse response) {
Expand All @@ -265,7 +307,7 @@ public void onResponse(final BulkResponse response) {
}

/**
* Elasticsearch Failure handler
* Elasticsearch Failure handler.
*/
@Override
public void onFailure(final Throwable e) {
Expand Down

0 comments on commit aa98430

Please sign in to comment.