[SPARK-21045][PYSPARK] Fixed executor blocked because traceback.format_exc throws UnicodeDecodeError #18262

Closed
wants to merge 52 commits into from
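
For context: the title refers to the Python 2 error-reporting path in the PySpark worker. If formatting or UTF-8-encoding the traceback itself raises a secondary UnicodeDecodeError (which can happen when the exception text contains non-ASCII bytes, since calling .encode("utf-8") on a Python 2 byte str first decodes it with the ASCII codec), the error never reaches the JVM and the executor appears blocked. The sketch below only illustrates that failure mode and one defensive pattern; it is not the patch in this PR, and the helper name format_exc_utf8 is hypothetical.

# Illustrative sketch only, not the change in this PR. On Python 2,
# traceback.format_exc() returns a byte str; .encode("utf-8") on a byte str
# containing non-ASCII bytes raises UnicodeDecodeError, so the error-reporting
# path can fail instead of delivering the exception to the JVM.
import sys
import traceback


def format_exc_utf8():
    """Best-effort UTF-8 bytes for the current traceback.

    Hypothetical helper: the point is that reporting an error must never
    raise again, otherwise the JVM side keeps waiting on the socket.
    """
    try:
        tb = traceback.format_exc()
    except Exception:
        tb = "<failed to format traceback>"
    if isinstance(tb, bytes):        # Python 2: already a byte str, pass through
        return tb
    return tb.encode("utf-8", "replace")


if __name__ == "__main__":
    try:
        raise ValueError(u"\u4e2d\u6587 in the error message")  # non-ASCII text
    except ValueError:
        sys.stdout.write(repr(format_exc_utf8()) + "\n")
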
Changes from all commits
52 commits
81f5db5
Fixed executor blocked because traceback.format_exc throw UnicodeDeco…
Jun 10, 2017
9893efd
check by python version
Jun 13, 2017
ee793ec
[SPARK-20211][SQL] Fix the Precision and Scale of Decimal Values when…
gatorsmile Jun 10, 2017
cdb26eb
[SPARK-20620][TEST] Improve some unit tests for NullExpressionsSuite …
10110346 Jun 10, 2017
0f2da15
[SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
felixcheung Jun 11, 2017
e5940a0
[SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher
Jun 11, 2017
ef661f6
[SPARK-20935][STREAMING] Always close WriteAheadLog and make it idemp…
HyukjinKwon Jun 11, 2017
ebb284f
[SPARK-13933][BUILD] Update hadoop-2.7 profile's curator version to 2…
wangyum Jun 11, 2017
d1dde23
[SPARK-20877][SPARKR][FOLLOWUP] clean up after test move
felixcheung Jun 11, 2017
160cbe7
Fixed typo in sql.functions
Jun 11, 2017
9713c7c
[SPARK-21031][SQL] Add `alterTableStats` to store spark's stats and l…
wzhfy Jun 12, 2017
d262b77
[SPARK-18891][SQL] Support for Scala Map collection types
michalsenkyr Jun 12, 2017
9bbd3ca
[SPARK-18891][SQL] Support for specific Java List subtypes
michalsenkyr Jun 12, 2017
ae461e9
[SPARK-20715] Store MapStatuses only in MapOutputTracker, not Shuffle…
JoshRosen Jun 12, 2017
1b61edd
[SPARK-20665][SQL][FOLLOW-UP] Move test case to MathExpressionsSuite
10110346 Jun 12, 2017
c8c8ba8
[DOCS] Fix error: ambiguous reference to overloaded definition
ZiyueHuang Jun 12, 2017
f02667c
[SPARK-21041][SQL] SparkSession.range should be consistent with Spark…
dongjoon-hyun Jun 12, 2017
b9d80e8
[SPARK-21046][SQL] simplify the array offset and length in ColumnVector
cloud-fan Jun 12, 2017
11755f1
[SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds
Jun 12, 2017
160fcdb
[SPARK-20345][SQL] Fix STS error handling logic on HiveSQLException
dongjoon-hyun Jun 12, 2017
26f056c
[SPARK-21059][SQL] LikeSimplification can NPE on null pattern
rxin Jun 12, 2017
3cd45c8
[SPARK-21050][ML] Word2vec persistence overflow bug fix
jkbradley Jun 12, 2017
66bd772
[SPARK-20979][SS] Add RateSource to generate values for tests and ben…
zsxwing Jun 12, 2017
cb8a343
Revert "[SPARK-21046][SQL] simplify the array offset and length in Co…
cloud-fan Jun 13, 2017
a071d75
[SPARK-19910][SQL] `stack` should not reject NULL values due to type …
dongjoon-hyun Jun 13, 2017
a01e076
[TEST][SPARKR][CORE] Fix broken SparkSubmitSuite
felixcheung Jun 13, 2017
0d248cf
[SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive ta…
srowen Jun 13, 2017
8dcca70
[SPARK-21006][TESTS][FOLLOW-UP] Some Worker's RpcEnv is leaked in Wor…
10110346 Jun 13, 2017
f163e86
[SPARK-21039][SPARK CORE] Use treeAggregate instead of aggregate in D…
rishabhbhardwaj Jun 13, 2017
5621e4a
[SPARK-21060][WEB-UI] Css style about paging function is error in the…
Jun 13, 2017
df01660
[SPARK-21064][CORE][TEST] Fix the default value bug in NettyBlockTran…
Jun 13, 2017
bf139ca
[SPARK-21051][SQL] Add hash map metrics to aggregate
viirya Jun 13, 2017
60bc9ff
[SPARK-21016][CORE] Improve code fault tolerance for converting strin…
10110346 Jun 13, 2017
fb1a870
[SPARK-12552][CORE] Correctly count the driver resource when recoveri…
jerryshao Jun 14, 2017
f71145a
[SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePart…
lianhuiwang Jun 14, 2017
1f71f40
[SPARK-19753][CORE] Un-register all shuffle output on a host in case …
Jun 14, 2017
6d097f7
[SPARK-20754][SQL][FOLLOWUP] Add Function Alias For MOD/POSITION.
wangyum Jun 14, 2017
c19ca9f
[SPARK-21057][ML] Do not use a PascalDistribution in countApprox
srowen Jun 14, 2017
12e068d
[SPARK-21085][SQL] Failed to read the partitioned table created by Sp…
gatorsmile Jun 14, 2017
5a54d6b
[SPARK-21089][SQL] Fix DESC EXTENDED/FORMATTED to Show Table Properties
gatorsmile Jun 14, 2017
7686f02
Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"
gatorsmile Jun 14, 2017
24b409d
[SPARK-21091][SQL] Move constraint code into QueryPlanConstraints
rxin Jun 14, 2017
3c1f793
[SPARK-19900][CORE] Remove driver when relaunching.
liyichao Jun 15, 2017
b897c60
[SPARK-21092][SQL] Wire SQLConf in logical plan and expressions
rxin Jun 15, 2017
fb8bcf1
[SPARK-20980][SQL] Rename `wholeFile` to `multiLine` for both CSV and…
gatorsmile Jun 15, 2017
73dae47
[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - C…
Jun 15, 2017
b89d66a
[SPARK-20980][DOCS] update doc to reflect multiLine change
felixcheung Jun 15, 2017
fea7e71
[SPARK-16251][SPARK-20200][CORE][TEST] Flaky test: org.apache.spark.r…
jiangxb1987 Jun 15, 2017
6e74008
[SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn…
Jun 15, 2017
3c77234
[SPARK-21112][SQL] ALTER TABLE SET TBLPROPERTIES should not overwrite…
gatorsmile Jun 16, 2017
6bbd09c
[SPARK-21072][SQL] TreeNode.mapChildren should only apply to the chil…
ConeyLiu Jun 16, 2017
8120192
add test for SPARK-21045
Jun 16, 2017
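
The last commit above adds a test for SPARK-21045. As a rough, hypothetical illustration of what such a regression check needs to assert, independent of the Spark test harness: when the exception text is non-ASCII, the error-reporting step must still produce bytes instead of dying with a secondary UnicodeDecodeError. The names report_error and NonAsciiErrorReportingTest are made up for this sketch and are not the test added in the PR.

# Hypothetical regression-style check, not the test added in this PR.
import traceback
import unittest


def report_error():
    """Stand-in for the worker's error-reporting step: format the current
    exception and return it as UTF-8 bytes without raising again."""
    tb = traceback.format_exc()
    if isinstance(tb, bytes):        # Python 2 returns a byte str already
        return tb
    return tb.encode("utf-8", "replace")


class NonAsciiErrorReportingTest(unittest.TestCase):
    def test_non_ascii_exception_is_reported(self):
        try:
            raise ValueError(u"\u4e2d\u6587: bad record")
        except ValueError:
            payload = report_error()
        self.assertIsInstance(payload, bytes)
        self.assertIn(b"ValueError", payload)


if __name__ == "__main__":
    unittest.main()
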
1 change: 1 addition & 0 deletions R/pkg/.Rbuildignore
@@ -6,3 +6,4 @@
^README\.Rmd$
^src-native$
^html$
^tests/fulltests/*
6 changes: 3 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -334,7 +334,7 @@ setMethod("toDF", signature(x = "RDD"),
#'
#' Loads a JSON file, returning the result as a SparkDataFrame
#' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
#' ) is supported. For JSON (one record per file), set a named property \code{wholeFile} to
#' ) is supported. For JSON (one record per file), set a named property \code{multiLine} to
#' \code{TRUE}.
#' It goes through the entire dataset once to determine the schema.
#'
@@ -348,7 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' df <- read.json(path, wholeFile = TRUE)
#' df <- read.json(path, multiLine = TRUE)
#' df <- jsonFile(path)
#' }
#' @name read.json
@@ -598,7 +598,7 @@ tableToDF <- function(tableName) {
#' df1 <- read.df("path/to/file.json", source = "json")
#' schema <- structType(structField("name", "string"),
#' structField("info", "map<string,double>"))
#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
#' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
#' }
#' @name read.df
2 changes: 1 addition & 1 deletion R/pkg/R/install.R
@@ -267,7 +267,7 @@ hadoopVersionName <- function(hadoopVersion) {
# The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
# adapt to Spark context
sparkCachePath <- function() {
if (.Platform$OS.type == "windows") {
if (is_windows()) {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
if (is.na(winAppPath)) {
stop(paste("%LOCALAPPDATA% not found.",
8 changes: 2 additions & 6 deletions R/pkg/R/utils.R
@@ -908,10 +908,6 @@ isAtomicLengthOne <- function(x) {
is.atomic(x) && length(x) == 1
}

is_cran <- function() {
!identical(Sys.getenv("NOT_CRAN"), "true")
}

is_windows <- function() {
.Platform$OS.type == "windows"
}
@@ -920,6 +916,6 @@ hadoop_home_set <- function() {
!identical(Sys.getenv("HADOOP_HOME"), "")
}

not_cran_or_windows_with_hadoop <- function() {
!is_cran() && (!is_windows() || hadoop_home_set())
windows_with_hadoop <- function() {
!is_windows() || hadoop_home_set()
}
90 changes: 90 additions & 0 deletions R/pkg/inst/tests/testthat/test_basic.R
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

context("basic tests for CRAN")

test_that("create DataFrame from list or data.frame", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

i <- 4
df <- createDataFrame(data.frame(dummy = 1:i))
expect_equal(count(df), i)

l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(l)
expect_equal(columns(df), c("a", "b"))

a <- 1:3
b <- c("a", "b", "c")
ldf <- data.frame(a, b)
df <- createDataFrame(ldf)
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)

mtcarsdf <- createDataFrame(mtcars)
expect_equivalent(collect(mtcarsdf), mtcars)

bytes <- as.raw(c(1, 2, 3))
df <- createDataFrame(list(list(bytes)))
expect_equal(collect(df)[[1]][[1]], bytes)

sparkR.session.stop()
})

test_that("spark.glm and predict", {
sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

training <- suppressWarnings(createDataFrame(iris))
# gaussian family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

# Gamma family
x <- runif(100, -1, 1)
y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
model <- glm(y ~ x, family = Gamma, df)
out <- capture.output(print(summary(model)))
expect_true(any(grepl("Dispersion parameter for gamma family", out)))

# tweedie family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
family = "tweedie", var.power = 1.2, link.power = 0.0)
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
vals <- collect(select(prediction, "prediction"))

# manual calculation of the R predicted values to avoid dependence on statmod
#' library(statmod)
#' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
#' family = tweedie(var.power = 1.2, link.power = 0.0))
#' print(coef(rModel))

rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
data = iris) %*% rCoef))
expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)

sparkR.session.stop()
})
File renamed without changes.

@@ -20,8 +20,6 @@ context("SerDe functionality")
sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("SerDe of primitive types", {
skip_on_cran()

x <- callJStatic("SparkRHandler", "echo", 1L)
expect_equal(x, 1L)
expect_equal(class(x), "integer")
@@ -40,8 +38,6 @@ test_that("SerDe of primitive types", {
})

test_that("SerDe of list of primitive types", {
skip_on_cran()

x <- list(1L, 2L, 3L)
y <- callJStatic("SparkRHandler", "echo", x)
expect_equal(x, y)
@@ -69,8 +65,6 @@ test_that("SerDe of list of primitive types", {
})

test_that("SerDe of list of lists", {
skip_on_cran()

x <- list(list(1L, 2L, 3L), list(1, 2, 3),
list(TRUE, FALSE), list("a", "b", "c"))
y <- callJStatic("SparkRHandler", "echo", x)

@@ -17,16 +17,11 @@
context("Windows-specific tests")

test_that("sparkJars tag in SparkContext", {
skip_on_cran()

if (.Platform$OS.type != "windows") {
if (!is_windows()) {
skip("This test is only for Windows, skipped")
}

testOutput <- launchScript("ECHO", "a/b/c", wait = TRUE)
abcPath <- testOutput[1]
expect_equal(abcPath, "a\\b\\c")
})

message("--- End test (Windows) ", as.POSIXct(Sys.time(), tz = "GMT"))
message("elapsed ", (proc.time() - timer_ptm)[3])

@@ -24,8 +24,6 @@ sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("saveAsObjectFile()/objectFile() following textFile() works", {
skip_on_cran()

fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName1)
@@ -40,8 +38,6 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
})

test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
skip_on_cran()

fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")

l <- list(1, 2, 3)
@@ -54,8 +50,6 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
})

test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
skip_on_cran()

fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
writeLines(mockFile, fileName1)
@@ -80,8 +74,6 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
})

test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
skip_on_cran()

fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")


@@ -29,8 +29,6 @@ rdd <- parallelize(sc, nums, 2L)
mockFile <- c("Spark is pretty.", "Spark is awesome.")

test_that("union on two RDDs", {
skip_on_cran()

actual <- collectRDD(unionRDD(rdd, rdd))
expect_equal(actual, as.list(rep(nums, 2)))

@@ -53,8 +51,6 @@ test_that("union on two RDDs", {
})

test_that("cogroup on two RDDs", {
skip_on_cran()

rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
@@ -73,8 +69,6 @@ test_that("cogroup on two RDDs", {
})

test_that("zipPartitions() on RDDs", {
skip_on_cran()

rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6

@@ -26,8 +26,6 @@ nums <- 1:2
rrdd <- parallelize(sc, nums, 2L)

test_that("using broadcast variable", {
skip_on_cran()

randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
randomMatBr <- broadcastRDD(sc, randomMat)

@@ -40,8 +38,6 @@ test_that("using broadcast variable", {
})

test_that("without using broadcast variable", {
skip_on_cran()

randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))

useBroadcast <- function(x) {

@@ -18,8 +18,6 @@
context("functions in client.R")

test_that("adding spark-testing-base as a package works", {
skip_on_cran()

args <- generateSparkSubmitArgs("", "", "", "",
"holdenk:spark-testing-base:1.3.0_0.0.5")
expect_equal(gsub("[[:space:]]", "", args),
@@ -28,22 +26,16 @@ test_that("adding spark-testing-base as a package works", {
})

test_that("no package specified doesn't add packages flag", {
skip_on_cran()

args <- generateSparkSubmitArgs("", "", "", "", "")
expect_equal(gsub("[[:space:]]", "", args),
"")
})

test_that("multiple packages don't produce a warning", {
skip_on_cran()

expect_warning(generateSparkSubmitArgs("", "", "", "", c("A", "B")), NA)
})

test_that("sparkJars sparkPackages as character vectors", {
skip_on_cran()

args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
c("com.databricks:spark-avro_2.10:2.0.1"))
expect_match(args, "--jars one.jar,two.jar,three.jar")

@@ -18,8 +18,6 @@
context("test functions in sparkR.R")

test_that("Check masked functions", {
skip_on_cran()

# Check that we are not masking any new function from base, stats, testthat unexpectedly
# NOTE: We should avoid adding entries to *namesOfMaskedCompletely* as masked functions make it
# hard for users to use base R functions. Please check when in doubt.
@@ -57,8 +55,6 @@ test_that("Check masked functions", {
})

test_that("repeatedly starting and stopping SparkR", {
skip_on_cran()

for (i in 1:4) {
sc <- suppressWarnings(sparkR.init(master = sparkRTestMaster))
rdd <- parallelize(sc, 1:20, 2L)
@@ -77,8 +73,6 @@ test_that("repeatedly starting and stopping SparkSession", {
})

test_that("rdd GC across sparkR.stop", {
skip_on_cran()

sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0
rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
@@ -102,8 +96,6 @@ test_that("rdd GC across sparkR.stop", {
})

test_that("job group functions can be called", {
skip_on_cran()

sc <- sparkR.sparkContext(master = sparkRTestMaster)
setJobGroup("groupId", "job description", TRUE)
cancelJobGroup("groupId")
@@ -116,16 +108,12 @@ test_that("job group functions can be called", {
})

test_that("utility function can be called", {
skip_on_cran()

sparkR.sparkContext(master = sparkRTestMaster)
setLogLevel("ERROR")
sparkR.session.stop()
})

test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
skip_on_cran()

e <- new.env()
e[["spark.driver.memory"]] <- "512m"
ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
@@ -153,8 +141,6 @@ test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whiteli
})

test_that("sparkJars sparkPackages as comma-separated strings", {
skip_on_cran()

expect_warning(processSparkJars(" a, b "))
jars <- suppressWarnings(processSparkJars(" a, b "))
expect_equal(lapply(jars, basename), list("a", "b"))
@@ -182,8 +168,6 @@ test_that("spark.lapply should perform simple transforms", {
})

test_that("add and get file to be downloaded with Spark job on every node", {
skip_on_cran()

sparkR.sparkContext(master = sparkRTestMaster)
# Test add file.
path <- tempfile(pattern = "hello", fileext = ".txt")

@@ -26,8 +26,6 @@ nums <- 1:2
rdd <- parallelize(sc, nums, 2L)

test_that("include inside function", {
skip_on_cran()

# Only run the test if plyr is installed.
if ("plyr" %in% rownames(installed.packages())) {
suppressPackageStartupMessages(library(plyr))
@@ -44,8 +42,6 @@ test_that("include inside function", {
})

test_that("use include package", {
skip_on_cran()

# Only run the test if plyr is installed.
if ("plyr" %in% rownames(installed.packages())) {
suppressPackageStartupMessages(library(plyr))