
Commit

- Refactored naming because we are actually importing from HBase, not from Hadoop itself
- Simple implementation for creating indices and types, as well as deleting old entries in ES
mallocator committed Mar 7, 2013
1 parent ade9635 commit 572db08
Showing 9 changed files with 195 additions and 77 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,4 +1,4 @@
-Elasticsearch-Hadoop-River
+Elasticsearch-HBase-River
==========================

An import river similar to the elasticsearch mysql river
23 changes: 12 additions & 11 deletions pom.xml
@@ -1,28 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<name>Elastichsearch Hadoop River</name>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<name>Elastichsearch HBase River</name>
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch</groupId>
-<artifactId>rivers-hadoop</artifactId>
+<artifactId>rivers-hbase</artifactId>
<packaging>jar</packaging>
-<description>Hadoop River for ElasticSearch</description>
+<description>HBase River for ElasticSearch</description>
<inceptionYear>2013</inceptionYear>
<version>1.0.0-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<elasticsearch.version>0.20.5</elasticsearch.version>
-<hadoop.version>1.1.1</hadoop.version>
+<hbase.version>0.94.3</hbase.version>

<mavenAssemblyPlugin>2.4</mavenAssemblyPlugin>
<mavenJarPluginVersion>2.4</mavenJarPluginVersion>
<mavenCompilerPlugin>3.0</mavenCompilerPlugin>
</properties>

<scm>
-<url>scm:git://github.com/mallocator/Elasticsearch-MySQL-River.git</url>
-<connection>scm:git://github.com/mallocator/Elasticsearch-MySQL-River.git</connection>
-<developerConnection>scm:git://github.com/mallocator/Elasticsearch-MySQL-River.git</developerConnection>
+<url>scm:git://github.com/mallocator/Elasticsearch-HBase-River.git</url>
+<connection>scm:git://github.com/mallocator/Elasticsearch-HBase-River.git</connection>
+<developerConnection>scm:git://github.com/mallocator/Elasticsearch-HBase-River.git</developerConnection>
</scm>

<dependencies>
@@ -33,9 +34,9 @@
<scope>provided</scope>
</dependency>
<dependency>
-<groupId>org.apache.hadoop</groupId>
-<artifactId>hadoop-core</artifactId>
-<version>${hadoop.version}</version>
+<groupId>org.apache.hbase</groupId>
+<artifactId>hbase</artifactId>
+<version>${hbase.version}</version>
</dependency>
</dependencies>
<build>

src/main/java/org/elasticsearch/plugin/river/hadoop/HadoopRiverPlugin.java

This file was deleted.

24 changes: 24 additions & 0 deletions src/main/java/org/elasticsearch/plugin/river/hbase/HBaseRiverPlugin.java
@@ -0,0 +1,24 @@
package org.elasticsearch.plugin.river.hbase;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.river.hbase.HBaseRiverModule;

public class HBaseRiverPlugin extends AbstractPlugin {

@Inject
public HBaseRiverPlugin() {}

public String name() {
return "river-hbase";
}

public String description() {
return "River HBase Plugin";
}

public void onModule(final RiversModule module) {
module.registerRiver("hbase", HBaseRiverModule.class);
}
}
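
The plugin above registers the river under the type name "hbase". Once it is installed, a river instance is created the standard rivers way: by indexing a _meta document into the _river index with "type": "hbase" and, assuming the river-specific settings are nested under that same key, the values that readConfig looks up (index, type, deleteOldEntries, interval). The following is a minimal sketch using the Elasticsearch 0.20 Java client; the cluster address, the river name my_hbase_river, and the setting values are illustrative assumptions, not part of this commit.

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;

public class RegisterHBaseRiver {
	public static void main(final String[] args) throws Exception {
		// Assumed connection details; adjust the cluster name and address to the actual setup.
		final Client client = new TransportClient(ImmutableSettings.settingsBuilder()
				.put("cluster.name", "elasticsearch").build())
				.addTransportAddress(new InetSocketTransportAddress("localhost", 9300));

		// Index a _meta document into the _river index. "type" must match the name
		// registered by HBaseRiverPlugin; the nested "hbase" object carries the
		// settings that HBaseRiver.readConfig() looks up.
		client.prepareIndex("_river", "my_hbase_river", "_meta")
				.setSource(XContentFactory.jsonBuilder()
						.startObject()
							.field("type", "hbase")
							.startObject("hbase")
								.field("index", "hbase_import")    // target ES index (assumed name)
								.field("type", "data")             // target ES type
								.field("deleteOldEntries", "true") // purge stale docs after each run
								.field("interval", "600000")       // run every 10 minutes (ms)
							.endObject()
						.endObject())
				.execute()
				.actionGet();

		client.close();
	}
}

Deleting the river's documents from the _river index unregisters the river again.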
28 changes: 0 additions & 28 deletions src/main/java/org/elasticsearch/river/hadoop/HadoopRiver.java

This file was deleted.

src/main/java/org/elasticsearch/river/hadoop/HadoopRiverModule.java

This file was deleted.

145 changes: 145 additions & 0 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiver.java
@@ -0,0 +1,145 @@
package org.elasticsearch.river.hbase;

import java.security.InvalidParameterException;
import java.util.Map;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;

public class HBaseRiver extends AbstractRiverComponent implements River {
private final Client esClient;
private final String index;
private final String type;
private volatile Thread thread;
private boolean stopThread;

private final boolean deleteOldEntries;
private final long interval;

@Inject
public HBaseRiver(RiverName riverName, RiverSettings settings, final Client esClient) {
super(riverName, settings);
this.esClient = esClient;
this.logger.info("Creating MySQL Stream River");

this.index = readConfig("index", riverName.name());
this.type = readConfig("type", "data");
this.deleteOldEntries = Boolean.parseBoolean(readConfig("deleteOldEntries", "true"));
this.interval = Long.parseLong(readConfig("interval", "600000"));
}

@SuppressWarnings({ "unchecked" })
private String readConfig(final String config, final String defaultValue) {
if (this.settings.settings().containsKey("mysql")) {
Map<String, Object> mysqlSettings = (Map<String, Object>) this.settings.settings().get("mysql");
return XContentMapValues.nodeStringValue(mysqlSettings.get(config), defaultValue);
}
return defaultValue;
}

@Override
public void start() {
this.logger.info("starting hbase stream");
try {
this.esClient.admin()
.indices()
.prepareCreate(this.index)
.addMapping(this.type, "{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true}}}")
.execute()
.actionGet();
this.logger.info("Created Index {} with _timestamp mapping for {}", this.index, this.type);
} catch (Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
this.logger.debug("Not creating Index {} as it already exists", this.index);
}
else if (ExceptionsHelper.unwrapCause(e) instanceof ElasticSearchException) {
this.logger.debug("Mapping {}.{} already exists and will not be created", this.index, this.type);
}
else {
this.logger.warn("failed to create index [{}], disabling river...", e, this.index);
return;
}
}

try {
this.esClient.admin()
.indices()
.preparePutMapping(this.index)
.setType(this.type)
.setSource("{\"" + this.type + "\":{\"_timestamp\":{\"enabled\":true}}}")
.setIgnoreConflicts(true)
.execute()
.actionGet();
} catch (ElasticSearchException e) {
this.logger.debug("Mapping already exists for index {} and type {}", this.index, this.type);
}

if (this.thread == null) {
this.thread = EsExecutors.daemonThreadFactory(this.settings.globalSettings(), "hbase_slurper").newThread(new Parser());
this.thread.start();
}
}

@Override
public void close() {
this.logger.info("Closing HBase river");
this.stopThread = true;
this.thread = null;
}

private class Parser extends Thread {
private Parser() {}

@Override
public void run() {
HBaseRiver.this.logger.info("HBase Import Thread has started");
long lastRun = 0;
while (!HBaseRiver.this.stopThread) {
if (lastRun + HBaseRiver.this.interval < System.currentTimeMillis()) {
lastRun = System.currentTimeMillis();
parse();
if (HBaseRiver.this.interval <= 0) {
break;
}
if (!HBaseRiver.this.stopThread) {
HBaseRiver.this.logger.info("HBase Import Thread is waiting for {} Seconds until the next run",
HBaseRiver.this.interval / 1000);
}
}
try {
sleep(1000);
} catch (InterruptedException e) {}
}
HBaseRiver.this.logger.info("HBase Import Thread has finished");
}

private void parse() {
// _timestamp is stored as epoch milliseconds, so the cutoff must be in the same unit
final String timestamp = String.valueOf(System.currentTimeMillis());

// TODO fetch data from HBase according to the river settings

if (HBaseRiver.this.deleteOldEntries) {
HBaseRiver.this.logger.info("Removing old Hbase entries from ElasticSearch!");
HBaseRiver.this.esClient.prepareDeleteByQuery(HBaseRiver.this.index)
.setTypes(HBaseRiver.this.type)
.setQuery(QueryBuilders.rangeQuery("_timestamp").lt(timestamp))
.execute()
.actionGet();
HBaseRiver.this.logger.info("Old HBase entries have been removed from ElasticSearch!");
}
else {
HBaseRiver.this.logger.info("Not removing old HBase entries from ElasticSearch");
}
}
}
}
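
parse() above leaves the actual HBase fetch as a TODO. As a rough sketch of how that gap could later be filled with the HBase 0.94 client that the new pom.xml dependency pulls in, the snippet below scans a table and bulk-indexes each row into Elasticsearch. The helper name and signature, the table-name parameter, and the flattened family:qualifier document layout are assumptions for illustration only; none of this is part of the commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;

public class HBaseScanSketch {

	/** Copies every cell of every row in an HBase table into Elasticsearch (illustrative only). */
	public static void importTable(final Client esClient, final String index, final String type,
			final String tableName) throws Exception {
		final Configuration config = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
		final HTable table = new HTable(config, tableName);
		final ResultScanner scanner = table.getScanner(new Scan());
		try {
			final BulkRequestBuilder bulk = esClient.prepareBulk();
			for (final Result row : scanner) {
				// flatten family:qualifier -> value into one document per row key
				final Map<String, Object> doc = new HashMap<String, Object>();
				for (final KeyValue cell : row.raw()) {
					doc.put(Bytes.toString(cell.getFamily()) + ":" + Bytes.toString(cell.getQualifier()),
							Bytes.toString(cell.getValue()));
				}
				bulk.add(esClient.prepareIndex(index, type, Bytes.toString(row.getRow())).setSource(doc));
			}
			if (bulk.numberOfActions() > 0) {
				bulk.execute().actionGet();
			}
		} finally {
			scanner.close();
			table.close();
		}
	}
}

A real implementation would also read the HBase table and column selection from the river settings and batch the bulk requests; since the mapping enables _timestamp, every re-indexed row receives a fresh timestamp, which is what the delete-by-query cleanup in parse() relies on.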
12 changes: 12 additions & 0 deletions src/main/java/org/elasticsearch/river/hbase/HBaseRiverModule.java
@@ -0,0 +1,12 @@
package org.elasticsearch.river.hbase;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;

public class HBaseRiverModule extends AbstractModule {

@Override
protected void configure() {
bind(River.class).to(HBaseRiver.class).asEagerSingleton();
}
}
2 changes: 1 addition & 1 deletion src/main/resources/es-plugin.properties
@@ -1 +1 @@
-plugin=org.elasticsearch.plugin.river.hadoop.HadoopRiverPlugin
+plugin=org.elasticsearch.plugin.river.hbase.HBaseRiverPlugin
