Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick committed Jun 3, 2014
2 parents 15a7d07 + 862283e commit 01e0813
Showing 37 changed files with 1,492 additions and 854 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -7,7 +7,7 @@
 sbt/*.jar
 .settings
 .cache
-.mima-excludes
+.generated-mima-excludes
 /build/
 work/
 out/
1 change: 1 addition & 0 deletions .rat-excludes
@@ -3,6 +3,7 @@ target
 .project
 .classpath
 .mima-excludes
+.generated-mima-excludes
 .rat-excludes
 .*md
 derby.log
29 changes: 0 additions & 29 deletions core/pom.xml
Expand Up @@ -258,35 +258,6 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME" />
<isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>
</fail>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -272,6 +272,26 @@ private object SpecialLengths {
 private[spark] object PythonRDD extends Logging {
   val UTF8 = Charset.forName("UTF-8")
 
+  /**
+   * Adapter for calling SparkContext#runJob from Python.
+   *
+   * This method will return an iterator of an array that contains all elements in the RDD
+   * (effectively a collect()), but allows you to run on a certain subset of partitions,
+   * or to enable local execution.
+   */
+  def runJob(
+      sc: SparkContext,
+      rdd: JavaRDD[Array[Byte]],
+      partitions: JArrayList[Int],
+      allowLocal: Boolean): Iterator[Array[Byte]] = {
+    type ByteArray = Array[Byte]
+    type UnrolledPartition = Array[ByteArray]
+    val allPartitions: Array[UnrolledPartition] =
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
+    val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
+    flattenedPartition.iterator
+  }
+
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
   JavaRDD[Array[Byte]] = {
     val file = new DataInputStream(new FileInputStream(filename))
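Editor's note: a rough Scala sketch of how the new runJob adapter above could be exercised from the JVM side. Since PythonRDD is private[spark], code like this would have to live inside the org.apache.spark namespace (for example in a test); the SparkContext setup, the byte-array RDD, and the chosen partitions below are illustrative assumptions, not part of the commit.

package org.apache.spark.api.python

import java.util.{ArrayList => JArrayList}

import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD

object RunJobSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "runJob-sketch")

    // Stand-in for the pickled byte arrays PySpark would normally produce.
    val rdd: JavaRDD[Array[Byte]] =
      JavaRDD.fromRDD(sc.parallelize(Seq("a", "b", "c", "d").map(_.getBytes("UTF-8")), 4))

    // Ask for a subset of the partitions only (here 0 and 2), which is the point
    // of exposing runJob rather than a plain collect().
    val partitions = new JArrayList[Int]()
    partitions.add(0)
    partitions.add(2)

    // Unrolls just those partitions and returns their elements as one flat iterator.
    val result = PythonRDD.runJob(sc, rdd, partitions, allowLocal = false)
    result.foreach(bytes => println(new String(bytes, "UTF-8")))

    sc.stop()
  }
}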
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -381,16 +381,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 object SparkSubmitArguments {
   /** Load properties present in the given file. */
   def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file ${file.getName} does not exist")
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
     val inputStream = new FileInputStream(file)
-    val properties = new Properties()
     try {
+      val properties = new Properties()
       properties.load(inputStream)
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
     } catch {
       case e: IOException =>
-        val message = s"Failed when loading Spark properties file ${file.getName}"
+        val message = s"Failed when loading Spark properties file $file"
         throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
     }
-    properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
   }
 }
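Editor's note: because interleaved old/new lines are easy to misread, here is an approximate, self-contained reconstruction of the method after this change: the isFile check is new, the Properties instance and the key/value mapping move inside the try, and a finally block now closes the stream. SparkException is swapped for RuntimeException and JavaConverters is used explicitly so the sketch compiles outside Spark; treat it as a sketch, not the exact source.

import java.io.{File, FileInputStream, IOException}
import java.util.Properties

import scala.collection.JavaConverters._

object PropertiesFileSketch {
  /** Load properties present in the given file (sketch of the updated logic). */
  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
    require(file.exists(), s"Properties file $file does not exist")
    require(file.isFile(), s"Properties file $file is not a normal file")
    val inputStream = new FileInputStream(file)
    try {
      val properties = new Properties()
      properties.load(inputStream)
      properties.stringPropertyNames().asScala.toSeq.map(k => (k, properties.getProperty(k).trim))
    } catch {
      case e: IOException =>
        // The real code wraps this in a SparkException; RuntimeException keeps the sketch standalone.
        throw new RuntimeException(s"Failed when loading Spark properties file $file", e)
    } finally {
      inputStream.close()  // closing here releases the file handle on both success and failure
    }
  }
}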
34 changes: 34 additions & 0 deletions dev/mima
@@ -0,0 +1,34 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -o pipefail

# Go to the Spark project root directory
FWDIR="$(cd `dirname $0`/..; pwd)"
cd $FWDIR

./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
ret_val=$?

if [ $ret_val != 0 ]; then
echo "NOTE: Exceptions to binary compatibility can be added in project/MimaExcludes.scala"
fi

exit $ret_val
3 changes: 1 addition & 2 deletions dev/run-tests
@@ -81,5 +81,4 @@ fi
 echo "========================================================================="
 echo "Detecting binary incompatibilites with MiMa"
 echo "========================================================================="
-./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
-echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
+dev/mima
4 changes: 3 additions & 1 deletion docs/sql-programming-guide.md
@@ -170,7 +170,9 @@ A schema can be applied to an existing RDD by calling `applySchema` and providing
 for the JavaBean.
 
 {% highlight java %}
-JavaSQLContext ctx = new org.apache.spark.sql.api.java.JavaSQLContext(sc)
+
+JavaSparkContext ctx = ...; // An existing JavaSparkContext.
+JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx)
 
 // Load a text file and convert each line to a JavaBean.
 JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(