Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh committed Feb 9, 2015
2 parents 45884ab + 56aff4b commit fa408a8
Showing 14 changed files with 186 additions and 36 deletions.
27 changes: 16 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -62,17 +62,22 @@ private[spark] object JettyUtils extends Logging {
securityMgr: SecurityManager): HttpServlet = {
new HttpServlet {
override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
-      if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
-        response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
-        response.setStatus(HttpServletResponse.SC_OK)
-        val result = servletParams.responder(request)
-        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
-        response.getWriter.println(servletParams.extractFn(result))
-      } else {
-        response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
-        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
-        response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
-          "User is not authorized to access this page.")
+      try {
+        if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
+          response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
+          response.setStatus(HttpServletResponse.SC_OK)
+          val result = servletParams.responder(request)
+          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+          response.getWriter.println(servletParams.extractFn(result))
+        } else {
+          response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
+            "User is not authorized to access this page.")
+        }
+      } catch {
+        case e: IllegalArgumentException =>
+          response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
      }
}
}
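Note: the effect of this JettyUtils change is that any UI page renderer can now signal a malformed query string by throwing `IllegalArgumentException` (for example from `require`), and the shared servlet wrapper reports it as 400 Bad Request instead of an unhandled 500. A minimal standalone sketch of the pattern — `ParamCheckedServlet` and `render` are illustrative names, not Spark classes:

```scala
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

// Hypothetical wrapper mirroring the change above: parameter-validation
// failures in the renderer become a client error rather than a stack trace.
class ParamCheckedServlet(render: HttpServletRequest => String) extends HttpServlet {
  override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
    try {
      resp.setContentType("text/html;charset=utf-8")
      resp.getWriter.println(render(req))
    } catch {
      case e: IllegalArgumentException =>
        // require(...) failures (and NumberFormatException, its subclass)
        // are reported as 400 with the validation message as the body.
        resp.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
    }
  }
}
```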
core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala
@@ -43,7 +43,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
}
id
}.getOrElse {
-      return Text(s"Missing executorId parameter")
+      throw new IllegalArgumentException(s"Missing executorId parameter")
}
val time = System.currentTimeMillis()
val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -32,7 +32,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
-      val jobId = request.getParameter("id").toInt
+      val parameterId = request.getParameter("id")
+      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+      val jobId = parameterId.toInt
val jobDataOption = listener.jobIdToData.get(jobId)
if (jobDataOption.isEmpty) {
val content =
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -32,6 +32,8 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
require(poolName != null && poolName.nonEmpty, "Missing poolname parameter")

val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match {
case Some(s) => s.values.toSeq
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -36,8 +36,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
-      val stageId = request.getParameter("id").toInt
-      val stageAttemptId = request.getParameter("attempt").toInt
+      val parameterId = request.getParameter("id")
+      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+      val parameterAttempt = request.getParameter("attempt")
+      require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
+
+      val stageId = parameterId.toInt
+      val stageAttemptId = parameterAttempt.toInt
val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))

if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -30,7 +30,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
-    val rddId = request.getParameter("id").toInt
+    val parameterId = request.getParameter("id")
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+    val rddId = parameterId.toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
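The four UI pages above (job, pool, stage, RDD) all follow the same validate-then-parse idiom. A hedged sketch of that idiom in isolation — `parseIntParameter` is an illustrative helper, not an actual Spark method:

```scala
import javax.servlet.http.HttpServletRequest

// Validate presence first, then parse. A missing or empty value fails the
// require with IllegalArgumentException; a non-numeric value throws
// NumberFormatException, a subclass of IllegalArgumentException, so both
// are turned into 400 Bad Request by the JettyUtils handler above.
def parseIntParameter(request: HttpServletRequest, name: String): Int = {
  val raw = request.getParameter(name)
  require(raw != null && raw.nonEmpty, s"Missing $name parameter")
  raw.toInt
}
```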
41 changes: 24 additions & 17 deletions docs/mllib-guide.md
@@ -56,25 +56,32 @@ See the **[spark.ml programming guide](ml-guide.html)** for more information on

# Dependencies

-MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/),
-which depends on [netlib-java](https://github.com/fommil/netlib-java),
-and [jblas](https://github.com/mikiobraun/jblas).
-`netlib-java` and `jblas` depend on native Fortran routines.
-You need to install the
+MLlib uses the linear algebra package
+[Breeze](http://www.scalanlp.org/), which depends on
+[netlib-java](https://github.com/fommil/netlib-java) for optimised
+numerical processing. If natives are not available at runtime, you
+will see a warning message and a pure JVM implementation will be used
+instead.
+
+To learn more about the benefits and background of system optimised
+natives, you may wish to watch Sam Halliday's ScalaX talk on
+[High Performance Linear Algebra in Scala](http://fommil.github.io/scalax14/#/).
+
+Due to licensing issues with runtime proprietary binaries, we do not
+include `netlib-java`'s native proxies by default. To configure
+`netlib-java` / Breeze to use system optimised binaries, include
+`com.github.fommil.netlib:all:1.1.2` (or build Spark with
+`-Pnetlib-lgpl`) as a dependency of your project and read the
+[netlib-java](https://github.com/fommil/netlib-java) documentation for
+your platform's additional installation instructions.
+
+MLlib also uses [jblas](https://github.com/mikiobraun/jblas) which
+will require you to install the
+[gfortran runtime library](https://github.com/mikiobraun/jblas/wiki/Missing-Libraries)
+if it is not already present on your nodes.
-MLlib will throw a linking error if it cannot detect these libraries automatically.
-Due to license issues, we do not include `netlib-java`'s native libraries in MLlib's
-dependency set under default settings.
-If no native library is available at runtime, you will see a warning message.
-To use native libraries from `netlib-java`, please build Spark with `-Pnetlib-lgpl` or
-include `com.github.fommil.netlib:all:1.1.2` as a dependency of your project.
-If you want to use optimized BLAS/LAPACK libraries such as
-[OpenBLAS](http://www.openblas.net/), please link its shared libraries to
-`/usr/lib/libblas.so.3` and `/usr/lib/liblapack.so.3`, respectively.
-BLAS/LAPACK libraries on worker nodes should be built without multithreading.

-To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.4 or newer.

+To use MLlib in Python, you will need [NumPy](http://www.numpy.org)
+version 1.4 or newer.

---

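For an application consuming MLlib, pulling in the optimised natives described in the guide text above might look like the following build snippet. This is a sketch only: the coordinates are the ones named in the guide, and `pomOnly()` reflects that `com.github.fommil.netlib:all` is a POM-packaged artifact — check the netlib-java documentation for your platform before relying on it.

```scala
// build.sbt (sketch): add netlib-java's native proxies to an application.
libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()
```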
15 changes: 15 additions & 0 deletions ec2/spark_ec2.py
@@ -24,10 +24,12 @@
import hashlib
import logging
import os
import os.path
import pipes
import random
import shutil
import string
from stat import S_IRUSR
import subprocess
import sys
import tarfile
@@ -349,6 +351,7 @@ def launch_cluster(conn, opts, cluster_name):
if opts.identity_file is None:
print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections."
sys.exit(1)

if opts.key_pair is None:
print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
sys.exit(1)
@@ -1007,6 +1010,18 @@ def real_main():
DeprecationWarning
)

if opts.identity_file is not None:
if not os.path.exists(opts.identity_file):
print >> stderr,\
"ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file)
sys.exit(1)

file_mode = os.stat(opts.identity_file).st_mode
if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
print >> stderr, "ERROR: The identity file must be accessible only by you."
print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file)
sys.exit(1)

if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)
mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -79,6 +79,9 @@ private[mllib] object EigenValueDecomposition {
// Mode 1: A*x = lambda*x, A symmetric
iparam(6) = 1

require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE,
s"k = $k and/or n = $n are too large to compute an eigendecomposition")

var ido = new intW(0)
var info = new intW(0)
var resid = new Array[Double](n)
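The new `require` evaluates the ARPACK workspace sizes in `Long` before any `Int`-indexed Java array is allocated. A standalone illustration of why the `.toLong` promotion matters (the values are arbitrary):

```scala
val n = 100000   // example problem dimension
val ncv = 30000  // example number of Lanczos basis vectors

// In Int arithmetic the product wraps silently, so checking the Int result
// against Integer.MAX_VALUE would wrongly pass.
println(n * ncv)        // -1294967296 (overflowed)

// Promoting one operand to Long preserves the true value, which can be
// compared against Integer.MAX_VALUE before allocating the workspace.
println(n * ncv.toLong) // 3000000000
```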
mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.recommendation

import org.apache.spark.Logging
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.ml.recommendation.{ALS => NewALS}
import org.apache.spark.rdd.RDD
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -17,13 +17,17 @@

package org.apache.spark.mllib.recommendation

+import java.io.IOException
import java.lang.{Integer => JavaInteger}

+import org.apache.hadoop.fs.Path
import org.jblas.DoubleMatrix

-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel

/**
Expand All @@ -41,7 +45,8 @@ import org.apache.spark.storage.StorageLevel
class MatrixFactorizationModel(
val rank: Int,
val userFeatures: RDD[(Int, Array[Double])],
-    val productFeatures: RDD[(Int, Array[Double])]) extends Serializable with Logging {
+    val productFeatures: RDD[(Int, Array[Double])])
+  extends Saveable with Serializable with Logging {

require(rank > 0)
validateFeatures("User", userFeatures)
@@ -125,6 +130,12 @@ class MatrixFactorizationModel(
recommend(productFeatures.lookup(product).head, userFeatures, num)
.map(t => Rating(t._1, product, t._2))

protected override val formatVersion: String = "1.0"

override def save(sc: SparkContext, path: String): Unit = {
MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
}

private def recommend(
recommendToFeatures: Array[Double],
recommendableFeatures: RDD[(Int, Array[Double])],
@@ -136,3 +147,70 @@ class MatrixFactorizationModel(
scored.top(num)(Ordering.by(_._2))
}
}

object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {

import org.apache.spark.mllib.util.Loader._

override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
val (loadedClassName, formatVersion, metadata) = loadMetadata(sc, path)
val classNameV1_0 = SaveLoadV1_0.thisClassName
(loadedClassName, formatVersion) match {
case (className, "1.0") if className == classNameV1_0 =>
SaveLoadV1_0.load(sc, path)
case _ =>
        throw new IOException("MatrixFactorizationModel.load did not recognize model with " +
          s"(class: $loadedClassName, version: $formatVersion). Supported:\n" +
          s"  ($classNameV1_0, 1.0)")
}
}

private[recommendation]
object SaveLoadV1_0 {

private val thisFormatVersion = "1.0"

private[recommendation]
val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"

/**
     * Saves a [[MatrixFactorizationModel]], where user features are saved under `data/user` and
     * product features are saved under `data/product`.
*/
def save(model: MatrixFactorizationModel, path: String): Unit = {
val sc = model.userFeatures.sparkContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits.createDataFrame
val metadata = (thisClassName, thisFormatVersion, model.rank)
val metadataRDD = sc.parallelize(Seq(metadata), 1).toDataFrame("class", "version", "rank")
metadataRDD.toJSON.saveAsTextFile(metadataPath(path))
model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
}

def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
val sqlContext = new SQLContext(sc)
val (className, formatVersion, metadata) = loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
val rank = metadata.select("rank").first().getInt(0)
val userFeatures = sqlContext.parquetFile(userPath(path))
.map { case Row(id: Int, features: Seq[Double]) =>
(id, features.toArray)
}
val productFeatures = sqlContext.parquetFile(productPath(path))
.map { case Row(id: Int, features: Seq[Double]) =>
(id, features.toArray)
}
new MatrixFactorizationModel(rank, userFeatures, productFeatures)
}

private def userPath(path: String): String = {
new Path(dataPath(path), "user").toUri.toString
}

private def productPath(path: String): String = {
new Path(dataPath(path), "product").toUri.toString
}
}
}
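A hedged usage sketch of the persistence API added above — it assumes a live `SparkContext` named `sc` and an `RDD[Rating]` named `ratings`; the path and hyperparameters are examples only:

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel}

// Train, persist, and restore a model. save() writes JSON metadata (class
// name, format version, rank) plus the user/product feature RDDs as Parquet.
val model = ALS.train(ratings, 10, 10, 0.01) // rank, iterations, lambda
model.save(sc, "/tmp/example-als-model")

// load() verifies the class name and format version recorded in the
// metadata before reading the feature files back.
val restored = MatrixFactorizationModel.load(sc, "/tmp/example-als-model")
println(restored.predict(1, 42)) // predicted rating of product 42 by user 1
```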
mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext {

@@ -53,4 +54,22 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
new MatrixFactorizationModel(rank, userFeatures, prodFeatures1)
}
}

test("save/load") {
val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
val tempDir = Utils.createTempDir()
val path = tempDir.toURI.toString
def collect(features: RDD[(Int, Array[Double])]): Set[(Int, Seq[Double])] = {
features.mapValues(_.toSeq).collect().toSet
}
try {
model.save(sc, path)
val newModel = MatrixFactorizationModel.load(sc, path)
assert(newModel.rank === rank)
assert(collect(newModel.userFeatures) === collect(userFeatures))
assert(collect(newModel.productFeatures) === collect(prodFeatures))
} finally {
Utils.deleteRecursively(tempDir)
}
}
}
4 changes: 4 additions & 0 deletions pom.xml
@@ -975,6 +975,10 @@
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -424,6 +424,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/** Extends QueryExecution with hive specific features. */
protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
extends super.QueryExecution(logicalPlan) {
    // As in runHive, make sure the session represented by the
    // `sessionState` field is activated.
if (SessionState.get() != sessionState) {
SessionState.start(sessionState)
}

/**
* Returns the result as a hive compatible sequence of strings. For native commands, the
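The guard added to `QueryExecution` exists because Hive tracks the active session in a thread-local: work scheduled on a fresh thread sees no session until one is started. A generic sketch of the idiom — the `Session` type here is illustrative, not Hive's `SessionState`:

```scala
// Illustrative thread-local session registry, mirroring the shape of the
// SessionState.get()/start() pair used in the change above.
class Session
object Session {
  private val active = new ThreadLocal[Session]
  def get(): Session = active.get()
  def start(s: Session): Unit = active.set(s)
}

// Any code path that may execute on a new thread re-activates its own
// session before doing session-dependent work, as the constructor above does.
def withSession[A](s: Session)(body: => A): A = {
  if (Session.get() != s) Session.start(s)
  body
}
```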
