diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh
index 476dd826551fd..d425f9feaac54 100644
--- a/bin/load-spark-env.sh
+++ b/bin/load-spark-env.sh
@@ -30,6 +30,9 @@ if [ -z "$SPARK_ENV_LOADED" ]; then
use_conf_dir=${SPARK_CONF_DIR:-"$parent_dir/conf"}
if [ -f "${use_conf_dir}/spark-env.sh" ]; then
+ # Promote all variable declarations to environment (exported) variables
+ set -a
. "${use_conf_dir}/spark-env.sh"
+ set +a
fi
fi
diff --git a/bin/spark-shell b/bin/spark-shell
index fac006cf492ed..535ee3ccd8269 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -127,7 +127,7 @@ function set_spark_log_conf(){
function set_spark_master(){
if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
- MASTER="$1"
+ export MASTER="$1"
else
out_error "wrong format for $2"
fi
@@ -145,7 +145,7 @@ function resolve_spark_master(){
fi
if [ -z "$MASTER" ]; then
- MASTER="$DEFAULT_MASTER"
+ export MASTER="$DEFAULT_MASTER"
fi
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b23accbbb9410..28a865c0ad3b5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -37,6 +37,7 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -371,6 +372,39 @@ class SparkContext(
minSplits).map(pair => pair._2.toString)
}
+ /**
+ * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+ * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+ * key-value pair, where the key is the path of each file, the value is the content of each file.
+ *
+ *
For example, if you have the following files:
+ * {{{
+ * hdfs://a-hdfs-path/part-00000
+ * hdfs://a-hdfs-path/part-00001
+ * ...
+ * hdfs://a-hdfs-path/part-nnnnn
+ * }}}
+ *
+ * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
+ *
+ *
then `rdd` contains
+ * {{{
+ * (a-hdfs-path/part-00000, its content)
+ * (a-hdfs-path/part-00001, its content)
+ * ...
+ * (a-hdfs-path/part-nnnnn, its content)
+ * }}}
+ *
+ * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+ */
+ def wholeTextFiles(path: String): RDD[(String, String)] = {
+ newAPIHadoopFile(
+ path,
+ classOf[WholeTextFileInputFormat],
+ classOf[String],
+ classOf[String])
+ }
+
/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e531a57aced31..6cbdeac58d5e2 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -154,6 +154,34 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
+ /**
+ * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+ * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+ * key-value pair, where the key is the path of each file, the value is the content of each file.
+ *
+ *
For example, if you have the following files:
+ * {{{
+ * hdfs://a-hdfs-path/part-00000
+ * hdfs://a-hdfs-path/part-00001
+ * ...
+ * hdfs://a-hdfs-path/part-nnnnn
+ * }}}
+ *
+ * Do `JavaPairRDD rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
+ *
+ * then `rdd` contains
+ * {{{
+ * (a-hdfs-path/part-00000, its content)
+ * (a-hdfs-path/part-00001, its content)
+ * ...
+ * (a-hdfs-path/part-nnnnn, its content)
+ * }}}
+ *
+ * @note Small files are perferred, large file is also allowable, but may cause bad performance.
+ */
+ def wholeTextFiles(path: String): JavaPairRDD[String, String] =
+ new JavaPairRDD(sc.wholeTextFiles(path))
+
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 1fa799190409f..e05fbfe321495 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -79,20 +79,23 @@ object SparkSubmit {
printErrorAndExit("master must start with yarn, mesos, spark, or local")
}
- // Because "yarn-standalone" and "yarn-client" encapsulate both the master
+ // Because "yarn-cluster" and "yarn-client" encapsulate both the master
// and deploy mode, we have some logic to infer the master and deploy mode
// from each other if only one is specified, or exit early if they are at odds.
- if (appArgs.deployMode == null && appArgs.master == "yarn-standalone") {
+ if (appArgs.deployMode == null &&
+ (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
appArgs.deployMode = "cluster"
}
if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") {
printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible")
}
- if (appArgs.deployMode == "client" && appArgs.master == "yarn-standalone") {
- printErrorAndExit("Deploy mode \"client\" and master \"yarn-standalone\" are not compatible")
+ if (appArgs.deployMode == "client" &&
+ (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) {
+ printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master
+ + "\" are not compatible")
}
if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) {
- appArgs.master = "yarn-standalone"
+ appArgs.master = "yarn-cluster"
}
if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) {
appArgs.master = "yarn-client"
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9c8f54ea6f77a..834b3df2f164b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -171,7 +171,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
outStream.println("Unknown/unsupported param " + unknownParam)
}
outStream.println(
- """Usage: spark-submit [options]
+ """Usage: spark-submit [options]
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
| --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'.
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
new file mode 100644
index 0000000000000..4887fb6b84eb2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+
+/**
+ * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
+ * reading whole text files. Each file is read as key-value pair, where the key is the file path and
+ * the value is the entire content of file.
+ */
+
+private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+ override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+
+ override def createRecordReader(
+ split: InputSplit,
+ context: TaskAttemptContext): RecordReader[String, String] = {
+
+ new CombineFileRecordReader[String, String](
+ split.asInstanceOf[CombineFileSplit],
+ context,
+ classOf[WholeTextFileRecordReader])
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
new file mode 100644
index 0000000000000..c3dabd2e79995
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import com.google.common.io.{ByteStreams, Closeables}
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeTextFileRecordReader(
+ split: CombineFileSplit,
+ context: TaskAttemptContext,
+ index: Integer)
+ extends RecordReader[String, String] {
+
+ private val path = split.getPath(index)
+ private val fs = path.getFileSystem(context.getConfiguration)
+
+ // True means the current file has been processed, then skip it.
+ private var processed = false
+
+ private val key = path.toString
+ private var value: String = null
+
+ override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+
+ override def close() = {}
+
+ override def getProgress = if (processed) 1.0f else 0.0f
+
+ override def getCurrentKey = key
+
+ override def getCurrentValue = value
+
+ override def nextKeyValue = {
+ if (!processed) {
+ val fileIn = fs.open(path)
+ val innerBuffer = ByteStreams.toByteArray(fileIn)
+
+ value = new Text(innerBuffer).toString
+ Closeables.close(fileIn, false)
+
+ processed = true
+ true
+ } else {
+ false
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c6b65c7348ae0..2372f2d9924a1 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -17,9 +17,7 @@
package org.apache.spark;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
import java.util.*;
import scala.Tuple2;
@@ -599,6 +597,32 @@ public void textFiles() throws IOException {
Assert.assertEquals(expected, readRDD.collect());
}
+ @Test
+ public void wholeTextFiles() throws IOException {
+ byte[] content1 = "spark is easy to use.\n".getBytes();
+ byte[] content2 = "spark is also easy to use.\n".getBytes();
+
+ File tempDir = Files.createTempDir();
+ String tempDirName = tempDir.getAbsolutePath();
+ DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
+ ds.write(content1);
+ ds.close();
+ ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00001"));
+ ds.write(content2);
+ ds.close();
+
+ HashMap container = new HashMap();
+ container.put(tempDirName+"/part-00000", new Text(content1).toString());
+ container.put(tempDirName+"/part-00001", new Text(content2).toString());
+
+ JavaPairRDD readRDD = sc.wholeTextFiles(tempDirName);
+ List> result = readRDD.collect();
+
+ for (Tuple2 res : result) {
+ Assert.assertEquals(res._2(), container.get(res._1()));
+ }
+ }
+
@Test
public void textFilesCompressed() throws IOException {
File tempDir = Files.createTempDir();
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
new file mode 100644
index 0000000000000..09e35bfc8f85f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.DataOutputStream
+import java.io.File
+import java.io.FileOutputStream
+
+import scala.collection.immutable.IndexedSeq
+
+import com.google.common.io.Files
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.SparkContext
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileRecordReader WholeTextFileRecordReader]]. A temporary
+ * directory is created as fake input. Temporal storage would be deleted in the end.
+ */
+class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
+ private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+
+ // Set the block size of local file system to test whether files are split right or not.
+ sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+ }
+
+ override def afterAll() {
+ sc.stop()
+ }
+
+ private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
+ val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+ out.write(contents, 0, contents.length)
+ out.close()
+ }
+
+ /**
+ * This code will test the behaviors of WholeTextFileRecordReader based on local disk. There are
+ * three aspects to check:
+ * 1) Whether all files are read;
+ * 2) Whether paths are read correctly;
+ * 3) Does the contents be the same.
+ */
+ test("Correctness of WholeTextFileRecordReader.") {
+
+ val dir = Files.createTempDir()
+ println(s"Local disk address is ${dir.toString}.")
+
+ WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+ createNativeFile(dir, filename, contents)
+ }
+
+ val res = sc.wholeTextFiles(dir.toString).collect()
+
+ assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+ "Number of files read out does not fit with the actual value.")
+
+ for ((filename, contents) <- res) {
+ val shortName = filename.split('/').last
+ assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+ s"Missing file name $filename.")
+ assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+ s"file $filename contents can not match.")
+ }
+
+ dir.delete()
+ }
+}
+
+/**
+ * Files to be tested are defined here.
+ */
+object WholeTextFileRecordReaderSuite {
+ private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)
+
+ private val fileNames = Array("part-00000", "part-00001", "part-00002")
+ private val fileLengths = Array(10, 100, 1000)
+
+ private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
+ filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+ }.toMap
+}
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index b69e3416fb322..7f75ea44e4cea 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -56,7 +56,7 @@ The recommended way to launch a compiled Spark application is through the spark-
bin directory), which takes care of setting up the classpath with Spark and its dependencies, as well as
provides a layer over the different cluster managers and deploy modes that Spark supports. It's usage is
- spark-submit `` ``
+ spark-submit `` ``
Where options are any of: