
Commit

Merge remote-tracking branch 'upstream/master' into orderby-agg
viirya committed Sep 5, 2015
2 parents 8f73c40 + 47058ca commit 6023c8b
Showing 113 changed files with 1,072 additions and 1,052 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/deserialize.R
@@ -60,7 +60,7 @@ readString <- function(con) {
raw <- readBin(con, raw(), stringLen, endian = "big")
string <- rawToChar(raw)
Encoding(string) <- "UTF-8"
string
string
}

readInt <- function(con) {
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,9 +319,6 @@ object SparkSubmit {

// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on Mesos clusters.")
case (MESOS, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on Mesos clusters.")
@@ -554,7 +551,15 @@ object SparkSubmit {
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
if (args.isPython) {
// Second argument is main class
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
} else {
childArgs += (args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
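To make the convention above concrete (an empty main-class slot is what later tells the cluster scheduler it is dealing with a Python application), here is a minimal sketch with hypothetical values; it is an illustration, not Spark source:

    // Minimal sketch of the argument convention above; the resource URL is made up.
    import scala.collection.mutable.ArrayBuffer

    val primaryResource = "http://repo.example.com/apps/my_app.py"  // assumed Python app
    val mainClass = ""                          // empty string marks a Python submission
    val childArgs = ArrayBuffer[String]()
    childArgs += (primaryResource, mainClass)   // same shape as childArgs in the diff above
    // Downstream, the scheduler can branch on the empty main class:
    val isPythonSubmission = mainClass.isEmpty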
@@ -29,7 +29,6 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}

import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
@@ -375,21 +374,20 @@ private[spark] class MesosClusterScheduler(
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
envBuilder.addVariables(
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
val cmdOptions = generateCmdOption(desc).mkString(" ")
val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
val executorUri = desc.schedulerProperties.get("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
val appArguments = desc.command.arguments.mkString(" ")
val (executable, jar) = if (dockerDefined) {
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
// Application jar is automatically downloaded in the mounted sandbox by Mesos,
// and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
("./bin/spark-submit", "$MESOS_SANDBOX")
} else if (executorUri.isDefined) {
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
val folderBasename = executorUri.get.split('/').last.split('.').head
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
val cmdJar = s"../${desc.jarUrl.split("/").last}"
(cmdExecutable, cmdJar)
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
@@ -398,30 +396,50 @@
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
val cmdJar = desc.jarUrl.split("/").last
(cmdExecutable, cmdJar)
// Sandbox points to the current directory by default with Mesos.
(cmdExecutable, ".")
}
builder.setValue(s"$executable $cmdOptions $jar $appArguments")
val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
val appArguments = desc.command.arguments.mkString(" ")
builder.setValue(s"$executable $cmdOptions $primaryResource $appArguments")
builder.setEnvironment(envBuilder.build())
conf.getOption("spark.mesos.uris").map { uris =>
setupUris(uris, builder)
}
desc.schedulerProperties.get("spark.mesos.uris").map { uris =>
setupUris(uris, builder)
}
desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
setupUris(pyFiles, builder)
}
builder.build()
}
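To see how the pieces above combine, the following is a hedged sketch of the final driver command on the executor-URI code path; all values (folder name, URLs, options) are assumptions for illustration, not Spark source:

    // Hypothetical values illustrating the command assembly above (executor-URI path).
    val executable   = "cd spark-1.5.0*; bin/spark-submit"
    val sandboxPath  = ".."                     // parent folder, since we cd into the unpacked distribution
    val jarUrl       = "http://repo.example.com/apps/my_app.jar"
    val primaryResource = new java.io.File(sandboxPath, jarUrl.split("/").last).toString
    val cmdOptions   = "--name my_app --master mesos://leader.example.com:5050 --driver-cores 1 --driver-memory 1024M"
    val appArguments = "--input /data/in"
    println(s"$executable $cmdOptions $primaryResource $appArguments")
    // Prints: cd spark-1.5.0*; bin/spark-submit --name my_app --master mesos://leader.example.com:5050
    //         --driver-cores 1 --driver-memory 1024M ../my_app.jar --input /data/in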

private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
var options = Seq(
"--name", desc.schedulerProperties("spark.app.name"),
"--class", desc.command.mainClass,
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")

// Assume empty main class means we're running python
if (!desc.command.mainClass.equals("")) {
options ++= Seq("--class", desc.command.mainClass)
}

desc.schedulerProperties.get("spark.executor.memory").map { v =>
options ++= Seq("--executor-memory", v)
}
desc.schedulerProperties.get("spark.cores.max").map { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
val formattedFiles = pyFiles.split(",")
.map { path => new File(sandboxPath, path.split("/").last).toString() }
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
options
}
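The --py-files rewriting above can be illustrated in isolation; the URIs below are assumptions, and the point is that each remote file is remapped to its basename inside the sandbox:

    // Assumed values; mirrors the rewrite in generateCmdOption above.
    val sandboxPath = "$MESOS_SANDBOX"
    val pyFiles = "http://repo.example.com/libs/util.py,http://repo.example.com/libs/extra.zip"
    val formattedFiles = pyFiles.split(",")
      .map { path => new java.io.File(sandboxPath, path.split("/").last).toString }
      .mkString(",")
    // formattedFiles == "$MESOS_SANDBOX/util.py,$MESOS_SANDBOX/extra.zip"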

@@ -812,6 +812,7 @@ class DAGSchedulerSuite
}

// The map stage should have been submitted.
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 1)

complete(taskSets(0), Seq(
2 changes: 2 additions & 0 deletions docs/running-on-mesos.md
@@ -157,6 +157,8 @@ From the client, you can submit a job to Mesos cluster by running `spark-submit`
to the url of the MesosClusterDispatcher (e.g: mesos://dispatcher:7077). You can view driver statuses on the
Spark cluster Web UI.

Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves.
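As a hedged illustration of the note above (host names and URIs are made up), a Python application can also be submitted to the dispatcher in cluster mode programmatically through SparkLauncher; every resource is an HTTP URI so the Mesos slaves can fetch it:

    import org.apache.spark.launcher.SparkLauncher

    // Hypothetical dispatcher address and resource URIs; requires SPARK_HOME to be set.
    val driver = new SparkLauncher()
      .setMaster("mesos://dispatcher.example.com:7077")
      .setDeployMode("cluster")
      .setAppResource("http://repo.example.com/apps/my_app.py")
      .addPyFile("http://repo.example.com/libs/deps.zip")
      .launch()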

# Mesos Run Modes

Spark can run over Mesos in two modes: "fine-grained" (default) and "coarse-grained".
@@ -32,6 +32,7 @@ private[ml] trait MultilayerPerceptronParams extends PredictorParams
with HasSeed with HasMaxIter with HasTol {
/**
* Layer sizes including input size and output size.
* Default: Array(1, 1)
* @group param
*/
final val layers: IntArrayParam = new IntArrayParam(this, "layers",
@@ -50,6 +51,7 @@ private[ml] trait MultilayerPerceptronParams extends PredictorParams
* Data is stacked within partitions. If block size is more than remaining data in
* a partition then it is adjusted to the size of this data.
* Recommended size is between 10 and 1000.
* Default: 128
* @group expertParam
*/
final val blockSize: IntParam = new IntParam(this, "blockSize",
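A brief, hedged configuration sketch for the parameters documented above; the layer sizes, column setup and iteration count are illustrative values, not defaults from this diff:

    import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

    // 4 inputs, two hidden layers, 3 output classes; blockSize kept at the documented default.
    val mlp = new MultilayerPerceptronClassifier()
      .setLayers(Array(4, 5, 4, 3))
      .setBlockSize(128)
      .setMaxIter(100)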
@@ -38,6 +38,7 @@ class BinaryClassificationEvaluator(override val uid: String)

/**
* param for metric name in evaluation
* Default: areaUnderROC
* @group param
*/
val metricName: Param[String] = {
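For reference, a small usage sketch with the metric name set explicitly; the column names are the conventional ones and are assumptions here:

    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

    // "areaUnderROC" is the documented default; "areaUnderPR" is the other supported metric.
    val evaluator = new BinaryClassificationEvaluator()
      .setMetricName("areaUnderPR")
      .setRawPredictionCol("rawPrediction")
      .setLabelCol("label")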
@@ -41,6 +41,7 @@ final class Binarizer(override val uid: String)
* Param for threshold used to binarize continuous features.
* The features greater than the threshold, will be binarized to 1.0.
* The features equal to or less than the threshold, will be binarized to 0.0.
* Default: 0.0
* @group param
*/
val threshold: DoubleParam =
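A short, hedged example of overriding the documented default threshold; the column names are illustrative:

    import org.apache.spark.ml.feature.Binarizer

    // Features above 0.5 become 1.0, the rest 0.0; the documented default threshold is 0.0.
    val binarizer = new Binarizer()
      .setInputCol("feature")
      .setOutputCol("binarized_feature")
      .setThreshold(0.5)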
1 change: 1 addition & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -35,6 +35,7 @@ private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol

/**
* The minimum of documents in which a term should appear.
* Default: 0
* @group param
*/
final val minDocFreq = new IntParam(
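A minimal sketch of overriding the documented default of 0 for minDocFreq; the column names are assumptions:

    import org.apache.spark.ml.feature.IDF

    // Ignore terms that appear in fewer than 2 documents.
    val idf = new IDF()
      .setInputCol("rawFeatures")
      .setOutputCol("features")
      .setMinDocFreq(2)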
@@ -98,6 +98,7 @@ class StopWordsRemover(override val uid: String)

/**
* the stop words set to be filtered out
* Default: [[StopWords.English]]
* @group param
*/
val stopWords: StringArrayParam = new StringArrayParam(this, "stopWords", "stop words")
@@ -110,6 +111,7 @@

/**
* whether to do a case sensitive comparison over the stop words
* Default: false
* @group param
*/
val caseSensitive: BooleanParam = new BooleanParam(this, "caseSensitive",
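A hedged usage sketch relying on the documented defaults (the English stop-word list, case-insensitive matching); the column names are illustrative:

    import org.apache.spark.ml.feature.StopWordsRemover

    // caseSensitive is set to its documented default of false; stopWords keeps the default English list.
    val remover = new StopWordsRemover()
      .setInputCol("raw")
      .setOutputCol("filtered")
      .setCaseSensitive(false)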
@@ -212,8 +212,8 @@ class IndexToString private[ml] (

/**
* Param for array of labels.
* Optional labels to be provided by the user, if not supplied column
* metadata is read for labels.
* Optional labels to be provided by the user.
* Default: Empty array, in which case column metadata is used for labels.
* @group param
*/
final val labels: StringArrayParam = new StringArrayParam(this, "labels",
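A small, hedged sketch of the labels param: left unset, column metadata is read, or the labels can be supplied explicitly as below; the names are illustrative:

    import org.apache.spark.ml.feature.IndexToString

    // With no labels supplied, column metadata would be used; here they are set explicitly.
    val converter = new IndexToString()
      .setInputCol("categoryIndex")
      .setOutputCol("originalCategory")
      .setLabels(Array("a", "b", "c"))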
@@ -43,6 +43,7 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu
* Must be >= 2.
*
* (default = 20)
* @group param
*/
val maxCategories = new IntParam(this, "maxCategories",
"Threshold for the number of values a categorical feature can take (>= 2)." +
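A brief, hedged example of lowering the documented default of 20 for maxCategories; the column names are assumptions:

    import org.apache.spark.ml.feature.VectorIndexer

    // Features with more than 10 distinct values are treated as continuous.
    val indexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexed")
      .setMaxCategories(10)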
@@ -49,6 +49,7 @@ final class VectorSlicer(override val uid: String)
/**
* An array of indices to select features from a vector column.
* There can be no overlap with [[names]].
* Default: Empty array
* @group param
*/
val indices = new IntArrayParam(this, "indices",
@@ -67,6 +68,7 @@
* An array of feature names to select features from a vector column.
* These names must be specified by ML [[org.apache.spark.ml.attribute.Attribute]]s.
* There can be no overlap with [[indices]].
* Default: Empty Array
* @group param
*/
val names = new StringArrayParam(this, "names",
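A hedged sketch of selecting by both index and attribute name; the two selections must not overlap, and the column and attribute names below are illustrative:

    import org.apache.spark.ml.feature.VectorSlicer

    // Selects element 1 by position plus the attribute named "f3" (which must not be index 1).
    val slicer = new VectorSlicer()
      .setInputCol("userFeatures")
      .setOutputCol("features")
      .setIndices(Array(1))
      .setNames(Array("f3"))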
@@ -39,6 +39,7 @@ private[feature] trait Word2VecBase extends Params

/**
* The dimension of the code that you want to transform from words.
* Default: 100
* @group param
*/
final val vectorSize = new IntParam(
@@ -50,6 +51,7 @@

/**
* Number of partitions for sentences of words.
* Default: 1
* @group param
*/
final val numPartitions = new IntParam(
@@ -62,6 +64,7 @@
/**
* The minimum number of times a token must appear to be included in the word2vec model's
* vocabulary.
* Default: 5
* @group param
*/
final val minCount = new IntParam(this, "minCount", "the minimum number of times a token must " +
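A short, hedged configuration sketch that spells out two of the documented defaults; the column names are illustrative:

    import org.apache.spark.ml.feature.Word2Vec

    // vectorSize and minCount are set to their documented defaults (100 and 5);
    // numPartitions is left at its default of 1.
    val word2Vec = new Word2Vec()
      .setInputCol("text")
      .setOutputCol("result")
      .setVectorSize(100)
      .setMinCount(5)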
@@ -40,6 +40,7 @@ private[regression] trait IsotonicRegressionBase extends Params with HasFeatures
/**
* Param for whether the output sequence should be isotonic/increasing (true) or
* antitonic/decreasing (false).
* Default: true
* @group param
*/
final val isotonic: BooleanParam =
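A hedged usage sketch making the documented default explicit; the column names are assumptions:

    import org.apache.spark.ml.regression.IsotonicRegression

    // isotonic = true (the documented default) fits an increasing sequence; false would fit a decreasing one.
    val ir = new IsotonicRegression()
      .setFeaturesCol("feature")
      .setLabelCol("label")
      .setIsotonic(true)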
@@ -67,7 +67,7 @@ trait PredicateHelper {

/**
* Returns true if `expr` can be evaluated using only the output of `plan`. This method
* can be used to determine when is is acceptable to move expression evaluation within a query
* can be used to determine when it is acceptable to move expression evaluation within a query
* can be used to determine when it is acceptable to move expression evaluation within a query
* plan.
*
* For example consider a join between two relations R(a, b) and S(c, d).
@@ -51,7 +51,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
* filled in automatically by the QueryPlanner using the other execution strategies that are
* available.
*/
protected def planLater(plan: LogicalPlan) = this.plan(plan).next()
protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next()

def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...
@@ -195,19 +195,19 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
// as join keys.
val (joinPredicates, otherPredicates) =
condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case EqualTo(l, r) =>
(canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left))
case _ => false
}

val joinKeys = joinPredicates.map {
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)

if (joinKeys.nonEmpty) {
val (leftKeys, rightKeys) = joinKeys.unzip
logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.util._
* Provides helper methods for comparing plans.
*/
class PlanTest extends SparkFunSuite {

/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
@@ -87,7 +87,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
left: LogicalPlan,
right: LogicalPlan,
condition: Option[Expression],
side: joins.BuildSide) = {
side: joins.BuildSide): Seq[SparkPlan] = {
val broadcastHashJoin = execution.joins.BroadcastHashJoin(
leftKeys, rightKeys, side, planLater(left), planLater(right))
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
@@ -123,12 +123,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// --- Outer joins --------------------------------------------------------------------------

case ExtractEquiJoinKeys(
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
joins.BroadcastHashOuterJoin(
leftKeys, rightKeys, LeftOuter, condition, planLater(left), planLater(right)) :: Nil

case ExtractEquiJoinKeys(
RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
joins.BroadcastHashOuterJoin(
leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil

@@ -156,11 +156,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Aggregations that can be performed in two phases, before and after the shuffle.
case PartialAggregation(
namedGroupingAttributes,
rewrittenAggregateExpressions,
groupingExpressions,
partialComputation,
child) if !canBeConvertedToNewAggregation(plan) =>
namedGroupingAttributes,
rewrittenAggregateExpressions,
groupingExpressions,
partialComputation,
child) if !canBeConvertedToNewAggregation(plan) =>
execution.Aggregate(
partial = false,
namedGroupingAttributes,
@@ -238,7 +238,7 @@ private[joins] class SortMergeJoinScanner(
* Advances the streamed input iterator and buffers all rows from the buffered input that
* have matching keys.
* @return true if the streamed iterator returned a row, false otherwise. If this returns true,
* then [getStreamedRow and [[getBufferedMatches]] can be called to produce the outer
* then [[getStreamedRow]] and [[getBufferedMatches]] can be called to produce the outer
* join results.
*/
final def findNextOuterJoinRows(): Boolean = {

