
Commit

Attempting major upgrade to scalding/hbase/cdh
Antwnis committed Jan 29, 2015
1 parent 4c1f607 commit dcd247b
Showing 7 changed files with 59 additions and 40 deletions.
22 changes: 19 additions & 3 deletions pom.xml
@@ -11,7 +11,7 @@
<name>Cascading and Scalding wrapper for HBase with advanced features</name>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10_0.12.0_5.3.0</version>
+ <version>2.10_0.12.0_5.3.0_beta</version>
<packaging>jar</packaging>

<properties>
@@ -37,7 +37,7 @@

<!-- Scala/Scalding/Cascading properties -->
<!-- can be 2.9.3 and 2.10.2 -->
- <scala.version>2.10.3</scala.version>
+ <scala.version>2.10.4</scala.version>
<!-- 2.10 for Scala 2.10.2 and 2.9.3 for Scala version 2.9.3 -->
<scalding.scala.version>2.10</scalding.scala.version>
<scalding.version>0.12.0</scalding.version>
@@ -153,7 +153,17 @@
<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
+ <artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
+ <dependency>
+     <groupId>org.apache.hbase</groupId>
+     <artifactId>hbase-common</artifactId>
+     <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+     <groupId>org.apache.hbase</groupId>
+     <artifactId>hbase-server</artifactId>
+     <version>${hbase.version}</version>
+ </dependency>

@@ -180,6 +190,12 @@
<version>4.1</version>
</dependency>

+ <dependency>
+     <groupId>joda-time</groupId>
+     <artifactId>joda-time</artifactId>
+     <version>2.7</version>
+ </dependency>

<!-- Testing dependencies (ScalaSpec / ScalaTest / JUnit) -->
<dependency>
<groupId>org.specs2</groupId>
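For context on the dependency split above: HBase 0.96 broke the monolithic hbase artifact into per-module jars, so code that previously compiled against one jar now needs hbase-client, hbase-common, and hbase-server. A minimal sketch of which module supplies what, based on the 0.98 layout (the class picks are illustrative, not an exhaustive list):

    // Illustrative imports only: each resolves from a different HBase 0.96+ module.
    import org.apache.hadoop.hbase.client.HTable;              // hbase-client: table/scan/put API
    import org.apache.hadoop.hbase.util.Bytes;                 // hbase-common: shared utilities
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat; // hbase-server: MapReduce code lives here until HBase 2.0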
@@ -14,7 +14,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
- import org.apache.hadoop.hbase.HServerAddress;
+ //import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Addressing;
@@ -110,16 +110,16 @@ public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IO
: maxKey;

HRegionLocation regionLoc = table.getRegionLocation(keys.getFirst()[i]);
- HServerAddress regionServerAddress = regionLoc.getServerAddress();
- InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
- String regionLocation;
- try {
-     regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
-     LOG.error("Cannot resolve the host name for " + regionAddress
-         + " because of " + e);
-     regionLocation = regionServerAddress.getHostname();
- }
+ // HServerAddress regionServerAddress = regionLoc.getServerName().getHostname();
+ // InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress();
+ String regionLocation = regionLoc.getServerName().getHostname();
+ // try {
+ //     regionLocation = reverseDNS(regionAddress);
+ // } catch (NamingException e) {
+ //     LOG.error("Cannot resolve the host name for " + regionAddress
+ //         + " because of " + e);
+ //     regionLocation = regionServerAddress.getHostname();
+ // }

regionNames[i] = regionLoc.getRegionInfo().getRegionNameAsString();

@@ -189,18 +189,18 @@ public HBaseTableSplitGranular[] getSplits(JobConf job, int numSplits) throws IO
byte[] rStart = cRegion.getRegionInfo().getStartKey();
byte[] rStop = cRegion.getRegionInfo().getEndKey();

- HServerAddress regionServerAddress = cRegion
-     .getServerAddress();
- InetAddress regionAddress = regionServerAddress
-     .getInetSocketAddress().getAddress();
- String regionLocation;
- try {
-     regionLocation = reverseDNS(regionAddress);
- } catch (NamingException e) {
-     LOG.error("Cannot resolve the host name for "
-         + regionAddress + " because of " + e);
-     regionLocation = regionServerAddress.getHostname();
- }
+ // HServerAddress regionServerAddress = cRegion
+ //     .getServerAddress();
+ // InetAddress regionAddress = regionServerAddress
+ //     .getInetSocketAddress().getAddress();
+ String regionLocation = cRegion.getHostname();
+ // try {
+ //     regionLocation = reverseDNS(regionAddress);
+ // } catch (NamingException e) {
+ //     LOG.error("Cannot resolve the host name for "
+ //         + regionAddress + " because of " + e);
+ //     regionLocation = regionServerAddress.getHostname();
+ // }

String regionName = cRegion.getRegionInfo().getRegionNameAsString();

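The two hunks above track an API removal: HServerAddress is gone from HBase 0.96+, and HRegionLocation exposes a ServerName (or the hostname directly) instead. A minimal sketch of the replacement lookup, assuming the 0.98 client API (RegionHostLookup is a hypothetical helper, not part of SpyGlass):

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.ServerName;

    class RegionHostLookup {
        // 0.94 style (removed): regionLoc.getServerAddress().getHostname()
        static String hostFor(HRegionLocation regionLoc) {
            ServerName server = regionLoc.getServerName(); // replaces getServerAddress()
            return server.getHostname();                   // hostname as registered with the master
        }
    }

Note the commit also trades the old reverse-DNS resolution for the hostname HBase already reports, which is typically the name the region server registered under.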
2 changes: 1 addition & 1 deletion src/main/java/parallelai/spyglass/hbase/HBaseRawTap.java
@@ -156,7 +156,7 @@ public Path getPath() {
return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
}

- protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
if (hBaseAdmin == null) {
Configuration hbaseConf = HBaseConfiguration.create(conf);
hBaseAdmin = new HBaseAdmin(hbaseConf);
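The widened throws clause here (and the identical one in HBaseTap.java below) mirrors the 0.96+ constructor: HBaseAdmin(Configuration) declares IOException in addition to the two earlier exceptions. A minimal sketch, assuming the HBase 0.98 client (AdminFactory is a hypothetical helper):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.MasterNotRunningException;
    import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    class AdminFactory {
        static HBaseAdmin create(Configuration jobConf)
                throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
            // HBaseConfiguration.create layers hbase-site.xml settings over the job conf.
            return new HBaseAdmin(HBaseConfiguration.create(jobConf));
        }
    }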
@@ -249,9 +249,10 @@ public boolean next(ImmutableBytesWritable key, Result value)
} else {
key.set(result.getRow());
}

lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
+ //Writables.copyWritable(result, value); // src, target
return true;
}
return false;
@@ -302,7 +303,8 @@ public boolean next(ImmutableBytesWritable key, Result value)
key.set(result.getRow());
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
+ // Writables.copyWritable(result, value);

return true;
} else {
@@ -352,7 +354,8 @@ public boolean next(ImmutableBytesWritable key, Result value)
key.set(result.getRow());
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
+ // Writables.copyWritable(result, value);

return true;
} else {
@@ -408,7 +411,8 @@ public boolean next(ImmutableBytesWritable key, Result value)
key.set(result.getRow());
}
lastSuccessfulRow = key.get();
- Writables.copyWritable(result, value);
+ value.copyFrom(result);
+ // Writables.copyWritable(result, value);
return true;
} else {
LOG.debug(String.format("+ Key (%s) return an EMPTY result. Get (%s)", Bytes.toString(nextKey), theGet.getId()) ); //alg0
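All four next(...) hunks make the same substitution: HBase 0.96 moved Result from Writable to protobuf serialization, so Writables.copyWritable(result, value) no longer works, and Result.copyFrom is the supported way to refill a reused value object. A minimal sketch, assuming the 0.98 client (ResultRefill is a hypothetical helper):

    import org.apache.hadoop.hbase.client.Result;

    class ResultRefill {
        // Copies the cells backing src into dst, so a record reader can keep
        // handing out the same Result instance while advancing the scan.
        static void refill(Result src, Result dst) {
            dst.copyFrom(src);
        }
    }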
2 changes: 1 addition & 1 deletion src/main/java/parallelai/spyglass/hbase/HBaseTap.java
@@ -143,7 +143,7 @@ public Path getPath() {
return new Path(SCHEME + ":/" + tableName.replaceAll(":", "_"));
}

- protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException {
+ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
if (hBaseAdmin == null) {
Configuration hbaseConf = HBaseConfiguration.create(conf);
hBaseAdmin = new HBaseAdmin(hbaseConf);
10 changes: 5 additions & 5 deletions src/main/resources/pom.xml
@@ -8,7 +8,7 @@
<description>Cascading and Scalding wrapper for HBase with advanced features</description>
<groupId>parallelai</groupId>
<artifactId>parallelai.spyglass</artifactId>
- <version>2.10_0.10_4.4</version>
+ <version>2.10_0.12_5.3.0_beta</version>
<packaging>jar</packaging>

<organization>
@@ -68,26 +68,26 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scalding-core_2.10</artifactId>
- <version>0.10.0</version>
+ <version>0.12.0</version>
</dependency>

<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>2.0.0-mr1-cdh4.5.0</version>
+ <version>2.5.0-mr1-cdh5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <version>2.0.0-cdh4.5.0</version>
+ <version>2.5.0-cdh5.3.0</version>
</dependency>

<!-- HBase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
- <version>0.94.6-cdh4.5.0</version>
+ <version>0.98.6-cdh5.3.0</version>
</dependency>

<dependency>
@@ -7,7 +7,6 @@ import parallelai.spyglass.base.JobRunner
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put, HTable, HConnectionManager, HBaseAdmin}
- import org.apache.hadoop.hbase.io.hfile.Compression
import org.apache.hadoop.hbase.regionserver.StoreFile
import org.apache.hadoop.hbase.util.Bytes
import parallelai.spyglass.hbase.HBaseSalter
@@ -71,4 +70,4 @@ object HBaseExampleRunner extends App
"--output", output, "--debug", "true", "--job.lib.path", jobLibPath, "--quorum", quorum))


- }
\ No newline at end of file
+ }
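The dropped import reflects another 0.96 relocation: the Compression class moved out of org.apache.hadoop.hbase.io.hfile. If the example still needed to set column-family compression, the new package would be the one below (a sketch against the 0.98 API; FamilyCompression and the choice of SNAPPY are illustrative):

    // 0.94:  import org.apache.hadoop.hbase.io.hfile.Compression;
    // 0.96+: the same enum now lives under io.compress.
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.io.compress.Compression;

    class FamilyCompression {
        static HColumnDescriptor withSnappy(HColumnDescriptor family) {
            return family.setCompressionType(Compression.Algorithm.SNAPPY);
        }
    }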
