Commit
Merge remote-tracking branch 'upstream/master' into spark-1403
Bharath Bhushan committed Apr 4, 2014
2 parents f3c9a14 + 16b8308 commit 4803c19
Showing 11 changed files with 328 additions and 12 deletions.
3 changes: 3 additions & 0 deletions bin/load-spark-env.sh
@@ -30,6 +30,9 @@ if [ -z "$SPARK_ENV_LOADED" ]; then
use_conf_dir=${SPARK_CONF_DIR:-"$parent_dir/conf"}

if [ -f "${use_conf_dir}/spark-env.sh" ]; then
# Promote all variable declarations to environment (exported) variables
set -a
. "${use_conf_dir}/spark-env.sh"
set +a
fi
fi
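
The `set -a` / `set +a` pair marks every variable assigned in spark-env.sh for export, so those values become environment variables visible to the JVMs these scripts launch. A minimal sketch of the effect on the JVM side, using a hypothetical variable name:

    // Illustrative only: with `set -a` in effect, an assignment such as
    // SPARK_WORKER_MEMORY=2g in spark-env.sh is exported to child processes.
    val workerMemory = sys.env.get("SPARK_WORKER_MEMORY") // Some("2g") if set, None otherwise
    println(s"SPARK_WORKER_MEMORY seen by the JVM: $workerMemory")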
4 changes: 2 additions & 2 deletions bin/spark-shell
@@ -127,7 +127,7 @@ function set_spark_log_conf(){

function set_spark_master(){
if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
MASTER="$1"
export MASTER="$1"
else
out_error "wrong format for $2"
fi
@@ -145,7 +145,7 @@ function resolve_spark_master(){
fi

if [ -z "$MASTER" ]; then
MASTER="$DEFAULT_MASTER"
export MASTER="$DEFAULT_MASTER"
fi

}
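Exporting MASTER matters because spark-shell ultimately launches a separate JVM, and only exported variables cross that process boundary. A minimal sketch of how a driver process could read the value (illustrative; not the actual repl code):

    // Illustrative only: the launched JVM picks up the exported MASTER from its environment.
    import org.apache.spark.SparkContext
    val master = sys.env.getOrElse("MASTER", "local")
    val sc = new SparkContext(master, "spark-shell sketch")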
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -371,6 +372,39 @@ class SparkContext(
minSplits).map(pair => pair._2.toString)
}

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*/
def wholeTextFiles(path: String): RDD[(String, String)] = {
newAPIHadoopFile(
path,
classOf[WholeTextFileInputFormat],
classOf[String],
classOf[String])
}

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
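A minimal usage sketch of the new Scala API, assuming an existing SparkContext `sc` and a hypothetical input directory (not part of this diff):

    // Each element of the RDD is (filePath, fileContent).
    val files = sc.wholeTextFiles("hdfs://a-hdfs-path")
    // Count the lines of each file while keeping track of which file they came from.
    val lineCounts = files.map { case (path, content) => (path, content.split("\n").length) }
    lineCounts.collect().foreach { case (path, n) => println(s"$path: $n lines") }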
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -154,6 +154,34 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)

/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file and the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `JavaPairRDD<String, String> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; large files are also allowed, but may cause bad performance.
*/
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -79,20 +79,23 @@ object SparkSubmit {
printErrorAndExit("master must start with yarn, mesos, spark, or local")
}

// Because "yarn-standalone" and "yarn-client" encapsulate both the master
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
if (appArgs.deployMode == null && appArgs.master == "yarn-standalone") {
if (appArgs.deployMode == null &&
(appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
appArgs.deployMode = "cluster"
}
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
if (appArgs.deployMode == "client" &&
(appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+ "\" are not compatible")
}
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-standalone"
appArgs.master = "yarn-cluster"
}
if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-client"
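The inference above boils down to: treat "yarn-standalone" and "yarn-cluster" as implying cluster mode, reject contradictory combinations, and normalize any yarn master to "yarn-cluster" or "yarn-client" according to the resolved mode. A simplified, standalone sketch of these rules (illustration only; not the actual SparkSubmit code):

    // Simplified reimplementation of the inference rules, for illustration only.
    def resolveMasterAndDeployMode(master: String, deployMode: Option[String]): (String, String) = {
      val mode = deployMode match {
        case None if master == "yarn-standalone" || master == "yarn-cluster" => "cluster"
        case Some("cluster") if master == "yarn-client" =>
          sys.error("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
        case Some("client") if master == "yarn-standalone" || master == "yarn-cluster" =>
          sys.error("Deploy mode \"client\" and master \"" + master + "\" are not compatible")
        case Some(m) => m
        case None => "client" // assumption: default to client mode when nothing else applies
      }
      // Normalize the various yarn spellings to the two canonical masters.
      val resolvedMaster =
        if (master.startsWith("yarn")) { if (mode == "cluster") "yarn-cluster" else "yarn-client" }
        else master
      (resolvedMaster, mode)
    }

    // For example: resolveMasterAndDeployMode("yarn", Some("cluster")) == ("yarn-cluster", "cluster")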
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -171,7 +171,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
outStream.println("Unknown/unsupported param " + unknownParam)
}
outStream.println(
"""Usage: spark-submit <primary binary> [options]
"""Usage: spark-submit <app jar> [options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
| --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

/**
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
* reading whole text files. Each file is read as a key-value pair, where the key is the file path and
* the value is the entire content of the file.
*/

private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {

new CombineFileRecordReader[String, String](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader])
}
}
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file.
*/
private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends RecordReader[String, String] {

private val path = split.getPath(index)
private val fs = path.getFileSystem(context.getConfiguration)

// True means the current file has been processed, so it should be skipped.
private var processed = false

private val key = path.toString
private var value: String = null

override def initialize(split: InputSplit, context: TaskAttemptContext) = {}

override def close() = {}

override def getProgress = if (processed) 1.0f else 0.0f

override def getCurrentKey = key

override def getCurrentValue = value

override def nextKeyValue = {
if (!processed) {
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)

value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)

processed = true
true
} else {
false
}
}
}
30 changes: 27 additions & 3 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -17,9 +17,7 @@

package org.apache.spark;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.io.*;
import java.util.*;

import scala.Tuple2;
@@ -599,6 +597,32 @@ public void textFiles() throws IOException {
Assert.assertEquals(expected, readRDD.collect());
}

@Test
public void wholeTextFiles() throws IOException {
byte[] content1 = "spark is easy to use.\n".getBytes();
byte[] content2 = "spark is also easy to use.\n".getBytes();

File tempDir = Files.createTempDir();
String tempDirName = tempDir.getAbsolutePath();
DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
ds.write(content1);
ds.close();
ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00001"));
ds.write(content2);
ds.close();

HashMap<String, String> container = new HashMap<String, String>();
container.put(tempDirName+"/part-00000", new Text(content1).toString());
container.put(tempDirName+"/part-00001", new Text(content2).toString());

JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
List<Tuple2<String, String>> result = readRDD.collect();

for (Tuple2<String, String> res : result) {
Assert.assertEquals(res._2(), container.get(res._1()));
}
}

@Test
public void textFilesCompressed() throws IOException {
File tempDir = Files.createTempDir();