From 2fa26ec02fc2251102f89bb67523419fd7dd3757 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 18 Mar 2014 11:06:18 -0700
Subject: [PATCH 1/6] SPARK-1102: Create a saveAsNewAPIHadoopDataset method

https://spark-project.atlassian.net/browse/SPARK-1102

Create a saveAsNewAPIHadoopDataset method

By @mateiz: "Right now RDDs can only be saved as files using the new Hadoop
API, not as "datasets" with no filename and just a JobConf. See
http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ for an example
of how you have to give a bogus filename. For the old Hadoop API, we have
saveAsHadoopDataset."

Author: CodingCat

Closes #12 from CodingCat/SPARK-1102 and squashes the following commits:

6ba0c83 [CodingCat] add test cases for saveAsHadoopDataSet (new&old API)
a8d11ba [CodingCat] style fix.........
95a6929 [CodingCat] code clean
7643c88 [CodingCat] change the parameter type back to Configuration
a8583ee [CodingCat] Create a saveAsNewAPIHadoopDataset method
---
 .../apache/spark/api/java/JavaPairRDD.scala   |  10 +-
 .../apache/spark/rdd/PairRDDFunctions.scala   | 104 ++++++++++--------
 .../scala/org/apache/spark/FileSuite.scala    |  39 ++++++-
 3 files changed, 100 insertions(+), 53 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0ff428c120353..9596dbaf75488 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using
+   * a Configuration object for that storage system.
+   */
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    rdd.saveAsNewAPIHadoopDataset(conf)
+  }
+
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
       path: String,
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b0d322fe27bd5..447deafff53cd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -30,11 +30,11 @@ import scala.reflect.ClassTag
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
@@ -603,50 +603,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
-
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    val outpath = new Path(path)
-    NewFileOutputFormat.setOutputPath(job, outpath)
-    val jobFormat = outputFormatClass.newInstance
-    jobFormat.checkOutputSpecs(job)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatClass.newInstance
-      format match {
-        case c: Configurable => c.setConf(wrappedConf.value)
-        case _ => ()
-      }
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
-      }
-      writer.close(hadoopContext)
-      committer.commitTask(hadoopContext)
-      return 1
-    }
-
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    self.context.runJob(self, writeShard _)
-    jobCommitter.commitJob(jobTaskContext)
+    job.setOutputFormatClass(outputFormatClass)
+    job.getConfiguration.set("mapred.output.dir", path)
+    saveAsNewAPIHadoopDataset(job.getConfiguration)
   }
 
   /**
@@ -692,6 +651,59 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     saveAsHadoopDataset(conf)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+   * Configuration object for that storage system. The Conf should set an OutputFormat and any
+   * output paths required (e.g. a table name to write to) in the same way as it would be
+   * configured for a Hadoop MapReduce job.
+   */
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val outfmt = job.getOutputFormatClass
+    val jobFormat = outfmt.newInstance
+
+    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+      // FileOutputFormat ignores the filesystem parameter
+      jobFormat.checkOutputSpecs(job)
+    }
+
+    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+        attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outfmt.newInstance
+      format match {
+        case c: Configurable => c.setConf(wrappedConf.value)
+        case _ => ()
+      }
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      while (iter.hasNext) {
+        val (k, v) = iter.next()
+        writer.write(k, v)
+      }
+      writer.close(hadoopContext)
+      committer.commitTask(hadoopContext)
+      return 1
+    }
+
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    self.context.runJob(self, writeShard _)
+    jobCommitter.commitJob(jobTaskContext)
+  }
+
   /**
    * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
    * that storage system. The JobConf should set an OutputFormat and any output paths required
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 76173608e9f70..01af94077144a 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 class FileSuite extends FunSuite with LocalSparkContext {
@@ -236,7 +237,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
@@ -244,10 +245,36 @@
     sc = new SparkContext("local", "test")
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
-    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
+    assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }
+
+  test ("save Hadoop Dataset through old Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new JobConf()
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
+    job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+    randomRDD.saveAsHadoopDataset(job)
+    assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+  }
+
+  test ("save Hadoop Dataset through new Hadoop API") {
+    sc = new SparkContext("local", "test")
+    val tempdir = Files.createTempDir()
+    val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+    val job = new Job(sc.hadoopConfiguration)
+    job.setOutputKeyClass(classOf[String])
+    job.setOutputValueClass(classOf[String])
+    job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
+    job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+    randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+    assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+  }
 }

From 79e547fe5a675a9a10b6acdc73759d67725ad7c6 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Tue, 18 Mar 2014 14:34:31 -0700
Subject: [PATCH 2/6] Update copyright year in NOTICE to 2014

Author: Matei Zaharia

Closes #174 from mateiz/update-notice and squashes the following commits:

47fc1a5 [Matei Zaharia] Update copyright year in NOTICE to 2014
---
 NOTICE | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/NOTICE b/NOTICE
index 7cbb114b2ae2d..dce0c4eaf31ed 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Spark
-Copyright 2013 The Apache Software Foundation.
+Copyright 2014 The Apache Software Foundation.
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

From e108b9ab94c4310ec56ef0eda99bb904133f942d Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 18 Mar 2014 15:14:13 -0700
Subject: [PATCH 3/6] [SPARK-1260]: faster construction of features with intercept

The current implementation uses `Array(1.0, features: _*)` to construct a new
array with intercept. This is not efficient for big arrays because `Array.apply`
uses a for loop that iterates over the arguments. `Array.+:` is a better choice
here.

Also, I don't see a reason to set initial weights to ones. So I set them to zeros.

JIRA: https://spark-project.atlassian.net/browse/SPARK-1260

Author: Xiangrui Meng

Closes #161 from mengxr/sgd and squashes the following commits:

b5cfc53 [Xiangrui Meng] set default weights to zeros
a1439c2 [Xiangrui Meng] faster construction of features with intercept
---
 .../mllib/regression/GeneralizedLinearAlgorithm.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index f98b0b536deaa..b9621530efa22 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -119,7 +119,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    */
   def run(input: RDD[LabeledPoint]) : M = {
     val nfeatures: Int = input.first().features.length
-    val initialWeights = Array.fill(nfeatures)(1.0)
+    val initialWeights = new Array[Double](nfeatures)
     run(input, initialWeights)
   }
@@ -134,15 +134,15 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
       throw new SparkException("Input validation failed.")
     }
 
-    // Add a extra variable consisting of all 1.0's for the intercept.
+    // Prepend an extra variable consisting of all 1.0's for the intercept.
     val data = if (addIntercept) {
-      input.map(labeledPoint => (labeledPoint.label, Array(1.0, labeledPoint.features:_*)))
+      input.map(labeledPoint => (labeledPoint.label, labeledPoint.features.+:(1.0)))
     } else {
       input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
     }
 
     val initialWeightsWithIntercept = if (addIntercept) {
-      Array(1.0, initialWeights:_*)
+      initialWeights.+:(1.0)
     } else {
       initialWeights
     }

From f9d8a83c0006bb59c61e8770cd201b72333cb9a4 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 18 Mar 2014 17:20:42 -0700
Subject: [PATCH 4/6] [SPARK-1266] persist factors in implicit ALS

In implicit ALS computation, the user or product factor is used twice in each
iteration. Caching can certainly help accelerate the computation. I saw the
running time decreased by ~70% for implicit ALS on the movielens data.
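In code, the caching pattern is roughly the following (an illustrative sketch with
hypothetical names and a stand-in computation, not the actual ALS internals):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Hypothetical example: `factors` is read by two separate jobs, so persist it
    // before the first job and drop it only after its replacement is materialized.
    def rescaleOnce(sc: SparkContext,
        factors: RDD[(Int, Array[Double])]): RDD[(Int, Array[Double])] = {
      factors.persist()                                 // about to be read twice
      val sumOfNorms = factors.values                   // job 1: small aggregate on the driver
        .map(v => math.sqrt(v.map(x => x * x).sum))
        .sum()
      val scale = sc.broadcast(sumOfNorms)              // ship the aggregate back to executors
      val rescaled = factors.mapValues(_.map(_ / scale.value)).cache()
      rescaled.count()                                  // job 2: materialize the new factors
      factors.unpersist()                               // the old factors are no longer needed
      rescaled
    }

The diff below applies the same persist/unpersist discipline to the user and product
factor RDDs in the implicit-feedback branch of ALS.run.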
I also made the following changes: 1. Change `YtYb` type from `Broadcast[Option[DoubleMatrix]]` to `Option[Broadcast[DoubleMatrix]]`, so we don't need to broadcast None in explicit computation. 2. Mark methods `computeYtY`, `unblockFactors`, `updateBlock`, and `updateFeatures private`. Users do not need those methods. 3. Materialize the final matrix factors before returning the model. It allows us to clean up other cached RDDs before returning the model. I do not have a better solution here, so I use `RDD.count()`. JIRA: https://spark-project.atlassian.net/browse/SPARK-1266 Author: Xiangrui Meng Closes #165 from mengxr/als and squashes the following commits: c9676a6 [Xiangrui Meng] add a comment about the last products.persist d3a88aa [Xiangrui Meng] change implicitPrefs match to if ... else ... 63862d6 [Xiangrui Meng] persist factors in implicit ALS --- .../spark/mllib/recommendation/ALS.scala | 145 +++++++++++------- 1 file changed, 89 insertions(+), 56 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 777d0db2d6653..0cc9f48769f83 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -148,8 +148,10 @@ class ALS private ( * Returns a MatrixFactorizationModel with feature vectors for each user and product. */ def run(ratings: RDD[Rating]): MatrixFactorizationModel = { + val sc = ratings.context + val numBlocks = if (this.numBlocks == -1) { - math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2) + math.max(sc.defaultParallelism, ratings.partitions.size / 2) } else { this.numBlocks } @@ -187,21 +189,41 @@ class ALS private ( } } - for (iter <- 1 to iterations) { - // perform ALS update - logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) - // YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model - val YtY = computeYtY(users) - val YtYb = ratings.context.broadcast(YtY) - products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, - alpha, YtYb) - logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) - val XtX = computeYtY(products) - val XtXb = ratings.context.broadcast(XtX) - users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, - alpha, XtXb) + if (implicitPrefs) { + for (iter <- 1 to iterations) { + // perform ALS update + logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) + // Persist users because it will be called twice. 
+ users.persist() + val YtY = Some(sc.broadcast(computeYtY(users))) + val previousProducts = products + products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, + alpha, YtY) + previousProducts.unpersist() + logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + products.persist() + val XtX = Some(sc.broadcast(computeYtY(products))) + val previousUsers = users + users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, + alpha, XtX) + previousUsers.unpersist() + } + } else { + for (iter <- 1 to iterations) { + // perform ALS update + logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations)) + products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda, + alpha, YtY = None) + logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations)) + users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda, + alpha, YtY = None) + } } + // The last `products` will be used twice. One to generate the last `users` and the other to + // generate `productsOut`. So we cache it for better performance. + products.persist() + // Flatten and cache the two final RDDs to un-block them val usersOut = unblockFactors(users, userOutLinks) val productsOut = unblockFactors(products, productOutLinks) @@ -209,31 +231,39 @@ class ALS private ( usersOut.persist() productsOut.persist() + // Materialize usersOut and productsOut. + usersOut.count() + productsOut.count() + + products.unpersist() + + // Clean up. + userInLinks.unpersist() + userOutLinks.unpersist() + productInLinks.unpersist() + productOutLinks.unpersist() + new MatrixFactorizationModel(rank, usersOut, productsOut) } /** * Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors - * for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as - * the driver program requires `YtY` to broadcast it to the slaves + * for each user (or product), in a distributed fashion. 
+ * * @param factors the (block-distributed) user or product factor vectors - * @return Option[YtY] - whose value is only used in the implicit preference model + * @return YtY - whose value is only used in the implicit preference model */ - def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { - if (implicitPrefs) { - val n = rank * (rank + 1) / 2 - val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => { - Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L)) - L - }, combOp = (L1, L2) => { - L1.addi(L2) - }) - val YtY = new DoubleMatrix(rank, rank) - fillFullMatrix(LYtY, YtY) - Option(YtY) - } else { - None - } + private def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = { + val n = rank * (rank + 1) / 2 + val LYtY = factors.values.aggregate(new DoubleMatrix(n))( seqOp = (L, Y) => { + Y.foreach(y => dspr(1.0, new DoubleMatrix(y), L)) + L + }, combOp = (L1, L2) => { + L1.addi(L2) + }) + val YtY = new DoubleMatrix(rank, rank) + fillFullMatrix(LYtY, YtY) + YtY } /** @@ -264,7 +294,7 @@ class ALS private ( /** * Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs */ - def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])], + private def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])], outLinks: RDD[(Int, OutLinkBlock)]) = { blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) => for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i)) @@ -332,8 +362,11 @@ class ALS private ( val outLinkBlock = makeOutLinkBlock(numBlocks, ratings) Iterator.single((blockId, (inLinkBlock, outLinkBlock))) }, true) - links.persist(StorageLevel.MEMORY_AND_DISK) - (links.mapValues(_._1), links.mapValues(_._2)) + val inLinks = links.mapValues(_._1) + val outLinks = links.mapValues(_._2) + inLinks.persist(StorageLevel.MEMORY_AND_DISK) + outLinks.persist(StorageLevel.MEMORY_AND_DISK) + (inLinks, outLinks) } /** @@ -365,7 +398,7 @@ class ALS private ( rank: Int, lambda: Double, alpha: Double, - YtY: Broadcast[Option[DoubleMatrix]]) + YtY: Option[Broadcast[DoubleMatrix]]) : RDD[(Int, Array[Array[Double]])] = { val numBlocks = products.partitions.size @@ -388,8 +421,8 @@ class ALS private ( * Compute the new feature vectors for a block of the users matrix given the list of factors * it received from each product and its InLinkBlock. */ - def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, - rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]]) + private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, + rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]]) : Array[Array[Double]] = { // Sort the incoming block factor messages by block ID and make them an array @@ -416,21 +449,20 @@ class ALS private ( dspr(1.0, x, tempXtX) val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p) for (i <- 0 until us.length) { - implicitPrefs match { - case false => - userXtX(us(i)).addi(tempXtX) - SimpleBlas.axpy(rs(i), x, userXy(us(i))) - case true => - // Extension to the original paper to handle rs(i) < 0. confidence is a function - // of |rs(i)| instead so that it is never negative: - val confidence = 1 + alpha * abs(rs(i)) - SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i))) - // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) - // means we try to reconstruct 0. 
We add terms only where P = 1, so, term below - // is now only added for rs(i) > 0: - if (rs(i) > 0) { - SimpleBlas.axpy(confidence, x, userXy(us(i))) - } + if (implicitPrefs) { + // Extension to the original paper to handle rs(i) < 0. confidence is a function + // of |rs(i)| instead so that it is never negative: + val confidence = 1 + alpha * abs(rs(i)) + SimpleBlas.axpy(confidence - 1.0, tempXtX, userXtX(us(i))) + // For rs(i) < 0, the corresponding entry in P is 0 now, not 1 -- negative rs(i) + // means we try to reconstruct 0. We add terms only where P = 1, so, term below + // is now only added for rs(i) > 0: + if (rs(i) > 0) { + SimpleBlas.axpy(confidence, x, userXy(us(i))) + } + } else { + userXtX(us(i)).addi(tempXtX) + SimpleBlas.axpy(rs(i), x, userXy(us(i))) } } } @@ -443,9 +475,10 @@ class ALS private ( // Add regularization (0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda) // Solve the resulting matrix, which is symmetric and positive-definite - implicitPrefs match { - case false => Solve.solvePositive(fullXtX, userXy(index)).data - case true => Solve.solvePositive(fullXtX.addi(YtY.value.get), userXy(index)).data + if (implicitPrefs) { + Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data + } else { + Solve.solvePositive(fullXtX, userXy(index)).data } } } From cc2655a237442a71c75d4fade99767df7648e55f Mon Sep 17 00:00:00 2001 From: witgo Date: Tue, 18 Mar 2014 21:57:47 -0700 Subject: [PATCH 5/6] Fix SPARK-1256: Master web UI and Worker web UI returns a 404 error Author: witgo Closes #150 from witgo/SPARK-1256 and squashes the following commits: 08044a2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1256 c99b030 [witgo] Fix SPARK-1256 --- .../org/apache/spark/deploy/master/ui/MasterWebUI.scala | 2 +- .../org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 5 ++++- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index a7bd01e284c8e..4ad1f95be31c9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -61,7 +61,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { master.applicationMetricsSystem.getServletHandlers val handlers = metricsHandlers ++ Seq[ServletContextHandler]( - createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static/*"), + createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR + "/static", "/static"), createServletHandler("/app/json", createServlet((request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr)), diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index ffc05bd30687a..4e33b330ad4e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -47,7 +47,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val metricsHandlers = worker.metricsSystem.getServletHandlers val handlers = metricsHandlers ++ Seq[ServletContextHandler]( - createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static/*"), + 
createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE + "/static", "/static"), createServletHandler("/log", createServlet((request: HttpServletRequest) => log(request), worker.securityMgr)), createServletHandler("/logPage", createServlet((request: HttpServletRequest) => logPage diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 7c35cd165ad7c..e0555ca7ac02f 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -111,10 +111,13 @@ private[spark] object JettyUtils extends Logging { Option(getClass.getClassLoader.getResource(resourceBase)) match { case Some(res) => holder.setInitParameter("resourceBase", res.toString) + holder.setInitParameter("welcomeServlets", "false") + holder.setInitParameter("pathInfoOnly", "false") case None => throw new Exception("Could not find resource path for Web UI: " + resourceBase) } - contextHandler.addServlet(holder, path) + contextHandler.setContextPath(path) + contextHandler.addServlet(holder, "/") contextHandler } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index ca82c3da2fc24..5f0dee64fedb7 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -38,7 +38,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { var server: Option[Server] = None val handlers = Seq[ServletContextHandler] ( - createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static/*"), + createStaticHandler(SparkUI.STATIC_RESOURCE_DIR + "/static", "/static"), createRedirectHandler("/stages", "/") ) val storage = new BlockManagerUI(sc) From a18ea00f3af0fa4c6b2c59933e22b6c9f0f636c8 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 18 Mar 2014 22:04:57 -0700 Subject: [PATCH 6/6] Bundle tachyon: SPARK-1269 This should all work as expected with the current version of the tachyon tarball (0.4.1) Author: Nick Lanham Closes #137 from nicklan/bundle-tachyon and squashes the following commits: 2eee15b [Nick Lanham] Put back in exec, start tachyon first 738ba23 [Nick Lanham] Move tachyon out of sbin f2f9bc6 [Nick Lanham] More checks for tachyon script 111e8e1 [Nick Lanham] Only try tachyon operations if tachyon script exists 0561574 [Nick Lanham] Copy over web resources so web interface can run 4dc9809 [Nick Lanham] Update to tachyon 0.4.1 0a1a20c [Nick Lanham] Add scripts using tachyon tarball --- make-distribution.sh | 32 ++++++++++++++++++++++++++++++++ sbin/start-all.sh | 15 +++++++++++++-- sbin/start-master.sh | 21 +++++++++++++++++++++ sbin/start-slaves.sh | 23 +++++++++++++++++++++++ sbin/stop-master.sh | 4 ++++ sbin/stop-slaves.sh | 5 +++++ 6 files changed, 98 insertions(+), 2 deletions(-) diff --git a/make-distribution.sh b/make-distribution.sh index e6b5956d1e7e2..6bc6819d8da92 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -58,6 +58,7 @@ echo "Version is ${VERSION}" # Initialize defaults SPARK_HADOOP_VERSION=1.0.4 SPARK_YARN=false +SPARK_TACHYON=false MAKE_TGZ=false # Parse arguments @@ -70,6 +71,9 @@ while (( "$#" )); do --with-yarn) SPARK_YARN=true ;; + --with-tachyon) + SPARK_TACHYON=true + ;; --tgz) MAKE_TGZ=true ;; @@ -90,6 +94,12 @@ else echo "YARN disabled" fi +if [ "$SPARK_TACHYON" == "true" ]; then + echo "Tachyon Enabled" +else + echo "Tachyon Disabled" +fi + # Build fat JAR export SPARK_HADOOP_VERSION export SPARK_YARN @@ -113,6 +123,28 
@@ cp -r "$FWDIR/python" "$DISTDIR" cp -r "$FWDIR/sbin" "$DISTDIR" +# Download and copy in tachyon, if requested +if [ "$SPARK_TACHYON" == "true" ]; then + TACHYON_VERSION="0.4.1" + TACHYON_URL="https://github.com/amplab/tachyon/releases/download/v${TACHYON_VERSION}/tachyon-${TACHYON_VERSION}-bin.tar.gz" + + TMPD=`mktemp -d` + + pushd $TMPD > /dev/null + echo "Fetchting tachyon tgz" + wget "$TACHYON_URL" + + tar xf "tachyon-${TACHYON_VERSION}-bin.tar.gz" + cp "tachyon-${TACHYON_VERSION}/target/tachyon-${TACHYON_VERSION}-jar-with-dependencies.jar" "$DISTDIR/jars" + mkdir -p "$DISTDIR/tachyon/src/main/java/tachyon/web" + cp -r "tachyon-${TACHYON_VERSION}"/{bin,conf,libexec} "$DISTDIR/tachyon" + cp -r "tachyon-${TACHYON_VERSION}"/src/main/java/tachyon/web/resources "$DISTDIR/tachyon/src/main/java/tachyon/web" + sed -i "s|export TACHYON_JAR=\$TACHYON_HOME/target/\(.*\)|# This is set for spark's make-distribution\n export TACHYON_JAR=\$TACHYON_HOME/../../jars/\1|" "$DISTDIR/tachyon/libexec/tachyon-config.sh" + + popd > /dev/null + rm -rf $TMPD +fi + if [ "$MAKE_TGZ" == "true" ]; then TARDIR="$FWDIR/spark-$VERSION" cp -r "$DISTDIR" "$TARDIR" diff --git a/sbin/start-all.sh b/sbin/start-all.sh index 2daf49db359df..5c89ab4d86b3a 100755 --- a/sbin/start-all.sh +++ b/sbin/start-all.sh @@ -24,11 +24,22 @@ sbin=`dirname "$0"` sbin=`cd "$sbin"; pwd` +TACHYON_STR="" + +while (( "$#" )); do +case $1 in + --with-tachyon) + TACHYON_STR="--with-tachyon" + ;; + esac +shift +done + # Load the Spark configuration . "$sbin/spark-config.sh" # Start Master -"$sbin"/start-master.sh +"$sbin"/start-master.sh $TACHYON_STR # Start Workers -"$sbin"/start-slaves.sh +"$sbin"/start-slaves.sh $TACHYON_STR diff --git a/sbin/start-master.sh b/sbin/start-master.sh index ec3dfdb4197ec..03a3428aea9f1 100755 --- a/sbin/start-master.sh +++ b/sbin/start-master.sh @@ -22,6 +22,21 @@ sbin=`dirname "$0"` sbin=`cd "$sbin"; pwd` +START_TACHYON=false + +while (( "$#" )); do +case $1 in + --with-tachyon) + if [ ! -e "$sbin"/../tachyon/bin/tachyon ]; then + echo "Error: --with-tachyon specified, but tachyon not found." + exit -1 + fi + START_TACHYON=true + ;; + esac +shift +done + . "$sbin/spark-config.sh" if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then @@ -41,3 +56,9 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then fi "$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT + +if [ "$START_TACHYON" == "true" ]; then + "$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP + "$sbin"/../tachyon/bin/tachyon format -s + "$sbin"/../tachyon/bin/tachyon-start.sh master +fi diff --git a/sbin/start-slaves.sh b/sbin/start-slaves.sh index fd5cdeb1e6788..da641cfe3c6fa 100755 --- a/sbin/start-slaves.sh +++ b/sbin/start-slaves.sh @@ -20,6 +20,22 @@ sbin=`dirname "$0"` sbin=`cd "$sbin"; pwd` + +START_TACHYON=false + +while (( "$#" )); do +case $1 in + --with-tachyon) + if [ ! -e "$sbin"/../tachyon/bin/tachyon ]; then + echo "Error: --with-tachyon specified, but tachyon not found." + exit -1 + fi + START_TACHYON=true + ;; + esac +shift +done + . 
"$sbin/spark-config.sh" if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then @@ -35,6 +51,13 @@ if [ "$SPARK_MASTER_IP" = "" ]; then SPARK_MASTER_IP=`hostname` fi +if [ "$START_TACHYON" == "true" ]; then + "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP + + # set -t so we can call sudo + SPARK_SSH_OPTS="-o StrictHostKeyChecking=no -t" "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/../tachyon/bin/tachyon-start.sh" worker SudoMount \; sleep 1 +fi + # Launch the slaves if [ "$SPARK_WORKER_INSTANCES" = "" ]; then exec "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" 1 spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT diff --git a/sbin/stop-master.sh b/sbin/stop-master.sh index 2adabd426563c..b6bdaa4db373c 100755 --- a/sbin/stop-master.sh +++ b/sbin/stop-master.sh @@ -25,3 +25,7 @@ sbin=`cd "$sbin"; pwd` . "$sbin/spark-config.sh" "$sbin"/spark-daemon.sh stop org.apache.spark.deploy.master.Master 1 + +if [ -e "$sbin"/../tachyon/bin/tachyon ]; then + "$sbin"/../tachyon/bin/tachyon killAll tachyon.master.Master +fi diff --git a/sbin/stop-slaves.sh b/sbin/stop-slaves.sh index eb803b4900347..6bf393ccd4b09 100755 --- a/sbin/stop-slaves.sh +++ b/sbin/stop-slaves.sh @@ -26,6 +26,11 @@ if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then . "${SPARK_CONF_DIR}/spark-env.sh" fi +# do before the below calls as they exec +if [ -e "$sbin"/../tachyon/bin/tachyon ]; then + "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin"/../tachyon/bin/tachyon killAll tachyon.worker.Worker +fi + if [ "$SPARK_WORKER_INSTANCES" = "" ]; then "$sbin"/spark-daemons.sh stop org.apache.spark.deploy.worker.Worker 1 else