Merge remote-tracking branch 'upstream/master' into expr_bin
viirya committed Jun 9, 2015
2 parents 50e0c3b + 490d5a7 commit 824f761
Showing 40 changed files with 679 additions and 208 deletions.
8 changes: 8 additions & 0 deletions R/pkg/R/serialize.R
@@ -37,6 +37,14 @@ writeObject <- function(con, object, writeType = TRUE) {
# passing in vectors as arrays and instead require arrays to be passed
# as lists.
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
# Checking types is needed here, since 'is.na' only handles atomic vectors,
# lists and pairlists
if (type %in% c("integer", "character", "logical", "double", "numeric")) {
if (is.na(object)) {
object <- NULL
type <- "NULL"
}
}
if (writeType) {
writeType(con, type)
}
37 changes: 37 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -101,6 +101,43 @@ test_that("create DataFrame from RDD", {
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
})

test_that("convert NAs to null type in DataFrames", {
rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(is.na(collect(df)[2, "a"]))
expect_equal(collect(df)[2, "b"], 4L)

l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(df)[2, "x"], 1L)
expect_true(is.na(collect(df)[2, "y"]))

rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(is.na(collect(df)[2, "a"]))
expect_equal(collect(df)[2, "b"], 4)

l <- data.frame(x = 1, y = c(1, NA_real_, 3))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(df)[2, "x"], 1)
expect_true(is.na(collect(df)[2, "y"]))

l <- list("a", "b", NA, "d")
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], "d")

l <- list("a", "b", NA_character_, "d")
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], "d")

l <- list(TRUE, FALSE, NA, TRUE)
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], TRUE)
})

test_that("toDF", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
df <- toDF(rdd, list("a", "b"))
@@ -182,6 +182,7 @@ abstract class DeserializationStream {
} catch {
case eof: EOFException =>
finished = true
null
}
}

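The `null` added to the `catch` branch above matters because Scala's `try`/`catch` is an expression; a standalone sketch of the pattern (illustrative only — not Spark's `DeserializationStream`, whose enclosing method is collapsed in this hunk):

```scala
import java.io.EOFException

// Mimics the shape of the change: the catch branch must yield a value of the
// same type as the try branch, otherwise the expression's type widens to Unit/Any.
def readNextOrNull[T >: Null](read: () => T): T = {
  try {
    read()
  } catch {
    case eof: EOFException =>
      // Real code would also record end-of-stream state here (e.g. finished = true),
      // then yield null as the branch's value.
      null
  }
}
```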
@@ -539,11 +539,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/**
* For testing only. Wait until at least `numExecutors` executors are up, or throw
* `TimeoutException` if the waiting time elapses before `numExecutors` executors are up.
* Exposed for testing.
*
* @param numExecutors the minimum number of executors to wait for
* @param timeout time to wait in milliseconds
*/
@VisibleForTesting
private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
val finishTime = System.currentTimeMillis() + timeout
while (System.currentTimeMillis() < finishTime) {
@@ -28,9 +28,12 @@ import org.apache.ivy.plugins.resolver.IBiblioResolver

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.Utils

class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {

private var tempIvyPath: String = _

private val noOpOutputStream = new OutputStream {
def write(b: Int) = {}
}
@@ -47,6 +50,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
super.beforeAll()
// We don't want to write logs during testing
SparkSubmitUtils.printStream = new BufferPrintStream
tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
}

test("incorrect maven coordinate throws error") {
@@ -90,21 +94,20 @@
}

test("ivy path works correctly") {
val ivyPath = "dummy" + File.separator + "ivy"
val md = SparkSubmitUtils.getModuleDescriptor
val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", "jar", "jar")
var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(ivyPath))
var jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, new File(tempIvyPath))
for (i <- 0 until 3) {
val index = jPaths.indexOf(ivyPath)
val index = jPaths.indexOf(tempIvyPath)
assert(index >= 0)
jPaths = jPaths.substring(index + ivyPath.length)
jPaths = jPaths.substring(index + tempIvyPath.length)
}
val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
IvyTestUtils.withRepository(main, None, None) { repo =>
// end to end
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, Option(repo),
Option(ivyPath), true)
assert(jarPath.indexOf(ivyPath) >= 0, "should use non-default ivy path")
Option(tempIvyPath), true)
assert(jarPath.indexOf(tempIvyPath) >= 0, "should use non-default ivy path")
}
}

@@ -123,13 +126,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
}
// Local ivy repository with modified home
val dummyIvyPath = "dummy" + File.separator + "ivy"
val dummyIvyLocal = new File(dummyIvyPath, "local" + File.separator)
val dummyIvyLocal = new File(tempIvyPath, "local" + File.separator)
IvyTestUtils.withRepository(main, None, Some(dummyIvyLocal), true) { repo =>
val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, None,
Some(dummyIvyPath), true)
Some(tempIvyPath), true)
assert(jarPath.indexOf("mylib") >= 0, "should find artifact")
assert(jarPath.indexOf(dummyIvyPath) >= 0, "should be in new ivy path")
assert(jarPath.indexOf(tempIvyPath) >= 0, "should be in new ivy path")
}
}

4 changes: 2 additions & 2 deletions docs/mllib-frequent-pattern-mining.md
@@ -39,11 +39,11 @@ MLlib's FP-growth implementation takes the following (hyper-)parameters:
<div class="codetabs">
<div data-lang="scala" markdown="1">

[`FPGrowth`](api/java/org/apache/spark/mllib/fpm/FPGrowth.html) implements the
[`FPGrowth`](api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowth) implements the
FP-growth algorithm.
It takes a `JavaRDD` of transactions, where each transaction is an `Iterable` of items of a generic type.
Calling `FPGrowth.run` with transactions returns an
[`FPGrowthModel`](api/java/org/apache/spark/mllib/fpm/FPGrowthModel.html)
[`FPGrowthModel`](api/scala/index.html#org.apache.spark.mllib.fpm.FPGrowthModel)
that stores the frequent itemsets with their frequencies.

{% highlight scala %}
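The doc's own highlighted example is collapsed in this diff; for orientation, a minimal Scala sketch along the lines the paragraph describes (the input path and parameter values are illustrative, and an existing `SparkContext` named `sc` is assumed):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

def runFPGrowth(sc: SparkContext): Unit = {
  // Each line of the input file is treated as one transaction of space-separated items.
  val transactions: RDD[Array[String]] =
    sc.textFile("data/sample_fpgrowth.txt").map(_.trim.split(' '))

  val fpg = new FPGrowth()
    .setMinSupport(0.2)
    .setNumPartitions(10)
  val model = fpg.run(transactions)

  // Frequent itemsets with their frequencies.
  model.freqItemsets.collect().foreach { itemset =>
    println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
  }
}
```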
6 changes: 5 additions & 1 deletion docs/sql-programming-guide.md
@@ -1102,7 +1102,11 @@ root
{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types and string type are supported.
numeric data types and string type are supported. Sometimes users may not want to automatically
infer the data types of the partitioning columns. For these use cases, the automatic type inference
can be configured via `spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to
`true`. When type inference is disabled, string type will be used for the partitioning columns.


### Schema merging

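A small hedged sketch of toggling the option named above, assuming an existing `SQLContext`; the property name comes from the doc text, the rest is illustrative:

```scala
import org.apache.spark.sql.SQLContext

// Assumes an existing SQLContext named `sqlContext`.
def disablePartitionTypeInference(sqlContext: SQLContext): Unit = {
  // With inference off, partition columns discovered from directory names
  // are read back as strings instead of being cast to numeric types.
  sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
}
```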
2 changes: 1 addition & 1 deletion docs/streaming-kafka-integration.md
@@ -7,7 +7,7 @@ title: Spark Streaming + Kafka Integration Guide
## Approach 1: Receiver-based Approach
This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming process the data.

However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming. To ensure zero data loss, enable the Write Ahead Logs (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.
However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability)). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g. HDFS), so that all the data can be recovered on failure. See the [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.

Next, we discuss how to use this approach in your streaming application.

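A hedged sketch of the receiver-based setup with the write ahead log turned on, as the paragraph describes; the hosts, topic name, and checkpoint path are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWithWAL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KafkaReceiverWithWAL")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(2))
    // The WAL is written under the checkpoint directory, so a fault-tolerant one is required.
    ssc.checkpoint("hdfs:///checkpoints/kafka-wal")

    // With the WAL enabled, a serialized, disk-backed storage level avoids keeping the data twice.
    val stream = KafkaUtils.createStream(
      ssc, "zkhost1:2181", "my-consumer-group", Map("my-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```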
8 changes: 8 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -17,6 +17,9 @@

package org.apache.spark.ml

import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.Logging
@@ -175,6 +178,11 @@ class PipelineModel private[ml] (
val stages: Array[Transformer])
extends Model[PipelineModel] with Logging {

/** A Java/Python-friendly auxiliary constructor. */
private[ml] def this(uid: String, stages: ju.List[Transformer]) = {
this(uid, stages.asScala.toArray)
}

override def validateParams(): Unit = {
super.validateParams()
stages.foreach(_.validateParams())
@@ -179,7 +179,7 @@ object GradientDescent extends Logging {
* if it's L2 updater; for L1 updater, the same logic is followed.
*/
var regVal = updater.compute(
weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
weights, Vectors.zeros(weights.size), 0, 1, regParam)._2

for (i <- 1 to numIterations) {
val bcWeights = data.context.broadcast(weights)
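For context, `Vectors.zeros(n)` builds the same all-zeros dense vector that the replaced `Vectors.dense(new Array[Double](n))` expression produced; a tiny sketch of the equivalence:

```scala
import org.apache.spark.mllib.linalg.Vectors

object ZerosEquivalence {
  def main(args: Array[String]): Unit = {
    val n = 4
    val viaArray = Vectors.dense(new Array[Double](n)) // the JVM zero-initializes the array
    val viaZeros = Vectors.zeros(n)                    // same values, clearer intent
    assert(viaArray.toArray.sameElements(viaZeros.toArray))
    println(viaZeros) // [0.0,0.0,0.0,0.0]
  }
}
```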
@@ -195,11 +195,11 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
*/
val initialWeights = {
if (numOfLinearPredictor == 1) {
Vectors.dense(new Array[Double](numFeatures))
Vectors.zeros(numFeatures)
} else if (addIntercept) {
Vectors.dense(new Array[Double]((numFeatures + 1) * numOfLinearPredictor))
Vectors.zeros((numFeatures + 1) * numOfLinearPredictor)
} else {
Vectors.dense(new Array[Double](numFeatures * numOfLinearPredictor))
Vectors.zeros(numFeatures * numOfLinearPredictor)
}
}
run(input, initialWeights)
@@ -83,15 +83,7 @@ abstract class StreamingLinearAlgorithm[
throw new IllegalArgumentException("Model must be initialized before starting training.")
}
data.foreachRDD { (rdd, time) =>
val initialWeights =
model match {
case Some(m) =>
m.weights
case None =>
val numFeatures = rdd.first().features.size
Vectors.dense(numFeatures)
}
model = Some(algorithm.run(rdd, initialWeights))
model = Some(algorithm.run(rdd, model.get.weights))
logInfo("Model updated at time %s".format(time.toString))
val display = model.get.weights.size match {
case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
@@ -79,7 +79,7 @@ class StreamingLinearRegressionWithSGD private[mllib] (
this
}

/** Set the initial weights. Default: [0.0, 0.0]. */
/** Set the initial weights. */
def setInitialWeights(initialWeights: Vector): this.type = {
this.model = Some(algorithm.createModel(initialWeights, 0.0))
this
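With the stale `[0.0, 0.0]` default removed from the scaladoc, here is a hedged sketch of a caller supplying the initial weights explicitly; the dimensionality and parameter values are illustrative:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

object StreamingLRSetup {
  def main(args: Array[String]): Unit = {
    // numFeatures must match the dimensionality of the incoming training data.
    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD()
      .setStepSize(0.1)
      .setNumIterations(50)
      .setInitialWeights(Vectors.zeros(numFeatures))

    // model.trainOn(trainingStream) and model.predictOn(testStream) would then be wired
    // to DStream[LabeledPoint] / DStream[Vector] inputs from a StreamingContext (not shown).
  }
}
```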
17 changes: 17 additions & 0 deletions mllib/src/test/scala/org/apache/spark/ml/PipelineSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.ml

import scala.collection.JavaConverters._

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar.mock
@@ -81,4 +83,19 @@ class PipelineSuite extends SparkFunSuite {
pipeline.fit(dataset)
}
}

test("pipeline model constructors") {
val transform0 = mock[Transformer]
val model1 = mock[MyModel]

val stages = Array(transform0, model1)
val pipelineModel0 = new PipelineModel("pipeline0", stages)
assert(pipelineModel0.uid === "pipeline0")
assert(pipelineModel0.stages === stages)

val stagesAsList = stages.toList.asJava
val pipelineModel1 = new PipelineModel("pipeline1", stagesAsList)
assert(pipelineModel1.uid === "pipeline1")
assert(pipelineModel1.stages === stages)
}
}
26 changes: 24 additions & 2 deletions pom.xml
@@ -179,7 +179,7 @@
<parquet.deps.scope>compile</parquet.deps.scope>

<!--
Overridable test home. So that you can call individual pom files directory without
Overridable test home. So that you can call individual pom files directly without
things breaking.
-->
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
@@ -587,7 +587,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
<version>4.0.28.Final</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
@@ -1256,6 +1256,7 @@
<systemProperties>
<derby.system.durability>test</derby.system.durability>
<java.awt.headless>true</java.awt.headless>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<spark.test.home>${spark.test.home}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
@@ -1289,6 +1290,7 @@
<systemProperties>
<derby.system.durability>test</derby.system.durability>
<java.awt.headless>true</java.awt.headless>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<spark.test.home>${spark.test.home}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
@@ -1548,6 +1550,26 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-tmp-dir</id>
<phase>generate-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<mkdir dir="${project.build.directory}/tmp" />
</target>
</configuration>
</execution>
</executions>
</plugin>

<!-- Enable surefire and scalatest in all children, in one place: -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
6 changes: 6 additions & 0 deletions project/SparkBuild.scala
@@ -51,6 +51,11 @@ object BuildCommons {
// Root project.
val spark = ProjectRef(buildLocation, "spark")
val sparkHome = buildLocation

val testTempDir = s"$sparkHome/target/tmp"
if (!new File(testTempDir).isDirectory()) {
require(new File(testTempDir).mkdirs())
}
}

object SparkBuild extends PomBuild {
@@ -496,6 +501,7 @@ object TestSettings {
"SPARK_DIST_CLASSPATH" ->
(fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
"JAVA_HOME" -> sys.env.get("JAVA_HOME").getOrElse(sys.props("java.home"))),
javaOptions in Test += s"-Djava.io.tmpdir=$testTempDir",
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dspark.port.maxRetries=100",