Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark into Collect…
Browse files Browse the repository at this point in the history
…EnoughPrefixes
  • Loading branch information
zhangjiajin committed Jul 16, 2015
2 parents 095aa3a + ba33096 commit b07e20c
Show file tree
Hide file tree
Showing 110 changed files with 3,129 additions and 843 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] = path
options[['path']] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/client.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {

determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
sparkSubmitBinName = "spark-submit"
sparkSubmitBinName <- "spark-submit"
} else {
sparkSubmitBinName = "spark-submit.cmd"
sparkSubmitBinName <- "spark-submit.cmd"
}
sparkSubmitBinName
}
Expand Down
1 change: 1 addition & 0 deletions R/pkg/R/deserialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/group.R
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ setMethod("count",
setMethod("agg",
signature(x = "GroupedData"),
function(x, ...) {
cols = list(...)
cols <- list(...)
stopifnot(length(cols) > 0)
if (is.character(cols[[1]])) {
cols <- varargsToEnv(...)
Expand All @@ -97,7 +97,7 @@ setMethod("agg",
if (!is.null(ns)) {
for (n in ns) {
if (n != "") {
cols[[n]] = alias(cols[[n]], n)
cols[[n]] <- alias(cols[[n]], n)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions R/pkg/R/schema.R
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ structField.character <- function(x, type, nullable = TRUE) {
}
options <- c("byte",
"integer",
"float",
"double",
"numeric",
"character",
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
if (isInstanceOf(obj, "scala.Tuple2")) {
# JavaPairRDD[Array[Byte], Array[Byte]].

keyBytes = callJMethod(obj, "_1")
valBytes = callJMethod(obj, "_2")
keyBytes <- callJMethod(obj, "_1")
valBytes <- callJMethod(obj, "_2")
res <- list(unserialize(keyBytes),
unserialize(valBytes))
} else {
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binaryFile.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ context("functions on binary files")
# JavaSparkContext handle
sc <- sparkR.init()

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("saveAsObjectFile()/objectFile() following textFile() works", {
fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_binary_function.R
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ test_that("zipPartitions() on RDDs", {
expect_equal(actual,
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ test_that("zipRDD() on RDDs", {
expect_equal(actual,
list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

Expand Down Expand Up @@ -483,7 +483,7 @@ test_that("cartesian() on RDDs", {
actual <- collect(cartesian(rdd, emptyRdd))
expect_equal(actual, list())

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

Expand Down
26 changes: 26 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,32 @@ test_that("create DataFrame from RDD", {
expect_equal(count(df), 10)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

df <- jsonFile(sqlContext, jsonPathNa)
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
}, error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
insertInto(df, "people")
expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))

schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
df2 <- createDataFrame(sqlContext, df.toRDD, schema)
expect_equal(columns(df2), c("name", "age", "height"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))

localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18), height=c(164.10, 181.4, 173.7))
df <- createDataFrame(sqlContext, localDF, schema)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
expect_equal(columns(df), c("name", "age", "height"))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
})

test_that("convert NAs to null type in DataFrames", {
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_textFile.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ context("the textFile() function")
# JavaSparkContext handle
sc <- sparkR.init()

mockFile = c("Spark is pretty.", "Spark is awesome.")
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("textFile() on a local file returns an RDD", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ test_that("cleanClosure on R functions", {
# Test for overriding variables in base namespace (Issue: SparkR-196).
nums <- as.list(1:10)
rdd <- parallelize(sc, nums, 2L)
t = 4 # Override base::t in .GlobalEnv.
t <- 4 # Override base::t in .GlobalEnv.
f <- function(x) { x > t }
newF <- cleanClosure(f)
env <- environment(newF)
Expand Down
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.util.control.ControlThrowable

import com.codahale.metrics.{Gauge, MetricRegistry}

Expand Down Expand Up @@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
listenerBus.addListener(listener)

val scheduleTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(schedule())
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
Expand Down
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,20 @@ object TaskContext {
*/
def get(): TaskContext = taskContext.get

private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
/**
* Returns the partition id of currently active TaskContext. It will return 0
* if there is no active TaskContext for cases like local execution.
*/
def getPartitionId(): Int = {
val tc = taskContext.get()
if (tc == null) {
0
} else {
tc.partitionId()
}
}

private[this] val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]

// Note: protected[spark] instead of private[spark] to prevent the following two from
// showing up in JavaDoc.
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ private[spark] object SerDe {
// Int -> integer
// String -> character
// Boolean -> logical
// Float -> double
// Double -> double
// Long -> double
// Array[Byte] -> raw
Expand Down Expand Up @@ -215,6 +216,9 @@ private[spark] object SerDe {
case "long" | "java.lang.Long" =>
writeType(dos, "double")
writeDouble(dos, value.asInstanceOf[Long].toDouble)
case "float" | "java.lang.Float" =>
writeType(dos, "double")
writeDouble(dos, value.asInstanceOf[Float].toDouble)
case "double" | "java.lang.Double" =>
writeType(dos, "double")
writeDouble(dos, value.asInstanceOf[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import java.io._

import scala.reflect.ClassTag

import akka.serialization.Serialization

import org.apache.spark.Logging
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer}
import org.apache.spark.util.Utils


Expand All @@ -32,11 +31,11 @@ import org.apache.spark.util.Utils
* Files are deleted when applications and workers are removed.
*
* @param dir Directory to store files. Created if non-existent (but not recursively).
* @param serialization Used to serialize our objects.
* @param serializer Used to serialize our objects.
*/
private[master] class FileSystemPersistenceEngine(
val dir: String,
val serialization: Serialization)
val serializer: Serializer)
extends PersistenceEngine with Logging {

new File(dir).mkdir()
Expand All @@ -57,27 +56,31 @@ private[master] class FileSystemPersistenceEngine(
private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file)
val fileOut = new FileOutputStream(file)
var out: SerializationStream = null
Utils.tryWithSafeFinally {
out.write(serialized)
out = serializer.newInstance().serializeStream(fileOut)
out.writeObject(value)
} {
out.close()
fileOut.close()
if (out != null) {
out.close()
}
}
}

private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
val fileIn = new FileInputStream(file)
var in: DeserializationStream = null
try {
dis.readFully(fileData)
in = serializer.newInstance().deserializeStream(fileIn)
in.readObject[T]()
} finally {
dis.close()
fileIn.close()
if (in != null) {
in.close()
}
}
val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
}

}
18 changes: 7 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.language.postfixOps
import scala.util.Random

import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import org.apache.hadoop.fs.Path

import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
Expand All @@ -44,6 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}

Expand All @@ -58,9 +56,6 @@ private[master] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

// TODO Remove it once we don't use akka.serialization.Serialization
private val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
Expand Down Expand Up @@ -161,20 +156,21 @@ private[master] class Master(
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(actorSystem))
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, SerializationExtension(actorSystem))
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serialization])
.newInstance(conf, SerializationExtension(actorSystem))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
Expand Down Expand Up @@ -213,7 +209,7 @@ private[master] class Master(

override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.deploy.master

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rpc.RpcEnv

import scala.reflect.ClassTag

Expand Down Expand Up @@ -80,8 +81,11 @@ abstract class PersistenceEngine {
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time of creation).
*/
final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
(read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
final def readPersistedData(
rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
rpcEnv.deserialize { () =>
(read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
}
}

def close() {}
Expand Down
Loading

0 comments on commit b07e20c

Please sign in to comment.