Merge github.com:apache/spark
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
	core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
	core/src/main/scala/org/apache/spark/ui/SparkUI.scala
andrewor14 committed Mar 19, 2014
2 parents b8ba817 + a18ea00 commit a1c5cd9
Showing 16 changed files with 298 additions and 121 deletions.
2 changes: 1 addition & 1 deletion NOTICE
@@ -1,5 +1,5 @@
Apache Spark
Copyright 2013 The Apache Software Foundation.
Copyright 2014 The Apache Software Foundation.

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}

/**
* Output the RDD to any Hadoop-supported storage system, using
* a Configuration object for that storage system.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration) {
rdd.saveAsNewAPIHadoopDataset(conf)
}
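
For context, a minimal usage sketch of the new method (a sketch only, mirroring the FileSuite test added later in this commit; the Java wrapper simply delegates to the Scala implementation, and the RDD contents and output directory below are hypothetical):

    // Configure a new-API Hadoop Job and write a pair RDD through it.
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

    val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")))   // sc: an existing SparkContext
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[String])
    job.setOutputValueClass(classOf[String])
    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
    job.getConfiguration.set("mapred.output.dir", "/tmp/saveAsNewAPIHadoopDataset-demo")
    pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)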

/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -45,7 +45,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
master.masterMetricsSystem.getServletHandlers ++
master.applicationMetricsSystem.getServletHandlers ++
Seq[ServletContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static/*"),
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/app/json",
(request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
createServletHandler("/app",
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -46,7 +46,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
private val handlers: Seq[ServletContextHandler] = {
worker.metricsSystem.getServletHandlers ++
Seq[ServletContextHandler](
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"),
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"),
createServletHandler("/log",
(request: HttpServletRequest) => log(request), worker.securityMgr),
createServletHandler("/logPage",
104 changes: 58 additions & 46 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -30,11 +30,11 @@ import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
@@ -603,50 +603,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)

val wrappedConf = new SerializableWritable(job.getConfiguration)
val outpath = new Path(path)
NewFileOutputFormat.setOutputPath(job, outpath)
val jobFormat = outputFormatClass.newInstance
jobFormat.checkOutputSpecs(job)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outputFormatClass.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}

/* apparently we need a TaskAttemptID to construct an OutputCommitter;
* however we're only going to use this local OutputCommitter for
* setupJob/commitJob, so we just use a dummy "map" task.
*/
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _)
jobCommitter.commitJob(jobTaskContext)
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(job.getConfiguration)
}

/**
@@ -692,6 +651,59 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
saveAsHadoopDataset(conf)
}

/**
 * Output the RDD to any Hadoop-supported storage system with the new Hadoop API, using a Hadoop
* Configuration object for that storage system. The Conf should set an OutputFormat and any
* output paths required (e.g. a table name to write to) in the same way as it would be
* configured for a Hadoop MapReduce job.
*/
def saveAsNewAPIHadoopDataset(conf: Configuration) {
val job = new NewAPIHadoopJob(conf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableWritable(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}

def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(wrappedConf.value)
case _ => ()
}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}

val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _)
jobCommitter.commitJob(jobTaskContext)
}
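
Since the doc comment above calls out table-style outputs ("e.g. a table name to write to"), here is a hedged sketch of what a table-backed configuration might look like, assuming the HBase client classes of that era; none of the class names, column names, or the table name below come from this commit:

    // Sketch only: write a pair RDD to a hypothetical HBase table via the new Hadoop API.
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext._                           // pair RDD implicits

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "demo_table")      // table name is hypothetical
    val job = new Job(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    val puts = rdd.map { case (k, v) =>                              // rdd: RDD[(String, String)], assumed
      val put = new Put(Bytes.toBytes(k))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(v))
      (new ImmutableBytesWritable(Bytes.toBytes(k)), put)
    }
    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
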

/**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
* that storage system. The JobConf should set an OutputFormat and any output paths required
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -126,7 +126,8 @@ private[spark] object JettyUtils extends Logging {
case None =>
throw new Exception("Could not find resource path for Web UI: " + resourceBase)
}
contextHandler.addServlet(holder, path)
contextHandler.setContextPath(path)
contextHandler.addServlet(holder, "/")
contextHandler
}
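
The static handler is now mounted under its own context path rather than a wildcard servlet mapping, which is why callers above pass "/static" instead of "/static/*". A standalone Jetty sketch of the same pattern (not Spark's actual helper; the resource path and port are placeholders):

    // Serve classpath resources under the "/static" context path.
    import org.eclipse.jetty.server.Server
    import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}

    val staticHandler = new ServletContextHandler()
    staticHandler.setContextPath("/static")                // context path, e.g. "/static"
    val holder = new ServletHolder(new DefaultServlet())
    holder.setInitParameter("resourceBase",
      getClass.getClassLoader.getResource("org/apache/spark/ui/static").toString)
    staticHandler.addServlet(holder, "/")                  // servlet mapped at the context root

    val server = new Server(8080)                          // port is arbitrary for this sketch
    server.setHandler(staticHandler)
    server.start()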

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -68,7 +68,7 @@ private[spark] class SparkUI(
exec.getHandlers ++
metricsServletHandlers ++
Seq[ServletContextHandler] (
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static/*"),
createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
createRedirectHandler("/", "/stages", basePath)
)
}
@@ -117,5 +117,5 @@ private[spark] class SparkUI(

private[spark] object SparkUI {
val DEFAULT_PORT = "4040"
val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}
39 changes: 33 additions & 6 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

class FileSuite extends FunSuite with LocalSparkContext {

@@ -236,18 +237,44 @@ class FileSuite extends FunSuite with LocalSparkContext {
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}

test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsTextFile(tempdir.getPath + "/output")
assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}

test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
randomRDD.saveAsHadoopDataset(job)
assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
}

test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
}
32 changes: 32 additions & 0 deletions make-distribution.sh
@@ -58,6 +58,7 @@ echo "Version is ${VERSION}"
# Initialize defaults
SPARK_HADOOP_VERSION=1.0.4
SPARK_YARN=false
SPARK_TACHYON=false
MAKE_TGZ=false

# Parse arguments
@@ -70,6 +71,9 @@ while (( "$#" )); do
--with-yarn)
SPARK_YARN=true
;;
--with-tachyon)
SPARK_TACHYON=true
;;
--tgz)
MAKE_TGZ=true
;;
@@ -90,6 +94,12 @@ else
echo "YARN disabled"
fi

if [ "$SPARK_TACHYON" == "true" ]; then
echo "Tachyon Enabled"
else
echo "Tachyon Disabled"
fi
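
A hypothetical invocation exercising the new flag together with the existing --tgz option (other script options assumed unchanged):

    ./make-distribution.sh --with-tachyon --tgz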

# Build fat JAR
export SPARK_HADOOP_VERSION
export SPARK_YARN
@@ -113,6 +123,28 @@ cp -r "$FWDIR/python" "$DISTDIR"
cp -r "$FWDIR/sbin" "$DISTDIR"


# Download and copy in tachyon, if requested
if [ "$SPARK_TACHYON" == "true" ]; then
TACHYON_VERSION="0.4.1"
TACHYON_URL="https://github.com/amplab/tachyon/releases/download/v${TACHYON_VERSION}/tachyon-${TACHYON_VERSION}-bin.tar.gz"

TMPD=`mktemp -d`

pushd $TMPD > /dev/null
echo "Fetchting tachyon tgz"
wget "$TACHYON_URL"

tar xf "tachyon-${TACHYON_VERSION}-bin.tar.gz"
cp "tachyon-${TACHYON_VERSION}/target/tachyon-${TACHYON_VERSION}-jar-with-dependencies.jar" "$DISTDIR/jars"
mkdir -p "$DISTDIR/tachyon/src/main/java/tachyon/web"
cp -r "tachyon-${TACHYON_VERSION}"/{bin,conf,libexec} "$DISTDIR/tachyon"
cp -r "tachyon-${TACHYON_VERSION}"/src/main/java/tachyon/web/resources "$DISTDIR/tachyon/src/main/java/tachyon/web"
sed -i "s|export TACHYON_JAR=\$TACHYON_HOME/target/\(.*\)|# This is set for spark's make-distribution\n export TACHYON_JAR=\$TACHYON_HOME/../../jars/\1|" "$DISTDIR/tachyon/libexec/tachyon-config.sh"

popd > /dev/null
rm -rf $TMPD
fi

if [ "$MAKE_TGZ" == "true" ]; then
TARDIR="$FWDIR/spark-$VERSION"
cp -r "$DISTDIR" "$TARDIR"