Skip to content

Commit

Permalink
Merge pull request apache#318 from srowen/master
Browse files Browse the repository at this point in the history
Suggested small changes to Java code for slightly more standard style, encapsulation, and in some cases performance

Sorry if this is too abrupt or not a welcome set of changes, but I thought I'd see if I could contribute a little. I'm a Java developer and am just getting seriously into Spark. So I thought I'd suggest a number of small changes to the couple of Java parts of the code to make them a little tighter, more standard, and even a bit faster.

Feel free to take all, some or none of this. Happy to explain any of it.
  • Loading branch information
rxin committed Jan 7, 2014
2 parents 468af0f + 4b92a20 commit 15d9534
Show file tree
Hide file tree
Showing 20 changed files with 174 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@

class FileClient {

private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName());

private final FileClientHandler handler;
private Channel channel = null;
private Bootstrap bootstrap = null;
private EventLoopGroup group = null;
private final int connectTimeout;
private final int sendTimeout = 60; // 1 min

public FileClient(FileClientHandler handler, int connectTimeout) {
FileClient(FileClientHandler handler, int connectTimeout) {
this.handler = handler;
this.connectTimeout = connectTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {

private final FileClientHandler fhandler;

public FileClientChannelInitializer(FileClientHandler handler) {
FileClientChannelInitializer(FileClientHandler handler) {
fhandler = handler;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@
*/
class FileServer {

private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName());

private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ChannelFuture channelFuture = null;
private int port = 0;
private Thread blockingThread = null;

public FileServer(PathResolver pResolver, int port) {
FileServer(PathResolver pResolver, int port) {
InetSocketAddress addr = new InetSocketAddress(port);

// Configure the server.
Expand Down Expand Up @@ -70,7 +69,8 @@ public FileServer(PathResolver pResolver, int port) {
* Start the file server asynchronously in a new thread.
*/
public void start() {
blockingThread = new Thread() {
Thread blockingThread = new Thread() {
@Override
public void run() {
try {
channelFuture.channel().closeFuture().sync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {

PathResolver pResolver;
private final PathResolver pResolver;

public FileServerChannelInitializer(PathResolver pResolver) {
FileServerChannelInitializer(PathResolver pResolver) {
this.pResolver = pResolver;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@

class FileServerHandler extends SimpleChannelInboundHandler<String> {

private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName());

private final PathResolver pResolver;

public FileServerHandler(PathResolver pResolver){
FileServerHandler(PathResolver pResolver){
this.pResolver = pResolver;
}

Expand All @@ -61,7 +61,7 @@ public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
ctx.flush();
return;
}
int len = new Long(length).intValue();
int len = (int) length;
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.write(new DefaultFileRegion(new FileInputStream(file)
Expand Down
52 changes: 26 additions & 26 deletions core/src/main/java/org/apache/spark/network/netty/PathResolver.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.netty;

import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;

/**
 * Maps a block id to the on-disk file segment holding that block's data.
 * NOTE(review): implementations live elsewhere in the project; presumably the
 * netty file server uses this to resolve requested block ids — confirm against
 * FileServerHandler.
 */
public interface PathResolver {
  /** Get the file segment in which the given block resides. */
  // "public" removed: interface members are implicitly public (JLS 9.4),
  // so the modifier was redundant and is flagged by standard style checkers.
  FileSegment getBlockLocation(BlockId blockId);
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.netty;

import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;

/**
 * Maps a block id to the on-disk file segment holding that block's data.
 * NOTE(review): implementations live elsewhere in the project; presumably the
 * netty file server uses this to resolve requested block ids — confirm against
 * FileServerHandler.
 */
public interface PathResolver {
  /** Get the file segment in which the given block resides. */
  FileSegment getBlockLocation(BlockId blockId);
}
29 changes: 16 additions & 13 deletions examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.Random;
import java.util.regex.Pattern;

/**
* Logistic regression based classification.
*/
public class JavaHdfsLR {
public final class JavaHdfsLR {

static int D = 10; // Number of dimensions
static Random rand = new Random(42);
private static final int D = 10; // Number of dimensions
private static final Random rand = new Random(42);

static class DataPoint implements Serializable {
public DataPoint(double[] x, double y) {
DataPoint(double[] x, double y) {
this.x = x;
this.y = y;
}
Expand All @@ -46,20 +46,22 @@ public DataPoint(double[] x, double y) {
}

static class ParsePoint extends Function<String, DataPoint> {
private static final Pattern SPACE = Pattern.compile(" ");

@Override
public DataPoint call(String line) {
StringTokenizer tok = new StringTokenizer(line, " ");
double y = Double.parseDouble(tok.nextToken());
String[] tok = SPACE.split(line);
double y = Double.parseDouble(tok[0]);
double[] x = new double[D];
int i = 0;
while (i < D) {
x[i] = Double.parseDouble(tok.nextToken());
i += 1;
for (int i = 0; i < D; i++) {
x[i] = Double.parseDouble(tok[i + 1]);
}
return new DataPoint(x, y);
}
}

static class VectorSum extends Function2<double[], double[], double[]> {
@Override
public double[] call(double[] a, double[] b) {
double[] result = new double[D];
for (int j = 0; j < D; j++) {
Expand All @@ -70,12 +72,13 @@ public double[] call(double[] a, double[] b) {
}

static class ComputeGradient extends Function<DataPoint, double[]> {
double[] weights;
private final double[] weights;

public ComputeGradient(double[] weights) {
ComputeGradient(double[] weights) {
this.weights = weights;
}

@Override
public double[] call(DataPoint p) {
double[] gradient = new double[D];
for (int i = 0; i < D; i++) {
Expand Down
23 changes: 15 additions & 8 deletions examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,24 @@

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
* K-means clustering using Java API.
*/
public class JavaKMeans {
public final class JavaKMeans {

private static final Pattern SPACE = Pattern.compile(" ");

/** Parses numbers split by whitespace to a vector */
static Vector parseVector(String line) {
String[] splits = line.split(" ");
String[] splits = SPACE.split(line);
double[] data = new double[splits.length];
int i = 0;
for (String s : splits)
data[i] = Double.parseDouble(splits[i++]);
for (String s : splits) {
data[i] = Double.parseDouble(s);
i++;
}
return new Vector(data);
}

Expand Down Expand Up @@ -82,7 +87,7 @@ public static void main(String[] args) throws Exception {
JavaRDD<Vector> data = sc.textFile(path).map(
new Function<String, Vector>() {
@Override
public Vector call(String line) throws Exception {
public Vector call(String line) {
return parseVector(line);
}
}
Expand All @@ -96,7 +101,7 @@ public Vector call(String line) throws Exception {
JavaPairRDD<Integer, Vector> closest = data.map(
new PairFunction<Vector, Integer, Vector>() {
@Override
public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
public Tuple2<Integer, Vector> call(Vector vector) {
return new Tuple2<Integer, Vector>(
closestPoint(vector, centroids), vector);
}
Expand All @@ -107,7 +112,8 @@ public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey();
Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
new Function<List<Vector>, Vector>() {
public Vector call(List<Vector> ps) throws Exception {
@Override
public Vector call(List<Vector> ps) {
return average(ps);
}
}).collectAsMap();
Expand All @@ -122,8 +128,9 @@ public Vector call(List<Vector> ps) throws Exception {
} while (tempDist > convergeDist);

System.out.println("Final centers:");
for (Vector c : centroids)
for (Vector c : centroids) {
System.out.println(c);
}

System.exit(0);

Expand Down
20 changes: 10 additions & 10 deletions examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
/**
* Executes a roll up-style query against Apache logs.
*/
public class JavaLogQuery {
public final class JavaLogQuery {

public static List<String> exampleApacheLogs = Lists.newArrayList(
public static final List<String> exampleApacheLogs = Lists.newArrayList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
"Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
Expand All @@ -51,14 +51,14 @@ public class JavaLogQuery {
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
"0 73.23.2.15 images.com 1358492557 - Whatup");

public static Pattern apacheLogRegex = Pattern.compile(
public static final Pattern apacheLogRegex = Pattern.compile(
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");

/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {

private int count;
private int numBytes;
private final int count;
private final int numBytes;

public Stats(int count, int numBytes) {
this.count = count;
Expand Down Expand Up @@ -92,12 +92,12 @@ public static Stats extractStats(String line) {
if (m.find()) {
int bytes = Integer.parseInt(m.group(7));
return new Stats(1, bytes);
}
else
} else {
return new Stats(1, 0);
}
}

public static void main(String[] args) throws Exception {
public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
Expand All @@ -110,14 +110,14 @@ public static void main(String[] args) throws Exception {

JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
}
});

JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
@Override
public Stats call(Stats stats, Stats stats2) throws Exception {
public Stats call(Stats stats, Stats stats2) {
return stats.merge(stats2);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.examples;

import org.apache.spark.SparkContext;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
Expand All @@ -29,6 +28,7 @@

import java.util.List;
import java.util.ArrayList;
import java.util.regex.Pattern;

/**
* Computes the PageRank of URLs from an input file. Input file should
Expand All @@ -39,7 +39,9 @@
* ...
* where URL and their neighbors are separated by space(s).
*/
public class JavaPageRank {
public final class JavaPageRank {
private static final Pattern SPACES = Pattern.compile("\\s+");

private static class Sum extends Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
Expand Down Expand Up @@ -67,15 +69,15 @@ public static void main(String[] args) throws Exception {
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = s.split("\\s+");
String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();

// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
public Double call(List<String> rs) throws Exception {
public Double call(List<String> rs) {
return 1.0;
}
});
Expand All @@ -98,7 +100,7 @@ public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
@Override
public Double call(Double sum) throws Exception {
public Double call(Double sum) {
return 0.15 + sum * 0.85;
}
});
Expand Down
Loading

0 comments on commit 15d9534

Please sign in to comment.