Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark into SPARK-1413
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Apr 7, 2014
2 parents 8bb6ffd + accd099 commit fa00d5d
Show file tree
Hide file tree
Showing 34 changed files with 452 additions and 252 deletions.
12 changes: 11 additions & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
Expand Down Expand Up @@ -208,7 +218,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.1</version>
<version>1.2</version>
<executions>
<execution>
<phase>validate</phase>
Expand Down
35 changes: 19 additions & 16 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"

# Support for interacting with Hive. Since hive pulls in a lot of dependencies that might break
# existing Spark applications, it is not included in the standard spark assembly. Instead, we only
# include it in the classpath if the user has explicitly requested it by running "sbt hive/assembly"
# Hopefully we will find a way to avoid uber-jars entirely and deploy only the needed packages in
# the future.
if [ -f "$FWDIR"/sql/hive/target/scala-$SCALA_VERSION/spark-hive-assembly-*.jar ]; then

# Datanucleus jars do not work if only included in the uberjar as plugin.xml metadata is lost.
DATANUCLEUSJARS=$(JARS=("$FWDIR/lib_managed/jars"/datanucleus-*.jar); IFS=:; echo "${JARS[*]}")
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS

ASSEMBLY_DIR="$FWDIR/sql/hive/target/scala-$SCALA_VERSION/"
else
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION/"
fi
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
Expand All @@ -59,7 +45,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*-deps.jar`
DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
else
# Else use spark-assembly jar from either RELEASE or assembly directory
Expand All @@ -71,6 +57,23 @@ else
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi

# When Hive support is needed, Datanucleus jars must be included on the classpath.
# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
if [ $num_hive_files -gt 0 ]; then
echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
DATANUCLEUSJARS=$(echo "$FWDIR/lib_managed/jars"/datanucleus-*.jar | tr " " :)
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
fi
fi

# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"
Expand Down
2 changes: 0 additions & 2 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,3 @@ if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"


2 changes: 0 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, Iterator => JIterator, List => JList}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -280,6 +281,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this RDD.
*/
def toLocalIterator(): JIterator[T] = {
import scala.collection.JavaConversions._
rdd.toLocalIterator
}


/**
* Return an array that contains all of the elements in this RDD.
* @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ private[deploy] object DeployMessages {

case class KillDriver(driverId: String) extends DeployMessage

// Worker internal

case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription)
Expand Down
23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ private[spark] class Worker(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3

val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", true)
// How often worker will clean up old app folders
val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0

Expand Down Expand Up @@ -179,12 +185,28 @@ private[spark] class Worker(
registered = true
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
if (CLEANUP_ENABLED) {
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
}

case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}

case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
val cleanupFuture = concurrent.future {
logInfo("Cleaning up oldest application directories in " + workDir + " ...")
Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
.foreach(Utils.deleteRecursively)
}
cleanupFuture onFailure {
case e: Throwable =>
logError("App dir cleanup failed: " + e.getMessage, e)
}

case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
changeMaster(masterUrl, masterWebUiUrl)
Expand Down Expand Up @@ -331,7 +353,6 @@ private[spark] class Worker(
}

private[spark] object Worker {

def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,18 @@ abstract class RDD[T: ClassTag](
Array.concat(results: _*)
}

/**
* Return an iterator that contains all of the elements in this RDD.
*
* The iterator will consume as much memory as the largest partition in this RDD.
*/
def toLocalIterator: Iterator[T] = {
def collectPartition(p: Int): Array[T] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
}
(0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}

/**
* Return an array that contains all of the elements in this RDD.
*/
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,24 @@ private[spark] object Utils extends Logging {
}

if (fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())) {
return false;
return false
} else {
return true;
return true
}
}

/**
* Finds all the files in a directory whose last modified time is older than cutoff seconds.
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Files older than this are returned.
*/
def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
val currentTimeMillis = System.currentTimeMillis
if (dir.isDirectory) {
val files = listFilesSafely(dir)
files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
} else {
throw new IllegalArgumentException(dir + " is not a directory!")
}
}

Expand Down
9 changes: 9 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import scala.Tuple2;

import com.google.common.collect.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
Expand Down Expand Up @@ -179,6 +180,14 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}

@Test
public void toLocalIterator() {
List<Integer> correct = Arrays.asList(1, 2, 3, 4);
JavaRDD<Integer> rdd = sc.parallelize(correct);
List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
Assert.assertTrue(correct.equals(result));
}

@SuppressWarnings("unchecked")
@Test
public void lookup() {
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
Expand Down
15 changes: 14 additions & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.util

import scala.util.Random

import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.nio.{ByteBuffer, ByteOrder}

import com.google.common.base.Charsets
Expand Down Expand Up @@ -154,5 +154,18 @@ class UtilsSuite extends FunSuite {
val iterator = Iterator.range(0, 5)
assert(Utils.getIteratorSize(iterator) === 5L)
}

test("findOldFiles") {
// create some temporary directories and files
val parent: File = Utils.createTempDir()
val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
val child2: File = Utils.createTempDir(parent.getCanonicalPath)
// set the last modified time of child1 to 10 secs old
child1.setLastModified(System.currentTimeMillis() - (1000 * 10))

val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
assert(result.size.equals(1))
assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
}
}

2 changes: 1 addition & 1 deletion dev/audit-release/maven_app_core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<version>3.1</version>
</plugin>
</plugins>
</build>
Expand Down
4 changes: 2 additions & 2 deletions dev/create-release/create-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ mvn -DskipTests \
-Darguments="-DskipTests=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
-Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Pspark-ganglia-lgpl \
-Pyarn -Phive -Pspark-ganglia-lgpl\
-Dtag=$GIT_TAG -DautoVersionSubmodules=true \
--batch-mode release:prepare

mvn -DskipTests \
-Darguments="-DskipTests=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Pspark-ganglia-lgpl\
-Pyarn -Phive -Pspark-ganglia-lgpl\
release:perform

rm -rf spark
Expand Down
26 changes: 22 additions & 4 deletions dev/merge_spark_pr.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,20 @@ def merge_pr(pr_num, target_ref):
run_cmd("git fetch %s %s:%s" % (PUSH_REMOTE_NAME, target_ref, target_branch_name))
run_cmd("git checkout %s" % target_branch_name)

run_cmd(['git', 'merge', pr_branch_name, '--squash'])
had_conflicts = False
try:
run_cmd(['git', 'merge', pr_branch_name, '--squash'])
except Exception as e:
msg = "Error merging: %s\nWould you like to manually fix-up this merge?" % e
continue_maybe(msg)
msg = "Okay, please fix any conflicts and 'git add' conflicting files... Finished?"
continue_maybe(msg)
had_conflicts = True

commit_authors = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
'--pretty=format:%an <%ae>']).split("\n")
distinct_authors = sorted(set(commit_authors), key=lambda x: commit_authors.count(x), reverse=True)
distinct_authors = sorted(set(commit_authors), key=lambda x: commit_authors.count(x),
reverse=True)
primary_author = distinct_authors[0]
commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
'--pretty=format:%h [%an] %s']).split("\n\n")
Expand All @@ -105,6 +114,13 @@ def merge_pr(pr_num, target_ref):

merge_message_flags += ["-m", authors]

if had_conflicts:
committer_name = run_cmd("git config --get user.name").strip()
committer_email = run_cmd("git config --get user.email").strip()
message = "This patch had conflicts when merged, resolved by\nCommitter: %s <%s>" % (
committer_name, committer_email)
merge_message_flags += ["-m", message]

# The string "Closes #%s" string is required for GitHub to correctly close the PR
merge_message_flags += ["-m",
"Closes #%s from %s and squashes the following commits:" % (pr_num, pr_repo_desc)]
Expand Down Expand Up @@ -186,8 +202,10 @@ def maybe_cherry_pick(pr_num, merge_hash, default_branch):
maybe_cherry_pick(pr_num, merge_hash, latest_branch)
sys.exit(0)

if bool(pr["mergeable"]) == False:
fail("Pull request %s is not mergeable in its current form" % pr_num)
if not bool(pr["mergeable"]):
msg = "Pull request %s is not mergeable in its current form.\n" % pr_num + \
"Continue? (experts only!)"
continue_maybe(msg)

print ("\n=== Pull Request #%s ===" % pr_num)
print("title\t%s\nsource\t%s\ntarget\t%s\nurl\t%s" % (
Expand Down
2 changes: 1 addition & 1 deletion docs/building-with-maven.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ title: Building Spark with Maven
* This will become a table of contents (this text will be scraped).
{:toc}

Building Spark using Maven Requires Maven 3 (the build process is tested with Maven 3.0.4) and Java 1.6 or newer.
Building Spark using Maven requires Maven 3.0.4 or newer and Java 1.6 or newer.


## Setting up Maven's Memory Usage ##
Expand Down
Loading

0 comments on commit fa00d5d

Please sign in to comment.