Merge branch 'master' of git://git.apache.org/spark into SPARK-2583
sarutak committed Jul 29, 2014
2 parents e7d9aa6 + 800ecff commit 9dfd0d8
Showing 99 changed files with 4,044 additions and 420 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -58,3 +58,4 @@ metastore_db/
metastore/
warehouse/
TempStatsStore/
sql/hive-thriftserver/test_warehouses
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -165,6 +165,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
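A minimal build sketch for the new profile, assuming Maven and the same flags the dev scripts later in this commit use (-Phive together with the new -Phive-thriftserver); the exact invocation may vary by environment:

# Build an assembly that bundles the Hive Thrift server module
# (-Phive is assumed to be required as well, since the thrift server builds on the hive module)
mvn -Phive -Phive-thriftserver -DskipTests clean package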
2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.10</artifactId>
<properties>
<sbt.project.name>bagel</sbt.project.name>
<sbt.project.name>bagel</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
45 changes: 45 additions & 0 deletions bin/beeline
@@ -0,0 +1,45 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi

# Compute classpath using external script
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi

CLASS="org.apache.hive.beeline.BeeLine"
exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
1 change: 1 addition & 0 deletions bin/compute-classpath.sh
@@ -52,6 +52,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
fi

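A hedged note on the new entry: SPARK_PREPEND_CLASSES is the existing developer switch that puts locally compiled classes ahead of the assembly jar, so with this line a rebuilt thrift-server module can presumably be picked up without reassembling, e.g.:

# Assumes sql/hive-thriftserver has been compiled locally (via sbt or Maven)
SPARK_PREPEND_CLASSES=1 ./bin/spark-sql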
4 changes: 2 additions & 2 deletions bin/spark-shell
@@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
fi
}

2 changes: 1 addition & 1 deletion bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem

set SPARK_HOME=%~dp0..

cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
36 changes: 36 additions & 0 deletions bin/spark-sql
@@ -0,0 +1,36 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Shell script for starting the Spark SQL CLI

# Enter posix mode for bash
set -o posix

# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/spark-sql [options]"
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<properties>
<sbt.project.name>core</sbt.project.name>
<sbt.project.name>core</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Core</name>
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -19,7 +19,6 @@ package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle

@@ -63,8 +62,7 @@ class ShuffleDependency[K, V, C](
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val sortOrder: Option[SortOrder] = None)
val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -46,6 +46,10 @@ object SparkSubmit {
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

// A special jar name that indicates the class being run is inside of Spark itself, and therefore
// no user jar is needed.
private val SPARK_INTERNAL = "spark-internal"

// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
@@ -257,7 +261,9 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
@@ -332,7 +338,7 @@ object SparkSubmit {
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
!isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
}

/**
@@ -349,6 +355,10 @@
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}

private[spark] def isInternal(primaryResource: String): Boolean = {
primaryResource == SPARK_INTERNAL
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
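In effect, the new SPARK_INTERNAL resource name lets classes shipped inside the Spark assembly be launched without supplying a user jar. A sketch mirroring what bin/spark-sql above does:

# "spark-internal" takes the place of an application jar for classes bundled with Spark
./bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver spark-internal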
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -204,8 +204,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {

/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
// Delineates parsing of Spark options from parsing of user options.
var inSparkOpts = true

// Delineates parsing of Spark options from parsing of user options.
parse(opts)

def parse(opts: Seq[String]): Unit = opts match {
@@ -318,7 +319,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource =
if (!SparkSubmit.isShell(v)) {
if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
Utils.resolveURI(v).toString
} else {
v
core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -58,12 +58,6 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V, P](self, part)
.setKeyOrdering(ordering)
.setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
}

private[spark] object SortOrder extends Enumeration {
type SortOrder = Value
val ASCENDING, DESCENDING = Value
}
12 changes: 1 addition & 11 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -21,7 +21,6 @@ import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.SortOrder.SortOrder
import org.apache.spark.serializer.Serializer

private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -52,8 +51,6 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](

private var mapSideCombine: Boolean = false

private var sortOrder: Option[SortOrder] = None

/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
this.serializer = Option(serializer)
@@ -78,15 +75,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
this
}

/** Set sort order for RDD's sorting. */
def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
this.sortOrder = Option(sortOrder)
this
}

override def getDependencies: Seq[Dependency[_]] = {
List(new ShuffleDependency(prev, part, serializer,
keyOrdering, aggregator, mapSideCombine, sortOrder))
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}

override val partitioner = Some(part)
core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.hash

import org.apache.spark.{InterruptibleIterator, TaskContext}
import org.apache.spark.rdd.SortOrder
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}

@@ -51,16 +50,22 @@ class HashShuffleReader[K, C](
iter
}

val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
val buf = aggregatedIter.toArray
if (sortOrder == SortOrder.ASCENDING) {
buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
} else {
buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
}
// Sort the output if there is a sort ordering defined.
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Define a Comparator for the whole record based on the key Ordering.
val cmp = new Ordering[Product2[K, C]] {
override def compare(o1: Product2[K, C], o2: Product2[K, C]): Int = {
keyOrd.compare(o1._1, o2._1)
}
}
val sortBuffer: Array[Product2[K, C]] = aggregatedIter.toArray
// TODO: do external sort.
scala.util.Sorting.quickSort(sortBuffer)(cmp)
sortBuffer.iterator
case None =>
aggregatedIter
}

sortedIter.getOrElse(aggregatedIter)
}

/** Close this reader */
core/src/main/scala/org/apache/spark/util/SignalLogger.scala
@@ -17,7 +17,7 @@

package org.apache.spark.util

import org.apache.commons.lang.SystemUtils
import org.apache.commons.lang3.SystemUtils
import org.slf4j.Logger
import sun.misc.{Signal, SignalHandler}

10 changes: 5 additions & 5 deletions dev/create-release/create-release.sh
@@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
-Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
-Dmaven.javadoc.skip=true \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Dtag=$GIT_TAG -DautoVersionSubmodules=true \
--batch-mode release:prepare

mvn -DskipTests \
-Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Dmaven.javadoc.skip=true \
-Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
release:perform

cd ..
@@ -111,10 +111,10 @@ make_binary_release() {
spark-$RELEASE_VERSION-bin-$NAME.tgz.sha
}

make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4"
make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4"
make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
make_binary_release "hadoop2" \
"-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
"-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"

# Copy data
echo "Copying release tarballs"
2 changes: 1 addition & 1 deletion dev/run-tests
@@ -65,7 +65,7 @@ echo "========================================================================="
# (either resolution or compilation) prompts the user for input either q, r,
# etc to quit or retry. This echo is there to make it not block.
if [ -n "$_RUN_SQL_TESTS" ]; then
echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive" sbt/sbt clean package \
echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive -Phive-thriftserver" sbt/sbt clean package \
assembly/assembly test | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
else
echo -e "q\n" | sbt/sbt clean package assembly/assembly test | \
2 changes: 1 addition & 1 deletion dev/scalastyle
@@ -17,7 +17,7 @@
# limitations under the License.
#

echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt
echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
# Check style with YARN alpha built too
echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \
>> scalastyle.txt