
Commit

Merge remote-tracking branch 'origin/master' into SPARK-20754-mod&position

Conflicts:
	sql/core/src/test/resources/sql-tests/inputs/operators.sql
	sql/core/src/test/resources/sql-tests/results/operators.sql.out
wangyum committed Jun 13, 2017
2 parents 6168d3f + fc0e694 commit 746b37d
Showing 183 changed files with 6,839 additions and 1,715 deletions.
1 change: 1 addition & 0 deletions R/pkg/.Rbuildignore
@@ -6,3 +6,4 @@
^README\.Rmd$
^src-native$
^html$
+^tests/fulltests/*
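Aside (not part of the diff): R treats .Rbuildignore entries as Perl-compatible regexes matched against relative file paths, so this pattern keeps the relocated full test suite out of the CRAN source build. A quick check in plain R, with a made-up file path:

grepl("^tests/fulltests/*", "tests/fulltests/test_foo.R", perl = TRUE)  # TRUE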
1 change: 1 addition & 0 deletions R/pkg/R/DataFrame.R
@@ -2646,6 +2646,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
#' Input SparkDataFrames can have different schemas (names and data types).
#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
+#' Also, as is standard in SQL, this function resolves columns by position (not by name).
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
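To make the new doc line concrete, a minimal SparkR sketch (not part of the diff; data and names invented):

# union() resolves columns by position, not by name:
df1 <- createDataFrame(data.frame(a = c(1, 2), b = c("x", "y"), stringsAsFactors = FALSE))
df2 <- createDataFrame(data.frame(b = c(3), a = c("z"), stringsAsFactors = FALSE))  # names swapped
collect(union(df1, df2))
# The appended row is (3, "z"): df2's first column fills "a" even though it is named "b".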
2 changes: 1 addition & 1 deletion R/pkg/R/install.R
@@ -267,7 +267,7 @@ hadoopVersionName <- function(hadoopVersion) {
# The implementation refers to the appdirs package (https://pypi.python.org/pypi/appdirs),
# adapted to the Spark context
sparkCachePath <- function() {
-  if (.Platform$OS.type == "windows") {
+  if (is_windows()) {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
if (is.na(winAppPath)) {
stop(paste("%LOCALAPPDATA% not found.",
8 changes: 2 additions & 6 deletions R/pkg/R/utils.R
@@ -908,10 +908,6 @@ isAtomicLengthOne <- function(x) {
is.atomic(x) && length(x) == 1
}

-is_cran <- function() {
-  !identical(Sys.getenv("NOT_CRAN"), "true")
-}

is_windows <- function() {
.Platform$OS.type == "windows"
}
@@ -920,6 +916,6 @@ hadoop_home_set <- function() {
!identical(Sys.getenv("HADOOP_HOME"), "")
}

-not_cran_or_windows_with_hadoop <- function() {
-  !is_cran() && (!is_windows() || hadoop_home_set())
+windows_with_hadoop <- function() {
+  !is_windows() || hadoop_home_set()
}
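Net effect of this hunk: with the CRAN-only suite split out, the is_cran() gate disappears and only the Windows-without-Hadoop guard remains. A hypothetical use of the renamed helper (the helper is real, the test body invented):

test_that("a Hadoop-dependent operation", {
  if (!windows_with_hadoop()) {
    skip("requires HADOOP_HOME (winutils) on Windows")
  }
  expect_true(TRUE)  # placeholder for a body that touches Hadoop paths
})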
90 changes: 90 additions & 0 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

i <- 4
df <- createDataFrame(data.frame(dummy = 1:i))
expect_equal(count(df), i)

l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(l)
expect_equal(columns(df), c("a", "b"))

a <- 1:3
b <- c("a", "b", "c")
ldf <- data.frame(a, b)
df <- createDataFrame(ldf)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)

mtcarsdf <- createDataFrame(mtcars)
expect_equivalent(collect(mtcarsdf), mtcars)

bytes <- as.raw(c(1, 2, 3))
df <- createDataFrame(list(list(bytes)))
expect_equal(collect(df)[[1]][[1]], bytes)

sparkR.session.stop()
})

test_that("spark.glm and predict", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

training <- suppressWarnings(createDataFrame(iris))
# gaussian family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

# Gamma family
x <- runif(100, -1, 1)
y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
model <- glm(y ~ x, family = Gamma, df)
out <- capture.output(print(summary(model)))
expect_true(any(grepl("Dispersion parameter for gamma family", out)))

# tweedie family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
family = "tweedie", var.power = 1.2, link.power = 0.0)
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))

# manual calculation of the R predicted values to avoid dependence on statmod
#' library(statmod)
#' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
#' family = tweedie(var.power = 1.2, link.power = 0.0))
#' print(coef(rModel))

rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
data = iris) %*% rCoef))
expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)

sparkR.session.stop()
})
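A note on the hand-rolled check above (my gloss, not the commit's): link.power = 0.0 selects the log link for the Tweedie family, which is why the reference values apply exp() to the linear predictor. The same pattern with invented numbers:

X <- cbind(1, c(0.1, 0.2))          # intercept plus one feature
beta <- c(0.5, 1.2)                 # made-up coefficients
mu <- exp(as.numeric(X %*% beta))   # log link: mean = exp(X beta)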
File renamed without changes.
File renamed without changes.
@@ -20,8 +20,6 @@ context("SerDe functionality")
sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("SerDe of primitive types", {
-  skip_on_cran()
-
x <- callJStatic("SparkRHandler", "echo", 1L)
expect_equal(x, 1L)
expect_equal(class(x), "integer")
@@ -40,8 +38,6 @@ test_that("SerDe of primitive types", {
})

test_that("SerDe of list of primitive types", {
-  skip_on_cran()
-
x <- list(1L, 2L, 3L)
y <- callJStatic("SparkRHandler", "echo", x)
expect_equal(x, y)
@@ -69,8 +65,6 @@ test_that("SerDe of list of primitive types", {
})

test_that("SerDe of list of lists", {
-  skip_on_cran()
-
x <- list(list(1L, 2L, 3L), list(1, 2, 3),
list(TRUE, FALSE), list("a", "b", "c"))
y <- callJStatic("SparkRHandler", "echo", x)
@@ -17,16 +17,11 @@
context("Windows-specific tests")

test_that("sparkJars tag in SparkContext", {
-  skip_on_cran()
-
-  if (.Platform$OS.type != "windows") {
+  if (!is_windows()) {
skip("This test is only for Windows, skipped")
}

testOutput <- launchScript("ECHO", "a/b/c", wait = TRUE)
abcPath <- testOutput[1]
expect_equal(abcPath, "a\\b\\c")
})

message("--- End test (Windows) ", as.POSIXct(Sys.time(), tz = "GMT"))
message("elapsed ", (proc.time() - timer_ptm)[3])
@@ -24,8 +24,6 @@ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("saveAsObjectFile()/objectFile() following textFile() works", {
-  skip_on_cran()
-
fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName1)
@@ -40,8 +38,6 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
})

test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
-  skip_on_cran()
-
fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")

l <- list(1, 2, 3)
@@ -54,8 +50,6 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
})

test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
-  skip_on_cran()
-
fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName1)
@@ -80,8 +74,6 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
})

test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
-  skip_on_cran()
-
fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")

@@ -29,8 +29,6 @@ rdd <- parallelize(sc, nums, 2L)
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("union on two RDDs", {
-  skip_on_cran()
-
actual <- collectRDD(unionRDD(rdd, rdd))
expect_equal(actual, as.list(rep(nums, 2)))

@@ -53,8 +51,6 @@ test_that("union on two RDDs", {
})

test_that("cogroup on two RDDs", {
-  skip_on_cran()
-
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
@@ -73,8 +69,6 @@ test_that("cogroup on two RDDs", {
})

test_that("zipPartitions() on RDDs", {
-  skip_on_cran()
-
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
@@ -26,8 +26,6 @@ nums <- 1:2
rrdd <- parallelize(sc, nums, 2L)

test_that("using broadcast variable", {
-  skip_on_cran()
-
randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
randomMatBr <- broadcastRDD(sc, randomMat)

@@ -40,8 +38,6 @@ test_that("using broadcast variable", {
})

test_that("without using broadcast variable", {
-  skip_on_cran()
-
randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))

useBroadcast <- function(x) {
@@ -18,8 +18,6 @@
context("functions in client.R")

test_that("adding spark-testing-base as a package works", {
-  skip_on_cran()
-
args <- generateSparkSubmitArgs("", "", "", "",
"holdenk:spark-testing-base:1.3.0_0.0.5")
expect_equal(gsub("[[:space:]]", "", args),
@@ -28,22 +26,16 @@ test_that("adding spark-testing-base as a package works", {
})

test_that("no package specified doesn't add packages flag", {
-  skip_on_cran()
-
args <- generateSparkSubmitArgs("", "", "", "", "")
expect_equal(gsub("[[:space:]]", "", args),
"")
})

test_that("multiple packages don't produce a warning", {
-  skip_on_cran()
-
expect_warning(generateSparkSubmitArgs("", "", "", "", c("A", "B")), NA)
})

test_that("sparkJars sparkPackages as character vectors", {
-  skip_on_cran()
-
args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
c("com.databricks:spark-avro_2.10:2.0.1"))
expect_match(args, "--jars one.jar,two.jar,three.jar")
@@ -18,8 +18,6 @@
context("test functions in sparkR.R")

test_that("Check masked functions", {
-  skip_on_cran()
-
# Check that we are not masking any new function from base, stats, testthat unexpectedly
# NOTE: We should avoid adding entries to *namesOfMaskedCompletely* as masked functions make it
# hard for users to use base R functions. Please check when in doubt.
@@ -57,8 +55,6 @@ test_that("Check masked functions", {
})

test_that("repeatedly starting and stopping SparkR", {
-  skip_on_cran()
-
for (i in 1:4) {
sc <- suppressWarnings(sparkR.init(master = sparkRTestMaster))
rdd <- parallelize(sc, 1:20, 2L)
@@ -77,8 +73,6 @@ test_that("repeatedly starting and stopping SparkSession", {
})

test_that("rdd GC across sparkR.stop", {
-  skip_on_cran()
-
sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0
rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
@@ -102,8 +96,6 @@ test_that("rdd GC across sparkR.stop", {
})

test_that("job group functions can be called", {
-  skip_on_cran()
-
sc <- sparkR.sparkContext(master = sparkRTestMaster)
setJobGroup("groupId", "job description", TRUE)
cancelJobGroup("groupId")
@@ -116,16 +108,12 @@ test_that("job group functions can be called", {
})

test_that("utility function can be called", {
-  skip_on_cran()
-
sparkR.sparkContext(master = sparkRTestMaster)
setLogLevel("ERROR")
sparkR.session.stop()
})

test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
-  skip_on_cran()
-
e <- new.env()
e[["spark.driver.memory"]] <- "512m"
ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
@@ -153,8 +141,6 @@ test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whiteli
})

test_that("sparkJars sparkPackages as comma-separated strings", {
-  skip_on_cran()
-
expect_warning(processSparkJars(" a, b "))
jars <- suppressWarnings(processSparkJars(" a, b "))
expect_equal(lapply(jars, basename), list("a", "b"))
@@ -182,8 +168,6 @@ test_that("spark.lapply should perform simple transforms", {
})

test_that("add and get file to be downloaded with Spark job on every node", {
-  skip_on_cran()
-
sparkR.sparkContext(master = sparkRTestMaster)
# Test add file.
path <- tempfile(pattern = "hello", fileext = ".txt")
@@ -26,8 +26,6 @@ nums <- 1:2
rdd <- parallelize(sc, nums, 2L)

test_that("include inside function", {
-  skip_on_cran()
-
# Only run the test if plyr is installed.
if ("plyr" %in% rownames(installed.packages())) {
suppressPackageStartupMessages(library(plyr))
@@ -44,8 +42,6 @@ test_that("use include package", {
})

test_that("use include package", {
-  skip_on_cran()
-
# Only run the test if plyr is installed.
if ("plyr" %in% rownames(installed.packages())) {
suppressPackageStartupMessages(library(plyr))
File renamed without changes.